// NOTE(review): apart from the #define on the next line, everything in this file sits
// inside one block comment, so none of the eBusService code below is compiled.
// NOTE(review): the first two #include directives and a few template argument lists
// (`vector &zeroConfigIp`, `//vector theIpList`, `arg->MsgAs ()`) carry no target or
// arguments in this copy - presumably lost at some point; restore them before re-enabling.
#define _WINSOCK_DEPRECATED_NO_WARNINGS /*
#include
#include
#include "eBusService.h"
#include "eBus.h"
#include "base64.h"
#include "Logger.h"
#include "PacketAnalizer.h"
#include "shareMemWR.h"
#include "P2PModule.h"
#include "ShareMemory_IF.h"
#include "ShareMemory_Block.h"
#include "common_api.h"

using namespace std;

// Builds a log file path: GetProcessDirectory() + "\logs\" + GetFileTitle(module path),
// suffixed with the process id and the local day/hour/minute/second/millisecond.
// Returns an empty string when GetModuleFileNameA fails.
string GeteBusLogPath()
{
	string ret = "";
	char szFilename[MAX_PATH] = { 0 };
	DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
	if (res == 0)
	{
		return ret;
	}
	string fullpath = szFilename;
	string FileTitle = GetFileTitle(fullpath);
	string logfile = GetProcessDirectory() + "\\logs\\" + FileTitle;
	SYSTEMTIME st = { 0 };
	GetLocalTime(&st);
	//day, hour, minute, second, millisecond
	sprintf_s(szFilename, "_%u_%02d%02d%02d%02d%03d.log",GetCurrentProcessId(),st.wDay,st.wHour, st.wMinute,st.wSecond,st.wMilliseconds);
	logfile += szFilename;
	return logfile;
}

// Opens the BUSC client logger at GeteBusLogPath() and calls BUSC::Client::Prepare().
// EnableLog is not referenced by the body.
void IniteBusService(bool EnableLog)
{
	string logfile = GeteBusLogPath();
	BUSC::Client::OpenLogger(logfile.c_str());
	BUSC::Client::Prepare();
}

void QuiteBusService()
{
	BUSC::Client::Quit();
}

// Returns true when serverIp is non-empty and present in zeroConfigIp, or when serverIp
// is empty and exactly one candidate exists (serverIp is then set to that candidate).
// Returns false for an empty list, for no match, and for several candidates.
bool FindRouterIpAddress(vector &zeroConfigIp, string &serverIp)
{
	if (zeroConfigIp.size() == 0)
	{
		//err log
		return false;
	}
	//zc ip is specific
	if ((serverIp).size() > 0)
	{
		for (DWORD i = 0; i < zeroConfigIp.size(); i++)
		{
			if (zeroConfigIp[i] == (serverIp))
			{
				//good to go
				return true;
			}
		}
		//no match
		return false;
	}
	//no optional ip or no match zc server
	//connect anyway if only one zc server exists
	if (zeroConfigIp.size() == 1)
	{
		//put some warning log here
		serverIp = zeroConfigIp[0];
		return true;
	}
	//multiple router...
	if (zeroConfigIp.size() > 1)
	{
		//put some warning log here
		//warning here
		//serverIp = zeroConfigIp[0];
		return false;
	}
	//err log
	return false;
}

// Returns true when ServerIp is "127.0.0.1" or equals one of the addresses that
// gethostbyname reports for this machine's host name.
bool IsLocalip(string &ServerIp)
{
	//for test
	string Temp;
	struct hostent* pHost;
	static string ExIp = "127.0.0.1";
	if (ServerIp == ExIp)
	{
		return true;
	}
	char cHostName[128] = { 0 };
	gethostname(cHostName, sizeof(cHostName));
	if (cHostName[0] != 0)
	{
		pHost = gethostbyname(cHostName);
		if (pHost)
		{
			for (int i = 0; pHost->h_addr_list[i] != NULL; i++)
			{
				Temp = "";
				Temp = inet_ntoa(*(struct in_addr *)pHost->h_addr_list[i]);
				Temp = ReplaceSubString(Temp, string(" "), string(""));//trim space
				if (Temp == ServerIp)
				{
					return true;
				}
			}
		}
	}
	return false;
}

eBusService::eBusService(void)
{
	m_TickCount = 0;
	m_OfflineHit = 0;
	m_ConnectionStatus = false;
	m_Parent = NULL;
	m_pBlobReceiver = NULL;
	m_pBusLog = NULL;
	m_pCTMap = NULL;
	m_ConnectEvt = CreateEvent(0, 0, 0, 0);
}

eBusService::~eBusService(void)
{
	if (m_pBlobReceiver)
	{
		m_pBlobReceiver->StopWait();
		delete m_pBlobReceiver;
		m_pBlobReceiver = NULL;
	}
	if (m_pBusLog)
	{
		ReleseLogger(m_pBusLog);
	}
	if (m_pCTMap)
	{
		delete m_pCTMap;
		m_pCTMap = NULL;
	}
	CloseHandle(m_ConnectEvt);
}

void eBusService::SetParent(HANDLE Parent)
{
	m_Parent = Parent;
}

// Returns the connection flag. While connected, calls DisConnect() first if m_TickCount
// (refreshed by Connect and by the heartbeat/message callbacks) is older than 120 seconds.
bool eBusService::IsConnected()
{
	if (m_ConnectionStatus)
	{
		if ((GetTickCount() - m_TickCount) > (120 * 1000))
		{
			printf("Hearbeat Timeout.CurTick:%d,LastTick:%d\n", GetTickCount(), m_TickCount);
			DisConnect();
		}
	}
	return m_ConnectionStatus;
}

// Replaces any existing receiver with a new BUSC::BLOBReceiver, registers the callbacks,
// starts it and waits up to 10 s for m_ConnectEvt.
// Local != 0 : uses m_Ipaddress, defaulting to "127.0.0.1" when empty.
// Local == 0 : connects only when Ipaddress is non-empty and IsLocalip() is false for it.
// Every return path calls Thread_UnLock(); all failure paths return false.
bool eBusService::Connect(bool Local, DString &busId, DString &Ipaddress, DString &Port)
{
	Thread_Lock();
	try
	{
		if (m_pCTMap == NULL)
		{
			m_pCTMap = new ClientsThreadMap();
		}
		do
		{
			m_Local = Local;
			m_busId = busId;
			m_Ipaddress = Ipaddress;
			m_Port = Port;
			if (m_pBlobReceiver)
			{
				m_pBlobReceiver->StopWait();
				delete m_pBlobReceiver;
				m_pBlobReceiver = NULL;
			}
			ResetEvent(m_ConnectEvt);
			// Port = DString::From(BUSC::Client::GetDefStatusPort());
			m_pBlobReceiver = new BUSC::BLOBReceiver();
			if (m_pBlobReceiver == NULL)
			{
				printf("EBUS Client malloc Failed.!!!!!\n");
				Thread_UnLock();
				return false;
			}
			if (Local)
			{
				if (m_Ipaddress.IsEmpty())
				{
					m_Ipaddress = "127.0.0.1";
				}
				m_pBlobReceiver->SetSourceID(busId);
				m_pBlobReceiver->SetTcpNoDelay(true);
				//gaofei
				m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __) { SetEvent(m_ConnectEvt); });
				m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
				m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
				m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
				m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
				m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
				printf("========**** eBusService::Connect Thread 0X%08X \n", GetCurrentThreadId());
				//Sleep(10000);
				m_pBlobReceiver->StartWait();
				if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
				{
					printf("========**** eBusService::Connect Thread Timeout Over 0X%08X \n", GetCurrentThreadId());
					if (m_pBlobReceiver)
					{
						m_pBlobReceiver->StopWait();
						delete m_pBlobReceiver;
						m_pBlobReceiver = NULL;
					}
					printf("EBUS Connect Timeout.!!!!!\n");
					Thread_UnLock();
					return false;
				}
				printf("========**** eBusService::Connect Thread Over 0X%08X \n", GetCurrentThreadId());
				m_OfflineHit = 0;
				m_TickCount = GetTickCount();
				m_ConnectionStatus = true;
				printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
				Thread_UnLock();
				return true;
			}
			else if (Local == 0)
			{
				//Local:0
				//Ethernet Connection
				//RouterIp:ipaddress
				//vector theIpList = GetZeroConfigIP(10, 1000);
				string Ip = Ipaddress;
				//if RouterIp is empty,eBus will find one .
				//if eBus found more than one,it will fail and return false.
				//eth connect only outside machine ip
				if (Ip.size() != 0 && IsLocalip(Ip) == false)
				{
					//if (Ip == "127.0.0.1" || FindRouterIpAddress(theIpList, Ip))
					{
						//we got routerIp,check org&new one
						//Ip = "192.168.2.77";
						m_Ipaddress = Ip.c_str();
						{
							ResetEvent(m_ConnectEvt);
							m_pBlobReceiver->ConnectTo(m_Ipaddress);
							m_pBlobReceiver->SetSourceID(busId);
							//gaofei
							m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __) { SetEvent(m_ConnectEvt); });
							m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
							m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
							m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
							m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
							m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
							m_pBlobReceiver->SetTcpNoDelay(true);
							m_pBlobReceiver->StartWait();
							if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
							{
								if (m_pBlobReceiver)
								{
									m_pBlobReceiver->StopWait();
									delete m_pBlobReceiver;
									m_pBlobReceiver = NULL;
								}
								printf("EBUS Connect Timeout.IP:%s!!!!!\n",(const char*)m_Ipaddress);
								Thread_UnLock();
								return false;
							}
							m_TickCount = GetTickCount();
							m_OfflineHit = 0;
							m_ConnectionStatus = true;
							printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
							Thread_UnLock();
							return true;
						}
					}
				}
			}
			else
			{
				//do nothing
			}
		} while (0);
	}
	catch (ResDataObjectExption &exp)
	{
		//exp.what()
		Logger *p = GetGlobalLogger();
		//mLog::FDEBUG( exp.what());
	}
	catch (...)
	{
		Logger *p = GetGlobalLogger();
		//mLog::FDEBUG( "Unknown Exp Happened\n");
	}
	if (m_pBlobReceiver)
	{
		m_pBlobReceiver->StopWait();
		delete m_pBlobReceiver;
		m_pBlobReceiver = NULL;
	}
	m_TickCount =0;
	m_ConnectionStatus = false;
	Thread_UnLock();
	return false;
}

// Stops and deletes the receiver, clears the tick count and connection flag, and
// notifies the per-thread client map, all between Thread_Lock()/Thread_UnLock().
void eBusService::DisConnect()
{
	Thread_Lock();
	if (m_pBlobReceiver)
	{
		m_pBlobReceiver->StopWait();
		delete m_pBlobReceiver;
		m_pBlobReceiver = NULL;
	}
	m_TickCount = 0;
	m_ConnectionStatus = false;
	if (m_pCTMap)
	{
		m_pCTMap->Disconnected();
	}
	printf("Disconnct Ebus.Local:%d,busId:%s,ServerIp:%s,Port:%s\n",m_Local,(const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
	Thread_UnLock();
}

// Creates the logger on first use and sets its file path only if
// IsLogFilePathExist() reports false.
void eBusService::SetLogPath(const char *pPath)
{
	if (m_pBusLog == NULL)
	{
		m_pBusLog = CreateLogger();
	}
	if (m_pBusLog)
	{
		if (m_pBusLog->IsLogFilePathExist() == false)
		{
			m_pBusLog->SetLogFilepath(pPath);
		}
	}
}

void DebugPrintPacket(ResDataObject &packet)
{
	string Context = packet.encode();
	printf("ResDataObject:%s\n", Context.c_str());
}

// Looks up the shared-memory block nShareMemID and forwards its data through the
// parent eBus's SendPacket. ChannelId is only referenced by a commented-out call.
// Returns false when not connected or when the block cannot be opened or read.
bool eBusService::SendSMPacket(const char *pTargetID, const char *pContext, unsigned long long nShareMemID, DWORD ChannelId)
{
	bool ret = true;
	if (m_ConnectionStatus == false)
	{
		return false;
	}
	LPVOID pData = NULL;
	DWORD DataLen = 0;
	CShareMemory_Block smBlock;
	if (smBlock.ShareMemoryBlockSet((DWORD)nShareMemID, SM_READWRITE))
	{
		if (smBlock.GetBlockDataAddr(pData, DataLen))
		{
			ret = ((eBus*)m_Parent)->SendPacket(pTargetID, pContext, (const char *)pData, DataLen);
			//ret = SendPacket(pTargetID, pContext, pData, DataLen, ChannelId);
			if (ret)
			{
				m_OfflineHit = 0;
			}
			else
			{
				if (m_OfflineHit)
				{
					//Disconnected
					//mLog::FERROR("SendSMPacket Failed.offline callback Hited");
					DisConnect();
				}
			}
		}
		else
		{
			ret = false;
			//mLog::FERROR("CShareMemory_Block::GetBlockDataAddr Failed.");
		}
	}
	else
	{
		ret = false;
		//mLog::FERROR("CShareMemory_Block::ShareMemoryBlockSet Failed.");
	}
	return ret;
}

// Posts a text message through the calling thread's client taken from m_pCTMap.
// On failure the wrapper is marked disconnected, reconnected and the post is retried
// once; if the retry also fails and m_OfflineHit is set, DisConnect() is called.
bool eBusService::SendPacket(const char *pTargetID, const char *pContext, DWORD ChannelId)
{
	bool ret = false;
	//mLog::FDEBUG("Send Msg Entry");
	if (m_ConnectionStatus == false)
	{
		return ret;
	}
	DString dstrDestID = pTargetID;
	DString strMsg = pContext;
	BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
	BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
	//if (m_Local == false)
	{
		if (Wrapper->IsConnected() == false)
		{
			pClient->ConnectTo(m_Ipaddress);
			pClient->SetTcpNoDelay(true);
			pClient->SetSourceID(m_busId);
			Wrapper->SetConnection(true);//suppose connected
		}
	}
	SYSTEMTIME st = { 0 };
#ifdef _DEBUG
	//GetLocalTime(&st);
	//printf("time-%02d:%02d:%02d:%03d,packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
#endif
	//ret = client.Send(dstrDestID, ChannelId, strMsg.data());
	ret = pClient->Post(dstrDestID, ChannelId, strMsg);
	//ret = LocalClient.Post(dstrDestID, ChannelId, strMsg);
	//mLog::FDEBUG("From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
	//mLog::FINFO("Msg: {$}", pContext);
	if (ret)
	{
		m_OfflineHit = 0;
	}
	else
	{
		//mLog::FERROR("From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
		//try again
		Wrapper->SetConnection(false);
		pClient = Wrapper->GetBusClient();
		if (Wrapper->IsConnected() == false)
		{
			pClient->ConnectTo(m_Ipaddress);
			pClient->SetTcpNoDelay(true);
			pClient->SetSourceID(m_busId);
			Wrapper->SetConnection(true);//suppose connected
		}
		ret = pClient->Post(dstrDestID, ChannelId, strMsg);
		//mLog::FDEBUG("Post Try Again: From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
		if (ret)
		{
			m_OfflineHit = 0;
		}
		else
		{
			if (m_OfflineHit)
			{
				//Disconnected
				DisConnect();
			}
		}
	}
#ifdef _DEBUG
	//GetLocalTime(&st);
	//printf("time-%02d:%02d:%02d:%03d,after packet send result:%d\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
#endif
	return ret;
}

// Same flow as the text overload, but sends pBlock/BlockSize as a BLOB via PostBLOB.
bool eBusService::SendPacket(const char *pTargetID, const char *pContext, const char *pBlock, DWORD BlockSize, DWORD ChannelId)
{
	bool ret = false;
	//mLog::FDEBUG("Send BLOCK Entry");
	if (m_ConnectionStatus == false)
	{
		return ret;
	}
	DString dstrDestID = pTargetID;
	DString strMsg = pContext;
	//PRINTA_DEBUG(m_Logger, " %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
	BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
	BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
	//if (m_Local == false)
	{
		if (Wrapper->IsConnected() == false)
		{
			pClient->ConnectTo(m_Ipaddress);
			pClient->SetTcpNoDelay(true);
			pClient->SetSourceID(m_busId);
			Wrapper->SetConnection(true);//suppose connected
		}
	}
	SYSTEMTIME st = { 0 };
#ifdef _DEBUG
	//GetLocalTime(&st);
	//printf("time-%02d:%02d:%02d:%03d,block packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
#endif
	auto fun = [&](BUSC::BUSOutStream & stream) { stream.Write(pBlock, BlockSize); };
	ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
	//mLog::FDEBUG("From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
	if (ret)
	{
		m_OfflineHit = 0;
	}
	else
	{
		//mLog::FERROR("From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
		//try again
		Wrapper->SetConnection(false);
		pClient = Wrapper->GetBusClient();
		if (Wrapper->IsConnected() == false)
		{
			pClient->ConnectTo(m_Ipaddress);
			pClient->SetTcpNoDelay(true);
			pClient->SetSourceID(m_busId);
			Wrapper->SetConnection(true);//suppose connected
		}
		ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
		//mLog::FDEBUG("Post Tray Again:From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
		if (ret)
		{
			m_OfflineHit = 0;
		}
		else
		{
			if (m_OfflineHit)
			{
				//Disconnected
				DisConnect();
			}
		}
	}
#ifdef _DEBUG
	//GetLocalTime(&st);
	//printf("time-%02d:%02d:%02d:%03d,after block packet send result:%d-------------\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
#endif
	//bb.Detach();
	return ret;
}

// Heartbeat callback: clears the offline flag and refreshes m_TickCount.
void eBusService::OnHeartBeat(const void *sender, EventArgs_Null * arg)
{
	//printf("OnlineMessage\n");
	if (sender)
	{
		//printf("OnlineMessage:%s\n", p);
	}
	m_OfflineHit = 0;
	m_TickCount = GetTickCount();
}

void eBusService::OnlineMessage(const void *sender, EventArgs_Null * arg)
{
	if (arg != NULL)
	{
		printf("OnlineMessage\n");
	}
	//DisConnect();
	//SetEvent(m_ConnectEvt);
	//just tag it
	m_OfflineHit = 0;
}

void eBusService::OnOfflineMessage(const void *sender, EventArgs_Null * arg)
{
	if (arg != NULL)
	{
		printf("OnOfflineMessage\n");
	}
	//DisConnect();
	//just tag it
	m_OfflineHit = 1;
}

void eBusService::Quit()
{
	//BUSC::Client::Quit();
	if (m_pCTMap)
	{
		m_pCTMap->Clear();
	}
}

// Pushes the image buffer into the circular shared memory chosen by
// Get_Circle_SM_Object(ImageSize) and stores the resulting block id in
// PImgDataInfo->nShareMemID. Returns RET_FAILED when Open or Push fails.
RET_STATUS eBusService::BufferToNotify(void * pOemImage, DWORD ImageSize, ImgDataInfo* PImgDataInfo)
{
	RET_STATUS Ret = RET_SUCCEED;
	//for memory leak test
	//PImgDataInfo->nShareMemID = 1;
	//return Ret;
	const char *pSmName = Get_Circle_SM_Object(ImageSize);
	CShareMemory_Circle SM;
	if (SM.Open(pSmName, SM_READWRITE) >= 0)
	{
		ShareMemoryBlockID smid = SM.Push(pOemImage, ImageSize, true);
		if (smid != 0xffffffff)
		{
			PImgDataInfo->nShareMemID = smid;
		}
		else
		{
			//MessageBox(NULL, "Push failed", "Sys Lib", MB_OK);
			printf("\n Push failed \n");
			Ret = RET_FAILED;
		}
		SM.Close();
	}
	else
	{
		MessageBox(NULL, "Open failed", "Sys Lib", MB_OK);
		Ret = RET_FAILED;
	}
	return Ret;
}

// Decodes pMsg into a packet. For PACKET_CMD_DATA packets the block data is first
// pushed to shared memory (under Thread_Lock) and the packet context updated with the
// new block id; the packet is then handed to the parent eBus via PacketArrived.
void eBusService::BlobDataArrived(const char *pMsg, unsigned char *pBlockData, DWORD BlockDataLen)
{
	SYSTEMTIME st = { 0 };
#ifdef _DEBUG
	//GetLocalTime(&st);
	//printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
	//printf("time-%02d:%02d:%02d:%03d,Block packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
#endif
	//-----got packet---------------
	ResDataObject packet;
	try
	{
		//mLog::FDEBUG( "BlobMsg:Got Packet.BlockLen is {$}", BlockDataLen);
		//mLog::FINFO( "Blob Packet Context:{\n{$}}", pMsg);
		if (packet.decode(pMsg))
		{
			//mLog::FDEBUG( "BlobMsg:Decode Packet Succeed");
			if (m_Parent)
			{
				//check block
				PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
				if (cmd == PACKET_CMD_DATA)
				{
					ImgDataInfo ImgData;
					ResDataObject resImg;
					PacketAnalizer::GetPacketContext(&packet, resImg);
					ImgData.SetResDataObject(resImg);
					//Need Block Data to Share Memory
					Thread_Lock();
					if (BufferToNotify(pBlockData, BlockDataLen, &ImgData) == RET_SUCCEED)
					{
						Thread_UnLock();
						//BufferToNotify ok
						//And Update the Id of memory
						//mLog::FDEBUG( "BlobMsg:BufferToNotify Succeed");
						resImg.clear();
						ImgData.GetResDataObject(resImg);
						if (PacketAnalizer::UpdatePacketContext(packet, resImg))
						{
							//mLog::FDEBUG( "BlobMsg:Dispatch packet to BusThread");
							((eBus*)m_Parent)->PacketArrived(packet);
						}
					}
					else
					{
						Thread_UnLock();
					}
				}
				else
				{
					((eBus*)m_Parent)->PacketArrived(packet);
				}
			}
		}
		else
		{
			//mLog::FDEBUG( "Decode Packet Failed");
		}
	}
	catch (ResDataObjectExption &exp)
	{
		//exp.what()
		Logger *p = GetGlobalLogger();
		//mLog::FERROR( exp.what());
	}
	catch (...)
	{
		Logger *p = GetGlobalLogger();
		//mLog::FERROR( "Unknown Exp Happened\n");
	}
}

// BLOB callback: refreshes the liveness state and forwards CCOS_HW_CHANNEL + 1
// messages, with their block data, to BlobDataArrived.
void eBusService::OnBlobMessage(const void *sender, EventArgs_BUSBLOB * arg)
{
	EventArgs_BUSBLOB * e = (EventArgs_BUSBLOB*)(arg);
	if (!arg) return;
	//Thread_Lock(); this may be risky: during Disconnect, the eBus-internal callback thread has been seen to crash
	m_OfflineHit = 0;
	m_TickCount = GetTickCount();
	DWORD Command = arg->m_Command;
	DString SourceID = arg->m_SourceID;
	DString Message = arg->MsgAs ();
	DWORD MessageID = arg->m_MessageID;//???
	if (Command == CCOS_HW_CHANNEL + 1)
	{
		UINT8 *pBlockData = (UINT8*)arg->m_BLOB;//block data buff
		DWORD BlockDataLen = arg->m_BLOB.GetCount();//block data len??
		string strMessage = Message.constBuffer();
		BlobDataArrived(strMessage.c_str(), pBlockData, BlockDataLen);
	}
	else
	{
		//put some log here
	}
	//return acks-------------------
	//message ack
	//e->m_AckMessage = "";
	//blob ack,for what??
	//BlockBuffer bb;
	//e->m_AckBLOB = bb;
	//Thread_UnLock(); this may be risky: during Disconnect, the eBus-internal callback thread has been seen to crash
}

// Message callback: refreshes the liveness state; signals m_ConnectEvt for
// BUSCMD_BUSServerConnected, and decodes CCOS_HW_CHANNEL messages into a packet that is
// handed to the parent eBus via PacketArrived.
void eBusService::OnBUSMessage(const void *sender, EventArgs_BUSMessage * arg)
{
	if (!arg) return;
	//Thread_Lock(); this may be risky: during Disconnect, the eBus-internal callback thread has been seen to crash
	m_OfflineHit = 0;
	m_TickCount = GetTickCount();
	DWORD Command = arg->m_Command;
	DString SourceID = arg->m_SourceID;
	DString Message = arg->MsgAs ();
	DWORD MessageID = arg->m_MessageID;
	if (Command == BUSCMD_BUSServerConnected)
	{
		SetEvent(m_ConnectEvt);
	}
	else if (Command == CCOS_HW_CHANNEL)
	{
		string strMessage = Message.constBuffer();
		SYSTEMTIME st = { 0 };
#ifdef _DEBUG
		//GetLocalTime(&st);
		//printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
		//printf("time-%02d:%02d:%02d:%03d,packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
#endif
		//CBase64::Decode(Message.constBuffer(), strMessage);
		//-----got packet---------------
		ResDataObject packet;
		try
		{
			//mLog::FDEBUG( "Got Msg Packet.TID:{$}",GetCurrentThreadId());
			//mLog::FINFO( "Packet Context:{\n{$}}",strMessage.c_str());
			if (packet.decode(strMessage.c_str()))
			{
				//mLog::FDEBUG( "Decode Packet Succeed");
				if (m_Parent)
				{
					((eBus*)m_Parent)->PacketArrived(packet);
				}
			}
			else
			{
				//mLog::FDEBUG( "Decode Packet Failed");
			}
		}
		catch (ResDataObjectExption &exp)
		{
			//exp.what()
			Logger *p = GetGlobalLogger();
			//mLog::FERROR( "OnBUSMessage:\n");
			//mLog::FERROR( exp.what());
			//mLog::FERROR( strMessage.c_str());
		}
		catch (...)
		{
			Logger *p = GetGlobalLogger();
			//mLog::FERROR( "OnBUSMessage:Unknown Exp Happened\n");
			//mLog::FERROR( strMessage.c_str());
		}
	}
}

void eBusService::UnRegistThread(DWORD Tid)
{
	if (m_pCTMap)
	{
		(*m_pCTMap).UnRegistThread(Tid);
	}
}
*/