123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203 |
- #include "stdafx.h"
- #define _WINSOCK_DEPRECATED_NO_WARNINGS
- #include <Winsock2.h>
- #include <vector>
- #include <string>
- #include "eBusService.h"
- #include "eBus.h"
- #include "base64.h"
- #include "Logger.h"
- #include "PacketAnalizer.h"
- #include "shareMemWR.h"
- #include "P2PModule.h"
- #include "ShareMemory_IF.h"
- #include "ShareMemory_Block.h"
- //#include "common_api.h"
- using namespace std;
- string GetFileTitle(string &fullpath)
- {
- string ret = "";
- string::size_type firstHit = fullpath.find_last_of('\\');
- string::size_type firstHitSec = fullpath.find_last_of('/');
- if ((firstHit == string::npos || firstHit == 0 || (firstHit + 1 == fullpath.size())) && (firstHitSec == string::npos || firstHitSec == 0 || (firstHitSec + 1 == fullpath.size())))
- {
- return ret;
- }
- if (firstHit == string::npos || firstHit == 0 || (firstHit + 1 == fullpath.size()))
- {
- ret = fullpath.substr(firstHitSec + 1);
- }
- else if (firstHitSec == string::npos || firstHitSec == 0 || (firstHitSec + 1 == fullpath.size()))
- {
- ret = fullpath.substr(firstHit + 1);
- }
- else
- {
- //effective both
- if (firstHit > firstHitSec)
- {
- ret = fullpath.substr(firstHit + 1);//kick last
- }
- else
- {
- ret = fullpath.substr(firstHitSec + 1);//kick last
- }
- }
- //got filetitle with ext
- if (ret.size() > 0)
- {
- string::size_type firstHit = ret.find_first_of('.');
- ret = ret.substr(0, firstHit);
- }
- return ret;
- }
- /*
- string GetProcessDirectory()
- {
- string ret = "";
- char szFilename[MAX_PATH] = { 0 };
- DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
- if (res == 0)
- {
- return ret;
- }
- string fullpath = szFilename;
- string::size_type firstHit = fullpath.find_last_of('\\');
- if (firstHit == string::npos || firstHit == 0)
- {
- return ret;
- }
- ret = fullpath.substr(0, firstHit);//kick last \
- return ret;
- }*/
- string GeteBusLogPath()
- {
- string ret = "";
- char szFilename[MAX_PATH] = { 0 };
- DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
- if (res == 0)
- {
- return ret;
- }
- string fullpath = szFilename;
- string FileTitle = GetFileTitle(fullpath);
- string logfile = GetProcessDirectory() + "\\logs\\" + FileTitle;
- SYSTEMTIME st = { 0 };
- GetLocalTime(&st);
- //日,时,分,秒,毫
- sprintf_s(szFilename, "_%u_%02d%02d%02d%02d%03d.log",GetCurrentProcessId(),st.wDay,st.wHour, st.wMinute,st.wSecond,st.wMilliseconds);
- logfile += szFilename;
- return logfile;
- }
- void IniteBusService(bool EnableLog)
- {
- string logfile = GeteBusLogPath();
- BUSC::Client::OpenLogger(logfile.c_str());
- BUSC::Client::Prepare();
- }
- void QuiteBusService()
- {
- BUSC::Client::Quit();
- }
- vector<string> GetZeroConfigIP(DWORD trytime, DWORD timeperiod)
- {
- vector<string> theList;
- //string local = "";
- //for (DWORD i = 0; i < trytime; i++)
- //{
- // Array <ECOM::ZeroConfig::ServiceNode> arNode;
- // ECOM::ZeroConfig::ZeroConfigClient::QueryAll(ECOM::ZeroConfig::ServiceTypeID::CCOSHardware, &arNode);
- // for (Array <ECOM::ZeroConfig::ServiceNode>::Iterator Iter(arNode); Iter; Iter++)
- // {
- // ECOM::ZeroConfig::ServiceNode & Node = Iter();
- // //PRINTA_INFO(m_Logger, "Got ZeroConfig IPAddress : %s\n", Node.IPAddress);
- // local = Node.IPAddress;
- // theList.push_back(local);
- // }
- // if (theList.size() > 0)
- // {
- // return theList;
- // }
- // Sleep(timeperiod);
- //}
- return theList;
- }
- bool FindRouterIpAddress(vector<string> &zeroConfigIp, string &serverIp)
- {
- if (zeroConfigIp.size() == 0)
- {
- //err log
- return false;
- }
- //zc ip is specific
- if ((serverIp).size() > 0)
- {
- for (DWORD i = 0; i < zeroConfigIp.size(); i++)
- {
- if (zeroConfigIp[i] == (serverIp))
- {
- //good to go
- return true;
- }
- }
- //no match
- return false;
- }
- //no optional ip or no match zc server
- //connect anyway if only one zc server exists
- if (zeroConfigIp.size() == 1)
- {
- //put some warning log here
- serverIp = zeroConfigIp[0];
- return true;
- }
- //multiple router...
- if (zeroConfigIp.size() > 1)
- {
- //put some warning log here
- //warning here
- //serverIp = zeroConfigIp[0];
- return false;
- }
- //err log
- return false;
- }
- string ReplaceSubString(string org, string &keystr, string &replacestr)
- {
- std::string::size_type pos = 0;
- std::string::size_type keylen = keystr.size();
- std::string::size_type replen = replacestr.size();
- while ((pos = org.find(keystr, pos)) != std::string::npos)
- {
- org.replace(pos, keylen, replacestr);
- pos += replen;
- }
- return org;
- }
- bool IsLocalip(string &ServerIp)
- {
- //for test
- string Temp;
- struct hostent* pHost;
- static string ExIp = "127.0.0.1";
- if (ServerIp == ExIp)
- {
- return true;
- }
- char cHostName[128] = { 0 };
- gethostname(cHostName, sizeof(cHostName));
- if (cHostName[0] != 0)
- {
- pHost = gethostbyname(cHostName);
- if (pHost)
- {
- for (int i = 0; pHost->h_addr_list[i] != NULL; i++)
- {
- Temp = "";
- Temp = inet_ntoa(*(struct in_addr *)pHost->h_addr_list[i]);
- Temp = ReplaceSubString(Temp, string(" "), string(""));//trim space
- if (Temp == ServerIp)
- {
- return true;
- }
- }
- }
- }
- return false;
- }
- eBusService::eBusService(void)
- {
- m_TickCount = 0;
- m_OfflineHit = 0;
- m_ConnectionStatus = false;
- m_Parent = NULL;
- m_pBlobReceiver = NULL;
- m_pBusLog = NULL;
- m_pCTMap = NULL;
- m_ConnectEvt = CreateEvent(0, 0, 0, 0);
- }
- eBusService::~eBusService(void)
- {
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
- if (m_pBusLog)
- {
- ReleseLogger(m_pBusLog);
- }
- if (m_pCTMap)
- {
- delete m_pCTMap;
- m_pCTMap = NULL;
- }
- CloseHandle(m_ConnectEvt);
- }
- void eBusService::SetParent(HANDLE Parent)
- {
- m_Parent = Parent;
- }
- bool eBusService::IsConnected()
- {
- if (m_ConnectionStatus)
- {
- if ((GetTickCount() - m_TickCount) > (120 * 1000))
- {
- printf("Hearbeat Timeout.CurTick:%d,LastTick:%d\n", GetTickCount(), m_TickCount);
- DisConnect();
- }
- }
- return m_ConnectionStatus;
- }
- bool eBusService::Connect(bool Local, DString &busId, DString &Ipaddress, DString &Port)
- {
- Thread_Lock();
- try {
- if (m_pCTMap == NULL)
- {
- m_pCTMap = new ClientsThreadMap();
- }
- do {
- m_Local = Local;
- m_busId = busId;
- m_Ipaddress = Ipaddress;
- m_Port = Port;
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
- ResetEvent(m_ConnectEvt);
- // Port = DString::From(BUSC::Client::GetDefStatusPort());
- m_pBlobReceiver = new BUSC::BLOBReceiver();
- if (m_pBlobReceiver == NULL)
- {
- printf("EBUS Client malloc Failed.!!!!!\n");
- Thread_UnLock();
- return false;
- }
- if (Local)
- {
- //Local Connection
- //Local:1
- //Port:eBusDefault or selfdefine
- //srcBusId:BusId
- //BUSC::MessageClient::SetLogFileName("AlphaLog.log");
- //BUSC::MessageClient::Prepare();
- //m_pBlobReceiver->Prepare();
- if (m_Ipaddress.IsEmpty())
- {
- m_Ipaddress = "127.0.0.1";
- }
- m_pBlobReceiver->SetSourceID(busId);
- m_pBlobReceiver->SetTcpNoDelay(true);
- //gaofei
- m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
- {
- SetEvent(m_ConnectEvt);
- });
- m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
- m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
- m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
- m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
- m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
-
- printf("========**** eBusService::Connect Thread 0X%08X \n", GetCurrentThreadId());
- Sleep(10000);
- m_pBlobReceiver->StartWait();
-
- if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
- {
- printf("========**** eBusService::Connect Thread Timeout Over 0X%08X \n", GetCurrentThreadId());
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
-
- printf("EBUS Connect Timeout.!!!!!\n");
- Thread_UnLock();
- return false;
- }
- printf("========**** eBusService::Connect Thread Over 0X%08X \n", GetCurrentThreadId());
- m_OfflineHit = 0;
- m_TickCount = GetTickCount();
- m_ConnectionStatus = true;
- printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
- Thread_UnLock();
- return true;
- }
- else if (Local == 0)
- {
- //Local:0
- //Ethernet Connection
- //RouterIp:ipaddress
- //vector<string> theIpList = GetZeroConfigIP(10, 1000);
- string Ip = Ipaddress;
- //if RouterIp is empty,eBus will find one .
- //if eBus found more than one,it will fail and return false.
- //eth connect only outside machine ip
- if (Ip.size() != 0 && IsLocalip(Ip) == false)
- {
- //if (Ip == "127.0.0.1" || FindRouterIpAddress(theIpList, Ip))
- {
- //we got routerIp,check org&new one
- //Ip = "192.168.2.77";
- m_Ipaddress = Ip.c_str();
- //if (IsLocalip(Ip))
- //{
- // m_Ipaddress = "";
- // m_TickCount = 0;
- // m_ConnectionStatus = false;
- // return false;
- //}
- //else
- {
- //Port:eBusDefault or selfdefine
- //srcBusId:BusId
- //BUSC::MessageClient::SetLogFileName("AlphaLog.log");
- //BUSC::MessageClient::Prepare();
- //m_pBlobReceiver->Prepare();
- //m_pBlobReceiver->SetLogFileName("AlphaLog_Eth.log");
- ResetEvent(m_ConnectEvt);
- m_pBlobReceiver->ConnectTo(m_Ipaddress);
- m_pBlobReceiver->SetSourceID(busId);
- //gaofei
- m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
- {
- SetEvent(m_ConnectEvt);
- });
- m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
- m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
- m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
- m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
- m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
- m_pBlobReceiver->SetTcpNoDelay(true);
- m_pBlobReceiver->StartWait();
- if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
- {
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
- printf("EBUS Connect Timeout.IP:%s!!!!!\n",(const char*)m_Ipaddress);
- Thread_UnLock();
- return false;
- }
- m_TickCount = GetTickCount();
- m_OfflineHit = 0;
- m_ConnectionStatus = true;
- printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
- Thread_UnLock();
- return true;
- }
- }
- }
- }
- else
- {
- //do nothing
- }
- } while (0);
- }
- catch (ResDataObjectExption &exp)
- {
- //exp.what()
- Logger *p = GetGlobalLogger();
- PRINTA_DEBUG(p, exp.what());
- }
- catch (...)
- {
- Logger *p = GetGlobalLogger();
- PRINTA_DEBUG(p, "Unknown Exp Happened\n");
- }
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
- m_TickCount =0;
- m_ConnectionStatus = false;
- Thread_UnLock();
- return false;
- }
- void eBusService::DisConnect()
- {
- //need to confirm multiple connection
- //BUSC::BLOBClient::Quit();
- Thread_Lock();
- if (m_pBlobReceiver)
- {
- m_pBlobReceiver->StopWait();
- delete m_pBlobReceiver;
- m_pBlobReceiver = NULL;
- }
- m_TickCount = 0;
- m_ConnectionStatus = false;
- if (m_pCTMap)
- {
- m_pCTMap->Disconnected();
- }
- printf("Disconnct Ebus.Local:%d,busId:%s,ServerIp:%s,Port:%s\n",m_Local,(const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
- Thread_UnLock();
- }
- void eBusService::SetLogPath(const char *pPath)
- {
- //DString dstrPath = pPath;
- //BUSC::MessageClient::Prepare();
- //BUSC::MessageClient::SetLogFileName(dstrPath);
- //m_pBlobReceiver->Prepare();
- //m_pBlobReceiver->SetLogFileName(dstrPath);
- if (m_pBusLog == NULL)
- {
- m_pBusLog = CreateLogger();
- }
- if (m_pBusLog)
- {
- if (m_pBusLog->IsLogFilePathExist() == false)
- {
- m_pBusLog->SetLogFilepath(pPath);
- }
- }
- }
- void DebugPrintPacket(ResDataObject &packet)
- {
- string Context = packet.encode();
- printf("ResDataObject:%s\n", Context.c_str());
- }
- bool eBusService::SendSMPacket(const char *pTargetID, const char *pContext, unsigned long long nShareMemID, DWORD ChannelId)
- {
- bool ret = true;
- if (m_ConnectionStatus == false)
- {
- return false;
- }
- LPVOID pData = NULL;
- DWORD DataLen = 0;
- CShareMemory_Block smBlock;
- if (smBlock.ShareMemoryBlockSet((DWORD)nShareMemID, SM_READWRITE))
- {
- if (smBlock.GetBlockDataAddr(pData, DataLen))
- {
- ret = ((eBus*)m_Parent)->SendPacket(pTargetID, pContext, (const char *)pData, DataLen);
- //ret = SendPacket(pTargetID, pContext, pData, DataLen, ChannelId);
- if (ret)
- {
- m_OfflineHit = 0;
- }
- else
- {
- if (m_OfflineHit)
- {
- //Disconnected
- TPRINTA_ERROR("SendSMPacket Failed.offline callback Hited");
- DisConnect();
- }
- }
- }
- else
- {
- ret = false;
- TPRINTA_ERROR("CShareMemory_Block::GetBlockDataAddr Failed.");
- }
- }
- else
- {
- ret = false;
- TPRINTA_ERROR("CShareMemory_Block::ShareMemoryBlockSet Failed.");
- }
- return ret;
- }
- bool eBusService::SendPacket(const char *pTargetID, const char *pContext, DWORD ChannelId)
- {
- bool ret = false;
- TPRINTA_DEBUG("Send Msg Entry");
- if (m_ConnectionStatus == false)
- {
- return ret;
- }
- //for test
- //if (strcmp(pTargetID, "ccosAlpha") != 0)
- //{
- // ret = ret;
- //}
- DString dstrDestID = pTargetID;
- DString strMsg = pContext;
- //BUSC::MessageClient LocalClient;
- //LocalClient.ConnectTo(m_Ipaddress);
- //LocalClient.SetTcpNoDelay(true);
- //LocalClient.SetSourceID(m_busId);
- //PRINTA_DEBUG(m_Logger, "<eBusMessenger Send> %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
- BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
- BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
- //if (m_Local == false)
- {
- if (Wrapper->IsConnected() == false)
- {
- pClient->ConnectTo(m_Ipaddress);
- pClient->SetTcpNoDelay(true);
- pClient->SetSourceID(m_busId);
- Wrapper->SetConnection(true);//suppose connected
- }
- }
- //if (Wrapper->IsConnected() == false)
- //{
- // pClient->SetSourceID(m_busId);
- //}
- /*
- //旧版 begin
- BUSC::BLOBClient client;
- if (m_Local == false)
- {
- client.ConnectTo(m_Ipaddress, m_Port);
- client.SetTcpNoDelay(true);
- }
- client.SetSourceID(m_busId);
- //旧版 end
- */
- //string strBase64;
- //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64);
- //return client.Send(dstrDestID, ChannelId, strBase64.data());
- SYSTEMTIME st = { 0 };
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
- #endif
- //ret = client.Send(dstrDestID, ChannelId, strMsg.data());
- ret = pClient->Post(dstrDestID, ChannelId, strMsg);
- //ret = LocalClient.Post(dstrDestID, ChannelId, strMsg);
- TPRINTA_DEBUG("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
- if (ret)
- {
- m_OfflineHit = 0;
- }
- else
- {
- TPRINTA_ERROR("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
- //try again
- Wrapper->SetConnection(false);
- pClient = Wrapper->GetBusClient();
- if (Wrapper->IsConnected() == false)
- {
- pClient->ConnectTo(m_Ipaddress);
- pClient->SetTcpNoDelay(true);
- pClient->SetSourceID(m_busId);
- Wrapper->SetConnection(true);//suppose connected
- }
- ret = pClient->Post(dstrDestID, ChannelId, strMsg);
- TPRINTA_DEBUG("Post Try Again: From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
- if (ret)
- {
- m_OfflineHit = 0;
- }
- else
- {
- if (m_OfflineHit)
- {
- //Disconnected
- DisConnect();
- }
- }
- }
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,after packet send result:%d\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
- #endif
- return ret;
- }
- bool eBusService::SendPacket(const char *pTargetID, const char *pContext, const char *pBlock, DWORD BlockSize, DWORD ChannelId)
- {
- bool ret = false;
- TPRINTA_DEBUG("Send BLOCK Entry");
- if (m_ConnectionStatus == false)
- {
- return ret;
- }
- DString dstrDestID = pTargetID;
- DString strMsg = pContext;
- //PRINTA_DEBUG(m_Logger, "<eBusMessenger Send> %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
- BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
- BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
- //if (m_Local == false)
- {
- if (Wrapper->IsConnected() == false)
- {
- pClient->ConnectTo(m_Ipaddress);
- pClient->SetTcpNoDelay(true);
- pClient->SetSourceID(m_busId);
- Wrapper->SetConnection(true);//suppose connected
- }
- }
- //if (Wrapper->IsConnected() == false)
- //{
- // pClient->SetSourceID(m_busId);
- //}
- //Wrapper->SetConnection(true);//suppose connected
- //void * pData = vr->GetData();
- //int len = vr->GetLength();
- //BUSC::BLOBStreamClient Client;
- /*
- //旧版begin
- BUSC::BLOBClient client;
- if (m_Local == false)
- {
- client.ConnectTo(m_Ipaddress, m_Port);
- client.SetTcpNoDelay(true);
- }
- client.SetSourceID(m_busId);
- //旧版end
- */
- //string strBase64;
- //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64);
- //BlockBuffer bb;
- //DString msgAck;
- //BlockBuffer bbAck;
- //bb.Attach((UINT8*)pBlock, BlockSize);//add data??
- //return client.SendBLOB(dstrDestID, ChannelId, strBase64.data(), bb, msgAck);//sync mode yes
- SYSTEMTIME st = { 0 };
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,block packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
- #endif
- //ret = client.SendBLOB(dstrDestID, ChannelId, strMsg.data(), bb, msgAck);//sync mode yes
- //ret = client.PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb);
- //ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb);
- auto fun = [&](BUSC::BUSOutStream & stream)
- {
- stream.Write(pBlock, BlockSize);
- };
- ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
- TPRINTA_DEBUG("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
- if (ret)
- {
- m_OfflineHit = 0;
- }
- else
- {
- TPRINTA_ERROR("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
- //try again
- Wrapper->SetConnection(false);
- pClient = Wrapper->GetBusClient();
- if (Wrapper->IsConnected() == false)
- {
- pClient->ConnectTo(m_Ipaddress);
- pClient->SetTcpNoDelay(true);
- pClient->SetSourceID(m_busId);
- Wrapper->SetConnection(true);//suppose connected
- }
- ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
- TPRINTA_DEBUG("Post Tray Again:From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
- if (ret)
- {
- m_OfflineHit = 0;
- }
- else
- {
- if (m_OfflineHit)
- {
- //Disconnected
- DisConnect();
- }
- }
- }
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,after block packet send result:%d-------------\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
- #endif
- //bb.Detach();
- return ret;
- }
- void eBusService::OnHeartBeat(const void *sender, EventArgs_Null * arg)
- {
- //printf("OnlineMessage\n");
- if (sender)
- {
- //printf("OnlineMessage:%s\n", p);
- }
- m_OfflineHit = 0;
- m_TickCount = GetTickCount();
- }
- void eBusService::OnlineMessage(const void *sender, EventArgs_Null * arg)
- {
- if (arg != NULL)
- {
- printf("OnlineMessage\n");
- }
- //DisConnect();
- //SetEvent(m_ConnectEvt);
- //just tag it
- m_OfflineHit = 0;
- }
- void eBusService::OnOfflineMessage(const void *sender, EventArgs_Null * arg)
- {
- if (arg != NULL)
- {
- printf("OnOfflineMessage\n");
- }
- //DisConnect();
- //just tag it
- m_OfflineHit = 1;
- }
- void eBusService::Quit()
- {
- //BUSC::Client::Quit();
- if (m_pCTMap)
- {
- m_pCTMap->Clear();
- }
- }
- RET_STATUS eBusService::BufferToNotify(void * pOemImage, DWORD ImageSize, ImgDataInfo* PImgDataInfo)
- {
- RET_STATUS Ret = RET_SUCCEED;
- //for memory leak test
- //PImgDataInfo->nShareMemID = 1;
- //return Ret;
- const char *pSmName = Get_Circle_SM_Object(ImageSize);
- CShareMemory_Circle SM;
-
- if (SM.Open(pSmName, SM_READWRITE) >= 0)
- {
- ShareMemoryBlockID smid = SM.Push(pOemImage, ImageSize, true);
- if (smid != 0xffffffff)
- {
- PImgDataInfo->nShareMemID = smid;
- }
- else
- {
- //MessageBox(NULL, "Push failed", "Sys Lib", MB_OK);
- printf("\n Push failed \n");
- Ret = RET_FAILED;
- }
- SM.Close();
- }
- else
- {
- MessageBox(NULL, "Open failed", "Sys Lib", MB_OK);
- Ret = RET_FAILED;
- }
- return Ret;
- }
- void eBusService::BlobDataArrived(const char *pMsg, unsigned char *pBlockData, DWORD BlockDataLen)
- {
- SYSTEMTIME st = { 0 };
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
- //printf("time-%02d:%02d:%02d:%03d,Block packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
- #endif
- //-----got packet---------------
- ResDataObject packet;
- try {
- PRINTA_DEBUG(m_pBusLog, "BlobMsg:Got Packet.BlockLen is %d", BlockDataLen);
- PRINTA_TRACE(m_pBusLog, "Blob Packet Context:{\n%s}", pMsg);
- if (packet.decode(pMsg))
- {
- PRINTA_DEBUG(m_pBusLog, "BlobMsg:Decode Packet Succeed");
- if (m_Parent)
- {
- //check block
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
- if (cmd == PACKET_CMD_DATA)
- {
- ImgDataInfo ImgData;
- ResDataObject resImg;
- PacketAnalizer::GetPacketContext(&packet, resImg);
- ImgData.SetResDataObject(resImg);
- //Need Block Data to Share Memory
- Thread_Lock();
- if (BufferToNotify(pBlockData, BlockDataLen, &ImgData) == RET_SUCCEED)
- {
- Thread_UnLock();
- //BufferToNotify ok
- //And Update the Id of memory
- PRINTA_DEBUG(m_pBusLog, "BlobMsg:BufferToNotify Succeed");
- resImg.clear();
- ImgData.GetResDataObject(resImg);
- if (PacketAnalizer::UpdatePacketContext(packet, resImg))
- {
- PRINTA_DEBUG(m_pBusLog, "BlobMsg:Dispatch packet to BusThread");
- ((eBus*)m_Parent)->PacketArrived(packet);
- }
- }
- else
- {
- Thread_UnLock();
- }
- }
- else
- {
- ((eBus*)m_Parent)->PacketArrived(packet);
- }
- }
- }
- else
- {
- PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed");
- }
- }
- catch (ResDataObjectExption &exp)
- {
- //exp.what()
- Logger *p = GetGlobalLogger();
- PRINTA_ERROR(p, exp.what());
- }
- catch (...)
- {
- Logger *p = GetGlobalLogger();
- PRINTA_ERROR(p, "Unknown Exp Happened\n");
- }
- }
- void eBusService::OnBlobMessage(const void *sender, EventArgs_BUSBLOB * arg)
- {
- EventArgs_BUSBLOB * e = (EventArgs_BUSBLOB*)(arg);
- if (!arg) return;
- //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
- m_OfflineHit = 0;
- m_TickCount = GetTickCount();
- DWORD Command = arg->m_Command;
- DString SourceID = arg->m_SourceID;
- DString Message = arg->MsgAs <DString>();
- DWORD MessageID = arg->m_MessageID;//???
- if (Command == CCOS_HW_CHANNEL + 1)
- {
- UINT8 *pBlockData = (UINT8*)arg->m_BLOB;//block data buff
- DWORD BlockDataLen = arg->m_BLOB.GetCount();//block data len??
- string strMessage = Message.constBuffer();
- BlobDataArrived(strMessage.c_str(), pBlockData, BlockDataLen);
- }
- else
- {
- //put some log here
- }
- //return acks-------------------
- //message ack
- //e->m_AckMessage = "";
- //blob ack,for what??
- //BlockBuffer bb;
- //e->m_AckBLOB = bb;
- //Thread_UnLock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
- }
- void eBusService::OnBUSMessage(const void *sender, EventArgs_BUSMessage * arg)
- {
- if (!arg) return;
- //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
- m_OfflineHit = 0;
- m_TickCount = GetTickCount();
- DWORD Command = arg->m_Command;
- DString SourceID = arg->m_SourceID;
- DString Message = arg->MsgAs <DString>();
- DWORD MessageID = arg->m_MessageID;
- if (Command == BUSCMD_BUSServerConnected)
- {
- SetEvent(m_ConnectEvt);
- }
- else if (Command == CCOS_HW_CHANNEL)
- {
- string strMessage = Message.constBuffer();
- SYSTEMTIME st = { 0 };
- #ifdef _DEBUG
- //GetLocalTime(&st);
- //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
- //printf("time-%02d:%02d:%02d:%03d,packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
- #endif
- //CBase64::Decode(Message.constBuffer(), strMessage);
- //-----got packet---------------
- ResDataObject packet;
- try {
- PRINTA_DEBUG(m_pBusLog, "Got Msg Packet.TID:%d",GetCurrentThreadId());
- PRINTA_TRACE(m_pBusLog, "Packet Context:{\n%s}",strMessage.c_str());
- if (packet.decode(strMessage.c_str()))
- {
- PRINTA_DEBUG(m_pBusLog, "Decode Packet Succeed");
- if (m_Parent)
- {
- ((eBus*)m_Parent)->PacketArrived(packet);
- }
- }
- else
- {
- PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed");
- }
- }
- catch (ResDataObjectExption &exp)
- {
- //exp.what()
- Logger *p = GetGlobalLogger();
- PRINTA_ERROR(p, "OnBUSMessage:\n");
- PRINTA_ERROR(p, exp.what());
- PRINTA_ERROR(p, strMessage.c_str());
- }
- catch (...)
- {
- Logger *p = GetGlobalLogger();
- PRINTA_ERROR(p, "OnBUSMessage:Unknown Exp Happened\n");
- PRINTA_ERROR(p, strMessage.c_str());
- }
- }
- //arg->m_AckMessage = "";
- //Thread_UnLock();这个可能带来风险,
- }
- void eBusService::UnRegistThread(DWORD Tid)
- {
- if (m_pCTMap)
- {
- (*m_pCTMap).UnRegistThread(Tid);
- }
- }
|