eBusService.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997
  1. #define _WINSOCK_DEPRECATED_NO_WARNINGS
  2. /*
  3. #include <vector>
  4. #include <string>
  5. #include "eBusService.h"
  6. #include "eBus.h"
  7. #include "base64.h"
  8. #include "Logger.h"
  9. #include "PacketAnalizer.h"
  10. #include "shareMemWR.h"
  11. #include "P2PModule.h"
  12. #include "ShareMemory_IF.h"
  13. #include "ShareMemory_Block.h"
  14. #include "common_api.h"
  15. using namespace std;
  16. string GeteBusLogPath()
  17. {
  18. string ret = "";
  19. char szFilename[MAX_PATH] = { 0 };
  20. DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
  21. if (res == 0)
  22. {
  23. return ret;
  24. }
  25. string fullpath = szFilename;
  26. string FileTitle = GetFileTitle(fullpath);
  27. string logfile = GetProcessDirectory() + "\\logs\\" + FileTitle;
  28. SYSTEMTIME st = { 0 };
  29. GetLocalTime(&st);
  30. //日,时,分,秒,毫
  31. sprintf_s(szFilename, "_%u_%02d%02d%02d%02d%03d.log",GetCurrentProcessId(),st.wDay,st.wHour, st.wMinute,st.wSecond,st.wMilliseconds);
  32. logfile += szFilename;
  33. return logfile;
  34. }
  35. void IniteBusService(bool EnableLog)
  36. {
  37. string logfile = GeteBusLogPath();
  38. BUSC::Client::OpenLogger(logfile.c_str());
  39. BUSC::Client::Prepare();
  40. }
  41. void QuiteBusService()
  42. {
  43. BUSC::Client::Quit();
  44. }
  45. bool FindRouterIpAddress(vector<string> &zeroConfigIp, string &serverIp)
  46. {
  47. if (zeroConfigIp.size() == 0)
  48. {
  49. //err log
  50. return false;
  51. }
  52. //zc ip is specific
  53. if ((serverIp).size() > 0)
  54. {
  55. for (DWORD i = 0; i < zeroConfigIp.size(); i++)
  56. {
  57. if (zeroConfigIp[i] == (serverIp))
  58. {
  59. //good to go
  60. return true;
  61. }
  62. }
  63. //no match
  64. return false;
  65. }
  66. //no optional ip or no match zc server
  67. //connect anyway if only one zc server exists
  68. if (zeroConfigIp.size() == 1)
  69. {
  70. //put some warning log here
  71. serverIp = zeroConfigIp[0];
  72. return true;
  73. }
  74. //multiple router...
  75. if (zeroConfigIp.size() > 1)
  76. {
  77. //put some warning log here
  78. //warning here
  79. //serverIp = zeroConfigIp[0];
  80. return false;
  81. }
  82. //err log
  83. return false;
  84. }
  85. bool IsLocalip(string &ServerIp)
  86. {
  87. //for test
  88. string Temp;
  89. struct hostent* pHost;
  90. static string ExIp = "127.0.0.1";
  91. if (ServerIp == ExIp)
  92. {
  93. return true;
  94. }
  95. char cHostName[128] = { 0 };
  96. gethostname(cHostName, sizeof(cHostName));
  97. if (cHostName[0] != 0)
  98. {
  99. pHost = gethostbyname(cHostName);
  100. if (pHost)
  101. {
  102. for (int i = 0; pHost->h_addr_list[i] != NULL; i++)
  103. {
  104. Temp = "";
  105. Temp = inet_ntoa(*(struct in_addr *)pHost->h_addr_list[i]);
  106. Temp = ReplaceSubString(Temp, string(" "), string(""));//trim space
  107. if (Temp == ServerIp)
  108. {
  109. return true;
  110. }
  111. }
  112. }
  113. }
  114. return false;
  115. }
  116. eBusService::eBusService(void)
  117. {
  118. m_TickCount = 0;
  119. m_OfflineHit = 0;
  120. m_ConnectionStatus = false;
  121. m_Parent = NULL;
  122. m_pBlobReceiver = NULL;
  123. m_pBusLog = NULL;
  124. m_pCTMap = NULL;
  125. m_ConnectEvt = CreateEvent(0, 0, 0, 0);
  126. }
  127. eBusService::~eBusService(void)
  128. {
  129. if (m_pBlobReceiver)
  130. {
  131. m_pBlobReceiver->StopWait();
  132. delete m_pBlobReceiver;
  133. m_pBlobReceiver = NULL;
  134. }
  135. if (m_pBusLog)
  136. {
  137. ReleseLogger(m_pBusLog);
  138. }
  139. if (m_pCTMap)
  140. {
  141. delete m_pCTMap;
  142. m_pCTMap = NULL;
  143. }
  144. CloseHandle(m_ConnectEvt);
  145. }
  146. void eBusService::SetParent(HANDLE Parent)
  147. {
  148. m_Parent = Parent;
  149. }
  150. bool eBusService::IsConnected()
  151. {
  152. if (m_ConnectionStatus)
  153. {
  154. if ((GetTickCount() - m_TickCount) > (120 * 1000))
  155. {
  156. printf("Hearbeat Timeout.CurTick:%d,LastTick:%d\n", GetTickCount(), m_TickCount);
  157. DisConnect();
  158. }
  159. }
  160. return m_ConnectionStatus;
  161. }
  162. bool eBusService::Connect(bool Local, DString &busId, DString &Ipaddress, DString &Port)
  163. {
  164. Thread_Lock();
  165. try {
  166. if (m_pCTMap == NULL)
  167. {
  168. m_pCTMap = new ClientsThreadMap();
  169. }
  170. do {
  171. m_Local = Local;
  172. m_busId = busId;
  173. m_Ipaddress = Ipaddress;
  174. m_Port = Port;
  175. if (m_pBlobReceiver)
  176. {
  177. m_pBlobReceiver->StopWait();
  178. delete m_pBlobReceiver;
  179. m_pBlobReceiver = NULL;
  180. }
  181. ResetEvent(m_ConnectEvt);
  182. // Port = DString::From(BUSC::Client::GetDefStatusPort());
  183. m_pBlobReceiver = new BUSC::BLOBReceiver();
  184. if (m_pBlobReceiver == NULL)
  185. {
  186. printf("EBUS Client malloc Failed.!!!!!\n");
  187. Thread_UnLock();
  188. return false;
  189. }
  190. if (Local)
  191. {
  192. if (m_Ipaddress.IsEmpty())
  193. {
  194. m_Ipaddress = "127.0.0.1";
  195. }
  196. m_pBlobReceiver->SetSourceID(busId);
  197. m_pBlobReceiver->SetTcpNoDelay(true);
  198. //gaofei
  199. m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
  200. {
  201. SetEvent(m_ConnectEvt);
  202. });
  203. m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
  204. m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
  205. m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
  206. m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
  207. m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
  208. printf("========**** eBusService::Connect Thread 0X%08X \n", GetCurrentThreadId());
  209. //Sleep(10000);
  210. m_pBlobReceiver->StartWait();
  211. if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
  212. {
  213. printf("========**** eBusService::Connect Thread Timeout Over 0X%08X \n", GetCurrentThreadId());
  214. if (m_pBlobReceiver)
  215. {
  216. m_pBlobReceiver->StopWait();
  217. delete m_pBlobReceiver;
  218. m_pBlobReceiver = NULL;
  219. }
  220. printf("EBUS Connect Timeout.!!!!!\n");
  221. Thread_UnLock();
  222. return false;
  223. }
  224. printf("========**** eBusService::Connect Thread Over 0X%08X \n", GetCurrentThreadId());
  225. m_OfflineHit = 0;
  226. m_TickCount = GetTickCount();
  227. m_ConnectionStatus = true;
  228. printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  229. Thread_UnLock();
  230. return true;
  231. }
  232. else if (Local == 0)
  233. {
  234. //Local:0
  235. //Ethernet Connection
  236. //RouterIp:ipaddress
  237. //vector<string> theIpList = GetZeroConfigIP(10, 1000);
  238. string Ip = Ipaddress;
  239. //if RouterIp is empty,eBus will find one .
  240. //if eBus found more than one,it will fail and return false.
  241. //eth connect only outside machine ip
  242. if (Ip.size() != 0 && IsLocalip(Ip) == false)
  243. {
  244. //if (Ip == "127.0.0.1" || FindRouterIpAddress(theIpList, Ip))
  245. {
  246. //we got routerIp,check org&new one
  247. //Ip = "192.168.2.77";
  248. m_Ipaddress = Ip.c_str();
  249. {
  250. ResetEvent(m_ConnectEvt);
  251. m_pBlobReceiver->ConnectTo(m_Ipaddress);
  252. m_pBlobReceiver->SetSourceID(busId);
  253. //gaofei
  254. m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
  255. {
  256. SetEvent(m_ConnectEvt);
  257. });
  258. m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
  259. m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
  260. m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
  261. m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
  262. m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
  263. m_pBlobReceiver->SetTcpNoDelay(true);
  264. m_pBlobReceiver->StartWait();
  265. if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
  266. {
  267. if (m_pBlobReceiver)
  268. {
  269. m_pBlobReceiver->StopWait();
  270. delete m_pBlobReceiver;
  271. m_pBlobReceiver = NULL;
  272. }
  273. printf("EBUS Connect Timeout.IP:%s!!!!!\n",(const char*)m_Ipaddress);
  274. Thread_UnLock();
  275. return false;
  276. }
  277. m_TickCount = GetTickCount();
  278. m_OfflineHit = 0;
  279. m_ConnectionStatus = true;
  280. printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  281. Thread_UnLock();
  282. return true;
  283. }
  284. }
  285. }
  286. }
  287. else
  288. {
  289. //do nothing
  290. }
  291. } while (0);
  292. }
  293. catch (ResDataObjectExption &exp)
  294. {
  295. //exp.what()
  296. Logger *p = GetGlobalLogger();
  297. //mLog::FDEBUG( exp.what());
  298. }
  299. catch (...)
  300. {
  301. Logger *p = GetGlobalLogger();
  302. //mLog::FDEBUG( "Unknown Exp Happened\n");
  303. }
  304. if (m_pBlobReceiver)
  305. {
  306. m_pBlobReceiver->StopWait();
  307. delete m_pBlobReceiver;
  308. m_pBlobReceiver = NULL;
  309. }
  310. m_TickCount =0;
  311. m_ConnectionStatus = false;
  312. Thread_UnLock();
  313. return false;
  314. }
  315. void eBusService::DisConnect()
  316. {
  317. Thread_Lock();
  318. if (m_pBlobReceiver)
  319. {
  320. m_pBlobReceiver->StopWait();
  321. delete m_pBlobReceiver;
  322. m_pBlobReceiver = NULL;
  323. }
  324. m_TickCount = 0;
  325. m_ConnectionStatus = false;
  326. if (m_pCTMap)
  327. {
  328. m_pCTMap->Disconnected();
  329. }
  330. printf("Disconnct Ebus.Local:%d,busId:%s,ServerIp:%s,Port:%s\n",m_Local,(const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  331. Thread_UnLock();
  332. }
  333. void eBusService::SetLogPath(const char *pPath)
  334. {
  335. if (m_pBusLog == NULL)
  336. {
  337. m_pBusLog = CreateLogger();
  338. }
  339. if (m_pBusLog)
  340. {
  341. if (m_pBusLog->IsLogFilePathExist() == false)
  342. {
  343. m_pBusLog->SetLogFilepath(pPath);
  344. }
  345. }
  346. }
  347. void DebugPrintPacket(ResDataObject &packet)
  348. {
  349. string Context = packet.encode();
  350. printf("ResDataObject:%s\n", Context.c_str());
  351. }
  352. bool eBusService::SendSMPacket(const char *pTargetID, const char *pContext, unsigned long long nShareMemID, DWORD ChannelId)
  353. {
  354. bool ret = true;
  355. if (m_ConnectionStatus == false)
  356. {
  357. return false;
  358. }
  359. LPVOID pData = NULL;
  360. DWORD DataLen = 0;
  361. CShareMemory_Block smBlock;
  362. if (smBlock.ShareMemoryBlockSet((DWORD)nShareMemID, SM_READWRITE))
  363. {
  364. if (smBlock.GetBlockDataAddr(pData, DataLen))
  365. {
  366. ret = ((eBus*)m_Parent)->SendPacket(pTargetID, pContext, (const char *)pData, DataLen);
  367. //ret = SendPacket(pTargetID, pContext, pData, DataLen, ChannelId);
  368. if (ret)
  369. {
  370. m_OfflineHit = 0;
  371. }
  372. else
  373. {
  374. if (m_OfflineHit)
  375. {
  376. //Disconnected
  377. //mLog::FERROR("SendSMPacket Failed.offline callback Hited");
  378. DisConnect();
  379. }
  380. }
  381. }
  382. else
  383. {
  384. ret = false;
  385. //mLog::FERROR("CShareMemory_Block::GetBlockDataAddr Failed.");
  386. }
  387. }
  388. else
  389. {
  390. ret = false;
  391. //mLog::FERROR("CShareMemory_Block::ShareMemoryBlockSet Failed.");
  392. }
  393. return ret;
  394. }
  395. bool eBusService::SendPacket(const char *pTargetID, const char *pContext, DWORD ChannelId)
  396. {
  397. bool ret = false;
  398. //mLog::FDEBUG("Send Msg Entry");
  399. if (m_ConnectionStatus == false)
  400. {
  401. return ret;
  402. }
  403. DString dstrDestID = pTargetID;
  404. DString strMsg = pContext;
  405. BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
  406. BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
  407. //if (m_Local == false)
  408. {
  409. if (Wrapper->IsConnected() == false)
  410. {
  411. pClient->ConnectTo(m_Ipaddress);
  412. pClient->SetTcpNoDelay(true);
  413. pClient->SetSourceID(m_busId);
  414. Wrapper->SetConnection(true);//suppose connected
  415. }
  416. }
  417. SYSTEMTIME st = { 0 };
  418. #ifdef _DEBUG
  419. //GetLocalTime(&st);
  420. //printf("time-%02d:%02d:%02d:%03d,packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
  421. #endif
  422. //ret = client.Send(dstrDestID, ChannelId, strMsg.data());
  423. ret = pClient->Post(dstrDestID, ChannelId, strMsg);
  424. //ret = LocalClient.Post(dstrDestID, ChannelId, strMsg);
  425. //mLog::FDEBUG("From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  426. //mLog::FINFO("Msg: {$}", pContext);
  427. if (ret)
  428. {
  429. m_OfflineHit = 0;
  430. }
  431. else
  432. {
  433. //mLog::FERROR("From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  434. //try again
  435. Wrapper->SetConnection(false);
  436. pClient = Wrapper->GetBusClient();
  437. if (Wrapper->IsConnected() == false)
  438. {
  439. pClient->ConnectTo(m_Ipaddress);
  440. pClient->SetTcpNoDelay(true);
  441. pClient->SetSourceID(m_busId);
  442. Wrapper->SetConnection(true);//suppose connected
  443. }
  444. ret = pClient->Post(dstrDestID, ChannelId, strMsg);
  445. //mLog::FDEBUG("Post Try Again: From {$},Send Msg To {$},Channel:{$},OffLine:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  446. if (ret)
  447. {
  448. m_OfflineHit = 0;
  449. }
  450. else
  451. {
  452. if (m_OfflineHit)
  453. {
  454. //Disconnected
  455. DisConnect();
  456. }
  457. }
  458. }
  459. #ifdef _DEBUG
  460. //GetLocalTime(&st);
  461. //printf("time-%02d:%02d:%02d:%03d,after packet send result:%d\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
  462. #endif
  463. return ret;
  464. }
  465. bool eBusService::SendPacket(const char *pTargetID, const char *pContext, const char *pBlock, DWORD BlockSize, DWORD ChannelId)
  466. {
  467. bool ret = false;
  468. //mLog::FDEBUG("Send BLOCK Entry");
  469. if (m_ConnectionStatus == false)
  470. {
  471. return ret;
  472. }
  473. DString dstrDestID = pTargetID;
  474. DString strMsg = pContext;
  475. //PRINTA_DEBUG(m_Logger, "<eBusMessenger Send> %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
  476. BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
  477. BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
  478. //if (m_Local == false)
  479. {
  480. if (Wrapper->IsConnected() == false)
  481. {
  482. pClient->ConnectTo(m_Ipaddress);
  483. pClient->SetTcpNoDelay(true);
  484. pClient->SetSourceID(m_busId);
  485. Wrapper->SetConnection(true);//suppose connected
  486. }
  487. }
  488. SYSTEMTIME st = { 0 };
  489. #ifdef _DEBUG
  490. //GetLocalTime(&st);
  491. //printf("time-%02d:%02d:%02d:%03d,block packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
  492. #endif
  493. auto fun = [&](BUSC::BUSOutStream & stream)
  494. {
  495. stream.Write(pBlock, BlockSize);
  496. };
  497. ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
  498. //mLog::FDEBUG("From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  499. if (ret)
  500. {
  501. m_OfflineHit = 0;
  502. }
  503. else
  504. {
  505. //mLog::FERROR("From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  506. //try again
  507. Wrapper->SetConnection(false);
  508. pClient = Wrapper->GetBusClient();
  509. if (Wrapper->IsConnected() == false)
  510. {
  511. pClient->ConnectTo(m_Ipaddress);
  512. pClient->SetTcpNoDelay(true);
  513. pClient->SetSourceID(m_busId);
  514. Wrapper->SetConnection(true);//suppose connected
  515. }
  516. ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
  517. //mLog::FDEBUG("Post Tray Again:From {$},Send BLOCK To {$},Channel:{$},Offline:{$},Res:{$}", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  518. if (ret)
  519. {
  520. m_OfflineHit = 0;
  521. }
  522. else
  523. {
  524. if (m_OfflineHit)
  525. {
  526. //Disconnected
  527. DisConnect();
  528. }
  529. }
  530. }
  531. #ifdef _DEBUG
  532. //GetLocalTime(&st);
  533. //printf("time-%02d:%02d:%02d:%03d,after block packet send result:%d-------------\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
  534. #endif
  535. //bb.Detach();
  536. return ret;
  537. }
  538. void eBusService::OnHeartBeat(const void *sender, EventArgs_Null * arg)
  539. {
  540. //printf("OnlineMessage\n");
  541. if (sender)
  542. {
  543. //printf("OnlineMessage:%s\n", p);
  544. }
  545. m_OfflineHit = 0;
  546. m_TickCount = GetTickCount();
  547. }
  548. void eBusService::OnlineMessage(const void *sender, EventArgs_Null * arg)
  549. {
  550. if (arg != NULL)
  551. {
  552. printf("OnlineMessage\n");
  553. }
  554. //DisConnect();
  555. //SetEvent(m_ConnectEvt);
  556. //just tag it
  557. m_OfflineHit = 0;
  558. }
  559. void eBusService::OnOfflineMessage(const void *sender, EventArgs_Null * arg)
  560. {
  561. if (arg != NULL)
  562. {
  563. printf("OnOfflineMessage\n");
  564. }
  565. //DisConnect();
  566. //just tag it
  567. m_OfflineHit = 1;
  568. }
  569. void eBusService::Quit()
  570. {
  571. //BUSC::Client::Quit();
  572. if (m_pCTMap)
  573. {
  574. m_pCTMap->Clear();
  575. }
  576. }
  577. RET_STATUS eBusService::BufferToNotify(void * pOemImage, DWORD ImageSize, ImgDataInfo* PImgDataInfo)
  578. {
  579. RET_STATUS Ret = RET_SUCCEED;
  580. //for memory leak test
  581. //PImgDataInfo->nShareMemID = 1;
  582. //return Ret;
  583. const char *pSmName = Get_Circle_SM_Object(ImageSize);
  584. CShareMemory_Circle SM;
  585. if (SM.Open(pSmName, SM_READWRITE) >= 0)
  586. {
  587. ShareMemoryBlockID smid = SM.Push(pOemImage, ImageSize, true);
  588. if (smid != 0xffffffff)
  589. {
  590. PImgDataInfo->nShareMemID = smid;
  591. }
  592. else
  593. {
  594. //MessageBox(NULL, "Push failed", "Sys Lib", MB_OK);
  595. printf("\n Push failed \n");
  596. Ret = RET_FAILED;
  597. }
  598. SM.Close();
  599. }
  600. else
  601. {
  602. MessageBox(NULL, "Open failed", "Sys Lib", MB_OK);
  603. Ret = RET_FAILED;
  604. }
  605. return Ret;
  606. }
  607. void eBusService::BlobDataArrived(const char *pMsg, unsigned char *pBlockData, DWORD BlockDataLen)
  608. {
  609. SYSTEMTIME st = { 0 };
  610. #ifdef _DEBUG
  611. //GetLocalTime(&st);
  612. //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  613. //printf("time-%02d:%02d:%02d:%03d,Block packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
  614. #endif
  615. //-----got packet---------------
  616. ResDataObject packet;
  617. try {
  618. //mLog::FDEBUG( "BlobMsg:Got Packet.BlockLen is {$}", BlockDataLen);
  619. //mLog::FINFO( "Blob Packet Context:{\n{$}}", pMsg);
  620. if (packet.decode(pMsg))
  621. {
  622. //mLog::FDEBUG( "BlobMsg:Decode Packet Succeed");
  623. if (m_Parent)
  624. {
  625. //check block
  626. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
  627. if (cmd == PACKET_CMD_DATA)
  628. {
  629. ImgDataInfo ImgData;
  630. ResDataObject resImg;
  631. PacketAnalizer::GetPacketContext(&packet, resImg);
  632. ImgData.SetResDataObject(resImg);
  633. //Need Block Data to Share Memory
  634. Thread_Lock();
  635. if (BufferToNotify(pBlockData, BlockDataLen, &ImgData) == RET_SUCCEED)
  636. {
  637. Thread_UnLock();
  638. //BufferToNotify ok
  639. //And Update the Id of memory
  640. //mLog::FDEBUG( "BlobMsg:BufferToNotify Succeed");
  641. resImg.clear();
  642. ImgData.GetResDataObject(resImg);
  643. if (PacketAnalizer::UpdatePacketContext(packet, resImg))
  644. {
  645. //mLog::FDEBUG( "BlobMsg:Dispatch packet to BusThread");
  646. ((eBus*)m_Parent)->PacketArrived(packet);
  647. }
  648. }
  649. else
  650. {
  651. Thread_UnLock();
  652. }
  653. }
  654. else
  655. {
  656. ((eBus*)m_Parent)->PacketArrived(packet);
  657. }
  658. }
  659. }
  660. else
  661. {
  662. //mLog::FDEBUG( "Decode Packet Failed");
  663. }
  664. }
  665. catch (ResDataObjectExption &exp)
  666. {
  667. //exp.what()
  668. Logger *p = GetGlobalLogger();
  669. //mLog::FERROR( exp.what());
  670. }
  671. catch (...)
  672. {
  673. Logger *p = GetGlobalLogger();
  674. //mLog::FERROR( "Unknown Exp Happened\n");
  675. }
  676. }
  677. void eBusService::OnBlobMessage(const void *sender, EventArgs_BUSBLOB * arg)
  678. {
  679. EventArgs_BUSBLOB * e = (EventArgs_BUSBLOB*)(arg);
  680. if (!arg) return;
  681. //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  682. m_OfflineHit = 0;
  683. m_TickCount = GetTickCount();
  684. DWORD Command = arg->m_Command;
  685. DString SourceID = arg->m_SourceID;
  686. DString Message = arg->MsgAs <DString>();
  687. DWORD MessageID = arg->m_MessageID;//???
  688. if (Command == CCOS_HW_CHANNEL + 1)
  689. {
  690. UINT8 *pBlockData = (UINT8*)arg->m_BLOB;//block data buff
  691. DWORD BlockDataLen = arg->m_BLOB.GetCount();//block data len??
  692. string strMessage = Message.constBuffer();
  693. BlobDataArrived(strMessage.c_str(), pBlockData, BlockDataLen);
  694. }
  695. else
  696. {
  697. //put some log here
  698. }
  699. //return acks-------------------
  700. //message ack
  701. //e->m_AckMessage = "";
  702. //blob ack,for what??
  703. //BlockBuffer bb;
  704. //e->m_AckBLOB = bb;
  705. //Thread_UnLock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  706. }
  707. void eBusService::OnBUSMessage(const void *sender, EventArgs_BUSMessage * arg)
  708. {
  709. if (!arg) return;
  710. //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  711. m_OfflineHit = 0;
  712. m_TickCount = GetTickCount();
  713. DWORD Command = arg->m_Command;
  714. DString SourceID = arg->m_SourceID;
  715. DString Message = arg->MsgAs <DString>();
  716. DWORD MessageID = arg->m_MessageID;
  717. if (Command == BUSCMD_BUSServerConnected)
  718. {
  719. SetEvent(m_ConnectEvt);
  720. }
  721. else if (Command == CCOS_HW_CHANNEL)
  722. {
  723. string strMessage = Message.constBuffer();
  724. SYSTEMTIME st = { 0 };
  725. #ifdef _DEBUG
  726. //GetLocalTime(&st);
  727. //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  728. //printf("time-%02d:%02d:%02d:%03d,packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
  729. #endif
  730. //CBase64::Decode(Message.constBuffer(), strMessage);
  731. //-----got packet---------------
  732. ResDataObject packet;
  733. try {
  734. //mLog::FDEBUG( "Got Msg Packet.TID:{$}",GetCurrentThreadId());
  735. //mLog::FINFO( "Packet Context:{\n{$}}",strMessage.c_str());
  736. if (packet.decode(strMessage.c_str()))
  737. {
  738. //mLog::FDEBUG( "Decode Packet Succeed");
  739. if (m_Parent)
  740. {
  741. ((eBus*)m_Parent)->PacketArrived(packet);
  742. }
  743. }
  744. else
  745. {
  746. //mLog::FDEBUG( "Decode Packet Failed");
  747. }
  748. }
  749. catch (ResDataObjectExption &exp)
  750. {
  751. //exp.what()
  752. Logger *p = GetGlobalLogger();
  753. //mLog::FERROR( "OnBUSMessage:\n");
  754. //mLog::FERROR( exp.what());
  755. //mLog::FERROR( strMessage.c_str());
  756. }
  757. catch (...)
  758. {
  759. Logger *p = GetGlobalLogger();
  760. //mLog::FERROR( "OnBUSMessage:Unknown Exp Happened\n");
  761. //mLog::FERROR( strMessage.c_str());
  762. }
  763. }
  764. }
  765. void eBusService::UnRegistThread(DWORD Tid)
  766. {
  767. if (m_pCTMap)
  768. {
  769. (*m_pCTMap).UnRegistThread(Tid);
  770. }
  771. }
  772. */