#include "stdafx.h" #define _WINSOCK_DEPRECATED_NO_WARNINGS #include #include #include #include "eBusService.h" #include "eBus.h" #include "base64.h" #include "Logger.h" #include "PacketAnalizer.h" #include "shareMemWR.h" #include "P2PModule.h" #include "ShareMemory_IF.h" #include "ShareMemory_Block.h" //#include "common_api.h" using namespace std; string GetFileTitle(string &fullpath) { string ret = ""; string::size_type firstHit = fullpath.find_last_of('\\'); string::size_type firstHitSec = fullpath.find_last_of('/'); if ((firstHit == string::npos || firstHit == 0 || (firstHit + 1 == fullpath.size())) && (firstHitSec == string::npos || firstHitSec == 0 || (firstHitSec + 1 == fullpath.size()))) { return ret; } if (firstHit == string::npos || firstHit == 0 || (firstHit + 1 == fullpath.size())) { ret = fullpath.substr(firstHitSec + 1); } else if (firstHitSec == string::npos || firstHitSec == 0 || (firstHitSec + 1 == fullpath.size())) { ret = fullpath.substr(firstHit + 1); } else { //effective both if (firstHit > firstHitSec) { ret = fullpath.substr(firstHit + 1);//kick last } else { ret = fullpath.substr(firstHitSec + 1);//kick last } } //got filetitle with ext if (ret.size() > 0) { string::size_type firstHit = ret.find_first_of('.'); ret = ret.substr(0, firstHit); } return ret; } /* string GetProcessDirectory() { string ret = ""; char szFilename[MAX_PATH] = { 0 }; DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH); if (res == 0) { return ret; } string fullpath = szFilename; string::size_type firstHit = fullpath.find_last_of('\\'); if (firstHit == string::npos || firstHit == 0) { return ret; } ret = fullpath.substr(0, firstHit);//kick last \ return ret; }*/ string GeteBusLogPath() { string ret = ""; char szFilename[MAX_PATH] = { 0 }; DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH); if (res == 0) { return ret; } string fullpath = szFilename; string FileTitle = GetFileTitle(fullpath); string logfile = GetProcessDirectory() + "\\logs\\" + FileTitle; SYSTEMTIME st = { 0 }; GetLocalTime(&st); //日,时,分,秒,毫 sprintf_s(szFilename, "_%u_%02d%02d%02d%02d%03d.log",GetCurrentProcessId(),st.wDay,st.wHour, st.wMinute,st.wSecond,st.wMilliseconds); logfile += szFilename; return logfile; } void IniteBusService(bool EnableLog) { string logfile = GeteBusLogPath(); BUSC::Client::OpenLogger(logfile.c_str()); BUSC::Client::Prepare(); } void QuiteBusService() { BUSC::Client::Quit(); } vector GetZeroConfigIP(DWORD trytime, DWORD timeperiod) { vector theList; //string local = ""; //for (DWORD i = 0; i < trytime; i++) //{ // Array arNode; // ECOM::ZeroConfig::ZeroConfigClient::QueryAll(ECOM::ZeroConfig::ServiceTypeID::CCOSHardware, &arNode); // for (Array ::Iterator Iter(arNode); Iter; Iter++) // { // ECOM::ZeroConfig::ServiceNode & Node = Iter(); // //PRINTA_INFO(m_Logger, "Got ZeroConfig IPAddress : %s\n", Node.IPAddress); // local = Node.IPAddress; // theList.push_back(local); // } // if (theList.size() > 0) // { // return theList; // } // Sleep(timeperiod); //} return theList; } bool FindRouterIpAddress(vector &zeroConfigIp, string &serverIp) { if (zeroConfigIp.size() == 0) { //err log return false; } //zc ip is specific if ((serverIp).size() > 0) { for (DWORD i = 0; i < zeroConfigIp.size(); i++) { if (zeroConfigIp[i] == (serverIp)) { //good to go return true; } } //no match return false; } //no optional ip or no match zc server //connect anyway if only one zc server exists if (zeroConfigIp.size() == 1) { //put some warning log here serverIp = zeroConfigIp[0]; return true; } //multiple router... if (zeroConfigIp.size() > 1) { //put some warning log here //warning here //serverIp = zeroConfigIp[0]; return false; } //err log return false; } string ReplaceSubString(string org, string &keystr, string &replacestr) { std::string::size_type pos = 0; std::string::size_type keylen = keystr.size(); std::string::size_type replen = replacestr.size(); while ((pos = org.find(keystr, pos)) != std::string::npos) { org.replace(pos, keylen, replacestr); pos += replen; } return org; } bool IsLocalip(string &ServerIp) { //for test string Temp; struct hostent* pHost; static string ExIp = "127.0.0.1"; if (ServerIp == ExIp) { return true; } char cHostName[128] = { 0 }; gethostname(cHostName, sizeof(cHostName)); if (cHostName[0] != 0) { pHost = gethostbyname(cHostName); if (pHost) { for (int i = 0; pHost->h_addr_list[i] != NULL; i++) { Temp = ""; Temp = inet_ntoa(*(struct in_addr *)pHost->h_addr_list[i]); Temp = ReplaceSubString(Temp, string(" "), string(""));//trim space if (Temp == ServerIp) { return true; } } } } return false; } eBusService::eBusService(void) { m_TickCount = 0; m_OfflineHit = 0; m_ConnectionStatus = false; m_Parent = NULL; m_pBlobReceiver = NULL; m_pBusLog = NULL; m_pCTMap = NULL; m_ConnectEvt = CreateEvent(0, 0, 0, 0); } eBusService::~eBusService(void) { if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } if (m_pBusLog) { ReleseLogger(m_pBusLog); } if (m_pCTMap) { delete m_pCTMap; m_pCTMap = NULL; } CloseHandle(m_ConnectEvt); } void eBusService::SetParent(HANDLE Parent) { m_Parent = Parent; } bool eBusService::IsConnected() { if (m_ConnectionStatus) { if ((GetTickCount() - m_TickCount) > (120 * 1000)) { printf("Hearbeat Timeout.CurTick:%d,LastTick:%d\n", GetTickCount(), m_TickCount); DisConnect(); } } return m_ConnectionStatus; } bool eBusService::Connect(bool Local, DString &busId, DString &Ipaddress, DString &Port) { Thread_Lock(); try { if (m_pCTMap == NULL) { m_pCTMap = new ClientsThreadMap(); } do { m_Local = Local; m_busId = busId; m_Ipaddress = Ipaddress; m_Port = Port; if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } ResetEvent(m_ConnectEvt); // Port = DString::From(BUSC::Client::GetDefStatusPort()); m_pBlobReceiver = new BUSC::BLOBReceiver(); if (m_pBlobReceiver == NULL) { printf("EBUS Client malloc Failed.!!!!!\n"); Thread_UnLock(); return false; } if (Local) { //Local Connection //Local:1 //Port:eBusDefault or selfdefine //srcBusId:BusId //BUSC::MessageClient::SetLogFileName("AlphaLog.log"); //BUSC::MessageClient::Prepare(); //m_pBlobReceiver->Prepare(); if (m_Ipaddress.IsEmpty()) { m_Ipaddress = "127.0.0.1"; } m_pBlobReceiver->SetSourceID(busId); m_pBlobReceiver->SetTcpNoDelay(true); //gaofei m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __) { SetEvent(m_ConnectEvt); }); m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage); m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage); m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage); m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage); m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat); printf("========**** eBusService::Connect Thread 0X%08X \n", GetCurrentThreadId()); Sleep(10000); m_pBlobReceiver->StartWait(); if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT) { printf("========**** eBusService::Connect Thread Timeout Over 0X%08X \n", GetCurrentThreadId()); if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } printf("EBUS Connect Timeout.!!!!!\n"); Thread_UnLock(); return false; } printf("========**** eBusService::Connect Thread Over 0X%08X \n", GetCurrentThreadId()); m_OfflineHit = 0; m_TickCount = GetTickCount(); m_ConnectionStatus = true; printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port); Thread_UnLock(); return true; } else if (Local == 0) { //Local:0 //Ethernet Connection //RouterIp:ipaddress //vector theIpList = GetZeroConfigIP(10, 1000); string Ip = Ipaddress; //if RouterIp is empty,eBus will find one . //if eBus found more than one,it will fail and return false. //eth connect only outside machine ip if (Ip.size() != 0 && IsLocalip(Ip) == false) { //if (Ip == "127.0.0.1" || FindRouterIpAddress(theIpList, Ip)) { //we got routerIp,check org&new one //Ip = "192.168.2.77"; m_Ipaddress = Ip.c_str(); //if (IsLocalip(Ip)) //{ // m_Ipaddress = ""; // m_TickCount = 0; // m_ConnectionStatus = false; // return false; //} //else { //Port:eBusDefault or selfdefine //srcBusId:BusId //BUSC::MessageClient::SetLogFileName("AlphaLog.log"); //BUSC::MessageClient::Prepare(); //m_pBlobReceiver->Prepare(); //m_pBlobReceiver->SetLogFileName("AlphaLog_Eth.log"); ResetEvent(m_ConnectEvt); m_pBlobReceiver->ConnectTo(m_Ipaddress); m_pBlobReceiver->SetSourceID(busId); //gaofei m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __) { SetEvent(m_ConnectEvt); }); m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage); m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage); m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage); m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage); m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat); m_pBlobReceiver->SetTcpNoDelay(true); m_pBlobReceiver->StartWait(); if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT) { if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } printf("EBUS Connect Timeout.IP:%s!!!!!\n",(const char*)m_Ipaddress); Thread_UnLock(); return false; } m_TickCount = GetTickCount(); m_OfflineHit = 0; m_ConnectionStatus = true; printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port); Thread_UnLock(); return true; } } } } else { //do nothing } } while (0); } catch (ResDataObjectExption &exp) { //exp.what() Logger *p = GetGlobalLogger(); PRINTA_DEBUG(p, exp.what()); } catch (...) { Logger *p = GetGlobalLogger(); PRINTA_DEBUG(p, "Unknown Exp Happened\n"); } if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } m_TickCount =0; m_ConnectionStatus = false; Thread_UnLock(); return false; } void eBusService::DisConnect() { //need to confirm multiple connection //BUSC::BLOBClient::Quit(); Thread_Lock(); if (m_pBlobReceiver) { m_pBlobReceiver->StopWait(); delete m_pBlobReceiver; m_pBlobReceiver = NULL; } m_TickCount = 0; m_ConnectionStatus = false; if (m_pCTMap) { m_pCTMap->Disconnected(); } printf("Disconnct Ebus.Local:%d,busId:%s,ServerIp:%s,Port:%s\n",m_Local,(const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port); Thread_UnLock(); } void eBusService::SetLogPath(const char *pPath) { //DString dstrPath = pPath; //BUSC::MessageClient::Prepare(); //BUSC::MessageClient::SetLogFileName(dstrPath); //m_pBlobReceiver->Prepare(); //m_pBlobReceiver->SetLogFileName(dstrPath); if (m_pBusLog == NULL) { m_pBusLog = CreateLogger(); } if (m_pBusLog) { if (m_pBusLog->IsLogFilePathExist() == false) { m_pBusLog->SetLogFilepath(pPath); } } } void DebugPrintPacket(ResDataObject &packet) { string Context = packet.encode(); printf("ResDataObject:%s\n", Context.c_str()); } bool eBusService::SendSMPacket(const char *pTargetID, const char *pContext, unsigned long long nShareMemID, DWORD ChannelId) { bool ret = true; if (m_ConnectionStatus == false) { return false; } LPVOID pData = NULL; DWORD DataLen = 0; CShareMemory_Block smBlock; if (smBlock.ShareMemoryBlockSet((DWORD)nShareMemID, SM_READWRITE)) { if (smBlock.GetBlockDataAddr(pData, DataLen)) { ret = ((eBus*)m_Parent)->SendPacket(pTargetID, pContext, (const char *)pData, DataLen); //ret = SendPacket(pTargetID, pContext, pData, DataLen, ChannelId); if (ret) { m_OfflineHit = 0; } else { if (m_OfflineHit) { //Disconnected TPRINTA_ERROR("SendSMPacket Failed.offline callback Hited"); DisConnect(); } } } else { ret = false; TPRINTA_ERROR("CShareMemory_Block::GetBlockDataAddr Failed."); } } else { ret = false; TPRINTA_ERROR("CShareMemory_Block::ShareMemoryBlockSet Failed."); } return ret; } bool eBusService::SendPacket(const char *pTargetID, const char *pContext, DWORD ChannelId) { bool ret = false; TPRINTA_DEBUG("Send Msg Entry"); if (m_ConnectionStatus == false) { return ret; } //for test //if (strcmp(pTargetID, "ccosAlpha") != 0) //{ // ret = ret; //} DString dstrDestID = pTargetID; DString strMsg = pContext; //BUSC::MessageClient LocalClient; //LocalClient.ConnectTo(m_Ipaddress); //LocalClient.SetTcpNoDelay(true); //LocalClient.SetSourceID(m_busId); //PRINTA_DEBUG(m_Logger, " %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024))); BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()]; BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient(); //if (m_Local == false) { if (Wrapper->IsConnected() == false) { pClient->ConnectTo(m_Ipaddress); pClient->SetTcpNoDelay(true); pClient->SetSourceID(m_busId); Wrapper->SetConnection(true);//suppose connected } } //if (Wrapper->IsConnected() == false) //{ // pClient->SetSourceID(m_busId); //} /* //旧版 begin BUSC::BLOBClient client; if (m_Local == false) { client.ConnectTo(m_Ipaddress, m_Port); client.SetTcpNoDelay(true); } client.SetSourceID(m_busId); //旧版 end */ //string strBase64; //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64); //return client.Send(dstrDestID, ChannelId, strBase64.data()); SYSTEMTIME st = { 0 }; #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext); #endif //ret = client.Send(dstrDestID, ChannelId, strMsg.data()); ret = pClient->Post(dstrDestID, ChannelId, strMsg); //ret = LocalClient.Post(dstrDestID, ChannelId, strMsg); TPRINTA_DEBUG("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret); if (ret) { m_OfflineHit = 0; } else { TPRINTA_ERROR("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret); //try again Wrapper->SetConnection(false); pClient = Wrapper->GetBusClient(); if (Wrapper->IsConnected() == false) { pClient->ConnectTo(m_Ipaddress); pClient->SetTcpNoDelay(true); pClient->SetSourceID(m_busId); Wrapper->SetConnection(true);//suppose connected } ret = pClient->Post(dstrDestID, ChannelId, strMsg); TPRINTA_DEBUG("Post Try Again: From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret); if (ret) { m_OfflineHit = 0; } else { if (m_OfflineHit) { //Disconnected DisConnect(); } } } #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,after packet send result:%d\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret); #endif return ret; } bool eBusService::SendPacket(const char *pTargetID, const char *pContext, const char *pBlock, DWORD BlockSize, DWORD ChannelId) { bool ret = false; TPRINTA_DEBUG("Send BLOCK Entry"); if (m_ConnectionStatus == false) { return ret; } DString dstrDestID = pTargetID; DString strMsg = pContext; //PRINTA_DEBUG(m_Logger, " %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024))); BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()]; BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient(); //if (m_Local == false) { if (Wrapper->IsConnected() == false) { pClient->ConnectTo(m_Ipaddress); pClient->SetTcpNoDelay(true); pClient->SetSourceID(m_busId); Wrapper->SetConnection(true);//suppose connected } } //if (Wrapper->IsConnected() == false) //{ // pClient->SetSourceID(m_busId); //} //Wrapper->SetConnection(true);//suppose connected //void * pData = vr->GetData(); //int len = vr->GetLength(); //BUSC::BLOBStreamClient Client; /* //旧版begin BUSC::BLOBClient client; if (m_Local == false) { client.ConnectTo(m_Ipaddress, m_Port); client.SetTcpNoDelay(true); } client.SetSourceID(m_busId); //旧版end */ //string strBase64; //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64); //BlockBuffer bb; //DString msgAck; //BlockBuffer bbAck; //bb.Attach((UINT8*)pBlock, BlockSize);//add data?? //return client.SendBLOB(dstrDestID, ChannelId, strBase64.data(), bb, msgAck);//sync mode yes SYSTEMTIME st = { 0 }; #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,block packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext); #endif //ret = client.SendBLOB(dstrDestID, ChannelId, strMsg.data(), bb, msgAck);//sync mode yes //ret = client.PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb); //ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb); auto fun = [&](BUSC::BUSOutStream & stream) { stream.Write(pBlock, BlockSize); }; ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun); TPRINTA_DEBUG("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret); if (ret) { m_OfflineHit = 0; } else { TPRINTA_ERROR("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret); //try again Wrapper->SetConnection(false); pClient = Wrapper->GetBusClient(); if (Wrapper->IsConnected() == false) { pClient->ConnectTo(m_Ipaddress); pClient->SetTcpNoDelay(true); pClient->SetSourceID(m_busId); Wrapper->SetConnection(true);//suppose connected } ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun); TPRINTA_DEBUG("Post Tray Again:From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret); if (ret) { m_OfflineHit = 0; } else { if (m_OfflineHit) { //Disconnected DisConnect(); } } } #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,after block packet send result:%d-------------\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret); #endif //bb.Detach(); return ret; } void eBusService::OnHeartBeat(const void *sender, EventArgs_Null * arg) { //printf("OnlineMessage\n"); if (sender) { //printf("OnlineMessage:%s\n", p); } m_OfflineHit = 0; m_TickCount = GetTickCount(); } void eBusService::OnlineMessage(const void *sender, EventArgs_Null * arg) { if (arg != NULL) { printf("OnlineMessage\n"); } //DisConnect(); //SetEvent(m_ConnectEvt); //just tag it m_OfflineHit = 0; } void eBusService::OnOfflineMessage(const void *sender, EventArgs_Null * arg) { if (arg != NULL) { printf("OnOfflineMessage\n"); } //DisConnect(); //just tag it m_OfflineHit = 1; } void eBusService::Quit() { //BUSC::Client::Quit(); if (m_pCTMap) { m_pCTMap->Clear(); } } RET_STATUS eBusService::BufferToNotify(void * pOemImage, DWORD ImageSize, ImgDataInfo* PImgDataInfo) { RET_STATUS Ret = RET_SUCCEED; //for memory leak test //PImgDataInfo->nShareMemID = 1; //return Ret; const char *pSmName = Get_Circle_SM_Object(ImageSize); CShareMemory_Circle SM; if (SM.Open(pSmName, SM_READWRITE) >= 0) { ShareMemoryBlockID smid = SM.Push(pOemImage, ImageSize, true); if (smid != 0xffffffff) { PImgDataInfo->nShareMemID = smid; } else { //MessageBox(NULL, "Push failed", "Sys Lib", MB_OK); printf("\n Push failed \n"); Ret = RET_FAILED; } SM.Close(); } else { MessageBox(NULL, "Open failed", "Sys Lib", MB_OK); Ret = RET_FAILED; } return Ret; } void eBusService::BlobDataArrived(const char *pMsg, unsigned char *pBlockData, DWORD BlockDataLen) { SYSTEMTIME st = { 0 }; #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); //printf("time-%02d:%02d:%02d:%03d,Block packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str()); #endif //-----got packet--------------- ResDataObject packet; try { PRINTA_DEBUG(m_pBusLog, "BlobMsg:Got Packet.BlockLen is %d", BlockDataLen); PRINTA_TRACE(m_pBusLog, "Blob Packet Context:{\n%s}", pMsg); if (packet.decode(pMsg)) { PRINTA_DEBUG(m_pBusLog, "BlobMsg:Decode Packet Succeed"); if (m_Parent) { //check block PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet); if (cmd == PACKET_CMD_DATA) { ImgDataInfo ImgData; ResDataObject resImg; PacketAnalizer::GetPacketContext(&packet, resImg); ImgData.SetResDataObject(resImg); //Need Block Data to Share Memory Thread_Lock(); if (BufferToNotify(pBlockData, BlockDataLen, &ImgData) == RET_SUCCEED) { Thread_UnLock(); //BufferToNotify ok //And Update the Id of memory PRINTA_DEBUG(m_pBusLog, "BlobMsg:BufferToNotify Succeed"); resImg.clear(); ImgData.GetResDataObject(resImg); if (PacketAnalizer::UpdatePacketContext(packet, resImg)) { PRINTA_DEBUG(m_pBusLog, "BlobMsg:Dispatch packet to BusThread"); ((eBus*)m_Parent)->PacketArrived(packet); } } else { Thread_UnLock(); } } else { ((eBus*)m_Parent)->PacketArrived(packet); } } } else { PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed"); } } catch (ResDataObjectExption &exp) { //exp.what() Logger *p = GetGlobalLogger(); PRINTA_ERROR(p, exp.what()); } catch (...) { Logger *p = GetGlobalLogger(); PRINTA_ERROR(p, "Unknown Exp Happened\n"); } } void eBusService::OnBlobMessage(const void *sender, EventArgs_BUSBLOB * arg) { EventArgs_BUSBLOB * e = (EventArgs_BUSBLOB*)(arg); if (!arg) return; //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况 m_OfflineHit = 0; m_TickCount = GetTickCount(); DWORD Command = arg->m_Command; DString SourceID = arg->m_SourceID; DString Message = arg->MsgAs (); DWORD MessageID = arg->m_MessageID;//??? if (Command == CCOS_HW_CHANNEL + 1) { UINT8 *pBlockData = (UINT8*)arg->m_BLOB;//block data buff DWORD BlockDataLen = arg->m_BLOB.GetCount();//block data len?? string strMessage = Message.constBuffer(); BlobDataArrived(strMessage.c_str(), pBlockData, BlockDataLen); } else { //put some log here } //return acks------------------- //message ack //e->m_AckMessage = ""; //blob ack,for what?? //BlockBuffer bb; //e->m_AckBLOB = bb; //Thread_UnLock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况 } void eBusService::OnBUSMessage(const void *sender, EventArgs_BUSMessage * arg) { if (!arg) return; //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况 m_OfflineHit = 0; m_TickCount = GetTickCount(); DWORD Command = arg->m_Command; DString SourceID = arg->m_SourceID; DString Message = arg->MsgAs (); DWORD MessageID = arg->m_MessageID; if (Command == BUSCMD_BUSServerConnected) { SetEvent(m_ConnectEvt); } else if (Command == CCOS_HW_CHANNEL) { string strMessage = Message.constBuffer(); SYSTEMTIME st = { 0 }; #ifdef _DEBUG //GetLocalTime(&st); //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); //printf("time-%02d:%02d:%02d:%03d,packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str()); #endif //CBase64::Decode(Message.constBuffer(), strMessage); //-----got packet--------------- ResDataObject packet; try { PRINTA_DEBUG(m_pBusLog, "Got Msg Packet.TID:%d",GetCurrentThreadId()); PRINTA_TRACE(m_pBusLog, "Packet Context:{\n%s}",strMessage.c_str()); if (packet.decode(strMessage.c_str())) { PRINTA_DEBUG(m_pBusLog, "Decode Packet Succeed"); if (m_Parent) { ((eBus*)m_Parent)->PacketArrived(packet); } } else { PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed"); } } catch (ResDataObjectExption &exp) { //exp.what() Logger *p = GetGlobalLogger(); PRINTA_ERROR(p, "OnBUSMessage:\n"); PRINTA_ERROR(p, exp.what()); PRINTA_ERROR(p, strMessage.c_str()); } catch (...) { Logger *p = GetGlobalLogger(); PRINTA_ERROR(p, "OnBUSMessage:Unknown Exp Happened\n"); PRINTA_ERROR(p, strMessage.c_str()); } } //arg->m_AckMessage = ""; //Thread_UnLock();这个可能带来风险, } void eBusService::UnRegistThread(DWORD Tid) { if (m_pCTMap) { (*m_pCTMap).UnRegistThread(Tid); } }