// eBusService.cpp
  1. #include "stdafx.h"
  2. #define _WINSOCK_DEPRECATED_NO_WARNINGS
  3. #include <Winsock2.h>
  4. #include <vector>
  5. #include <string>
  6. #include "eBusService.h"
  7. #include "eBus.h"
  8. #include "base64.h"
  9. #include "Logger.h"
  10. #include "PacketAnalizer.h"
  11. #include "shareMemWR.h"
  12. #include "P2PModule.h"
  13. #include "ShareMemory_IF.h"
  14. #include "ShareMemory_Block.h"
  15. //#include "common_api.h"
  16. using namespace std;
  // Extracts the file title (name without directory and without extension)
  // from a path that may use '\\' and/or '/' as separators.
  //
  // A separator only counts when it exists, is not the first character and is
  // not the last character of the path. If neither kind of separator counts,
  // an empty string is returned (so a bare "file.txt" yields ""). Otherwise the
  // text after the right-most counting separator is taken and cut at its FIRST
  // '.', so "tool.tar.gz" yields "tool".
  string GetFileTitle(string &fullpath)
  {
      const string::size_type length = fullpath.size();

      // True when 'pos' designates a separator that can be used as a cut point.
      auto usable = [length](string::size_type pos) -> bool
      {
          return pos != string::npos && pos != 0 && (pos + 1 != length);
      };

      const string::size_type backslashPos = fullpath.find_last_of('\\');
      const string::size_type slashPos = fullpath.find_last_of('/');
      const bool backslashOk = usable(backslashPos);
      const bool slashOk = usable(slashPos);

      if (!backslashOk && !slashOk)
      {
          return string();
      }

      // Choose the cut point: the only usable separator, or the later of the two.
      string::size_type cut;
      if (!backslashOk)
      {
          cut = slashPos;
      }
      else if (!slashOk)
      {
          cut = backslashPos;
      }
      else
      {
          cut = (backslashPos > slashPos) ? backslashPos : slashPos;
      }

      string title = fullpath.substr(cut + 1);
      if (!title.empty())
      {
          // substr(0, npos) keeps the whole name when there is no dot.
          title = title.substr(0, title.find_first_of('.'));
      }
      return title;
  }
  54. /*
  55. string GetProcessDirectory()
  56. {
  57. string ret = "";
  58. char szFilename[MAX_PATH] = { 0 };
  59. DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
  60. if (res == 0)
  61. {
  62. return ret;
  63. }
  64. string fullpath = szFilename;
  65. string::size_type firstHit = fullpath.find_last_of('\\');
  66. if (firstHit == string::npos || firstHit == 0)
  67. {
  68. return ret;
  69. }
  70. ret = fullpath.substr(0, firstHit);//kick last \
  71. return ret;
  72. }*/
  73. string GeteBusLogPath()
  74. {
  75. string ret = "";
  76. char szFilename[MAX_PATH] = { 0 };
  77. DWORD res = GetModuleFileNameA(0, szFilename, MAX_PATH);
  78. if (res == 0)
  79. {
  80. return ret;
  81. }
  82. string fullpath = szFilename;
  83. string FileTitle = GetFileTitle(fullpath);
  84. string logfile = GetProcessDirectory() + "\\logs\\" + FileTitle;
  85. SYSTEMTIME st = { 0 };
  86. GetLocalTime(&st);
  87. //日,时,分,秒,毫
  88. sprintf_s(szFilename, "_%u_%02d%02d%02d%02d%03d.log",GetCurrentProcessId(),st.wDay,st.wHour, st.wMinute,st.wSecond,st.wMilliseconds);
  89. logfile += szFilename;
  90. return logfile;
  91. }
  92. void IniteBusService(bool EnableLog)
  93. {
  94. string logfile = GeteBusLogPath();
  95. BUSC::Client::OpenLogger(logfile.c_str());
  96. BUSC::Client::Prepare();
  97. }
  98. void QuiteBusService()
  99. {
  100. BUSC::Client::Quit();
  101. }
  // Zero-config router discovery stub.
  //
  // The discovery loop that used to live here (polling
  // ECOM::ZeroConfig::ZeroConfigClient::QueryAll up to 'trytime' times and
  // sleeping 'timeperiod' ms between attempts) is disabled, so the function
  // always returns an empty list and ignores both parameters.
  vector<string> GetZeroConfigIP(DWORD trytime, DWORD timeperiod)
  {
      (void)trytime;
      (void)timeperiod;
      return vector<string>();
  }
  // Decides which discovered router address to use.
  //
  // zeroConfigIp : addresses found by discovery.
  // serverIp     : in/out. When non-empty it is the address the caller wants
  //                and the function only reports whether it was discovered.
  //                When empty and exactly one address was discovered, it is
  //                set to that address.
  // Returns true when serverIp holds a discovered address on exit; false when
  // nothing was discovered, the requested address is not in the list, or the
  // choice is ambiguous (several candidates and no preference).
  bool FindRouterIpAddress(vector<string> &zeroConfigIp, string &serverIp)
  {
      if (zeroConfigIp.empty())
      {
          return false;
      }

      if (!serverIp.empty())
      {
          // A specific address was requested: it must be one of the candidates.
          for (const string &candidate : zeroConfigIp)
          {
              if (candidate == serverIp)
              {
                  return true;
              }
          }
          return false;
      }

      // No preference: accept the candidate only when it is unambiguous.
      if (zeroConfigIp.size() != 1)
      {
          return false;
      }
      serverIp = zeroConfigIp[0];
      return true;
  }
  // Returns a copy of 'org' in which every non-overlapping occurrence of
  // 'keystr' is replaced by 'replacestr'. Scanning resumes after the inserted
  // text, so a replacement that itself contains the key is not expanded again.
  //
  // An empty key matches at every position and would never terminate, so in
  // that case 'org' is returned unchanged.
  string ReplaceSubString(string org, string &keystr, string &replacestr)
  {
      const std::string::size_type keylen = keystr.size();
      if (keylen == 0)
      {
          return org;
      }

      const std::string::size_type replen = replacestr.size();
      for (std::string::size_type pos = org.find(keystr, 0);
           pos != std::string::npos;
           pos = org.find(keystr, pos + replen))
      {
          org.replace(pos, keylen, replacestr);
      }
      return org;
  }
  177. bool IsLocalip(string &ServerIp)
  178. {
  179. //for test
  180. string Temp;
  181. struct hostent* pHost;
  182. static string ExIp = "127.0.0.1";
  183. if (ServerIp == ExIp)
  184. {
  185. return true;
  186. }
  187. char cHostName[128] = { 0 };
  188. gethostname(cHostName, sizeof(cHostName));
  189. if (cHostName[0] != 0)
  190. {
  191. pHost = gethostbyname(cHostName);
  192. if (pHost)
  193. {
  194. for (int i = 0; pHost->h_addr_list[i] != NULL; i++)
  195. {
  196. Temp = "";
  197. Temp = inet_ntoa(*(struct in_addr *)pHost->h_addr_list[i]);
  198. Temp = ReplaceSubString(Temp, string(" "), string(""));//trim space
  199. if (Temp == ServerIp)
  200. {
  201. return true;
  202. }
  203. }
  204. }
  205. }
  206. return false;
  207. }
  208. eBusService::eBusService(void)
  209. {
  210. m_TickCount = 0;
  211. m_OfflineHit = 0;
  212. m_ConnectionStatus = false;
  213. m_Parent = NULL;
  214. m_pBlobReceiver = NULL;
  215. m_pBusLog = NULL;
  216. m_pCTMap = NULL;
  217. m_ConnectEvt = CreateEvent(0, 0, 0, 0);
  218. }
  219. eBusService::~eBusService(void)
  220. {
  221. if (m_pBlobReceiver)
  222. {
  223. m_pBlobReceiver->StopWait();
  224. delete m_pBlobReceiver;
  225. m_pBlobReceiver = NULL;
  226. }
  227. if (m_pBusLog)
  228. {
  229. ReleseLogger(m_pBusLog);
  230. }
  231. if (m_pCTMap)
  232. {
  233. delete m_pCTMap;
  234. m_pCTMap = NULL;
  235. }
  236. CloseHandle(m_ConnectEvt);
  237. }
  238. void eBusService::SetParent(HANDLE Parent)
  239. {
  240. m_Parent = Parent;
  241. }
  242. bool eBusService::IsConnected()
  243. {
  244. if (m_ConnectionStatus)
  245. {
  246. if ((GetTickCount() - m_TickCount) > (120 * 1000))
  247. {
  248. printf("Hearbeat Timeout.CurTick:%d,LastTick:%d\n", GetTickCount(), m_TickCount);
  249. DisConnect();
  250. }
  251. }
  252. return m_ConnectionStatus;
  253. }
  254. bool eBusService::Connect(bool Local, DString &busId, DString &Ipaddress, DString &Port)
  255. {
  256. Thread_Lock();
  257. try {
  258. if (m_pCTMap == NULL)
  259. {
  260. m_pCTMap = new ClientsThreadMap();
  261. }
  262. do {
  263. m_Local = Local;
  264. m_busId = busId;
  265. m_Ipaddress = Ipaddress;
  266. m_Port = Port;
  267. if (m_pBlobReceiver)
  268. {
  269. m_pBlobReceiver->StopWait();
  270. delete m_pBlobReceiver;
  271. m_pBlobReceiver = NULL;
  272. }
  273. ResetEvent(m_ConnectEvt);
  274. // Port = DString::From(BUSC::Client::GetDefStatusPort());
  275. m_pBlobReceiver = new BUSC::BLOBReceiver();
  276. if (m_pBlobReceiver == NULL)
  277. {
  278. printf("EBUS Client malloc Failed.!!!!!\n");
  279. Thread_UnLock();
  280. return false;
  281. }
  282. if (Local)
  283. {
  284. //Local Connection
  285. //Local:1
  286. //Port:eBusDefault or selfdefine
  287. //srcBusId:BusId
  288. //BUSC::MessageClient::SetLogFileName("AlphaLog.log");
  289. //BUSC::MessageClient::Prepare();
  290. //m_pBlobReceiver->Prepare();
  291. if (m_Ipaddress.IsEmpty())
  292. {
  293. m_Ipaddress = "127.0.0.1";
  294. }
  295. m_pBlobReceiver->SetSourceID(busId);
  296. m_pBlobReceiver->SetTcpNoDelay(true);
  297. //gaofei
  298. m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
  299. {
  300. SetEvent(m_ConnectEvt);
  301. });
  302. m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
  303. m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
  304. m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
  305. m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
  306. m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
  307. printf("========**** eBusService::Connect Thread 0X%08X \n", GetCurrentThreadId());
  308. Sleep(10000);
  309. m_pBlobReceiver->StartWait();
  310. if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
  311. {
  312. printf("========**** eBusService::Connect Thread Timeout Over 0X%08X \n", GetCurrentThreadId());
  313. if (m_pBlobReceiver)
  314. {
  315. m_pBlobReceiver->StopWait();
  316. delete m_pBlobReceiver;
  317. m_pBlobReceiver = NULL;
  318. }
  319. printf("EBUS Connect Timeout.!!!!!\n");
  320. Thread_UnLock();
  321. return false;
  322. }
  323. printf("========**** eBusService::Connect Thread Over 0X%08X \n", GetCurrentThreadId());
  324. m_OfflineHit = 0;
  325. m_TickCount = GetTickCount();
  326. m_ConnectionStatus = true;
  327. printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  328. Thread_UnLock();
  329. return true;
  330. }
  331. else if (Local == 0)
  332. {
  333. //Local:0
  334. //Ethernet Connection
  335. //RouterIp:ipaddress
  336. //vector<string> theIpList = GetZeroConfigIP(10, 1000);
  337. string Ip = Ipaddress;
  338. //if RouterIp is empty,eBus will find one .
  339. //if eBus found more than one,it will fail and return false.
  340. //eth connect only outside machine ip
  341. if (Ip.size() != 0 && IsLocalip(Ip) == false)
  342. {
  343. //if (Ip == "127.0.0.1" || FindRouterIpAddress(theIpList, Ip))
  344. {
  345. //we got routerIp,check org&new one
  346. //Ip = "192.168.2.77";
  347. m_Ipaddress = Ip.c_str();
  348. //if (IsLocalip(Ip))
  349. //{
  350. // m_Ipaddress = "";
  351. // m_TickCount = 0;
  352. // m_ConnectionStatus = false;
  353. // return false;
  354. //}
  355. //else
  356. {
  357. //Port:eBusDefault or selfdefine
  358. //srcBusId:BusId
  359. //BUSC::MessageClient::SetLogFileName("AlphaLog.log");
  360. //BUSC::MessageClient::Prepare();
  361. //m_pBlobReceiver->Prepare();
  362. //m_pBlobReceiver->SetLogFileName("AlphaLog_Eth.log");
  363. ResetEvent(m_ConnectEvt);
  364. m_pBlobReceiver->ConnectTo(m_Ipaddress);
  365. m_pBlobReceiver->SetSourceID(busId);
  366. //gaofei
  367. m_pBlobReceiver->OnMessage.Push(BUSCMD_BUSServerConnected, [this](const void *_, EventArgs_BUSMessage * __)
  368. {
  369. SetEvent(m_ConnectEvt);
  370. });
  371. m_pBlobReceiver->OnMessage.Push(CCOS_HW_CHANNEL, this, &eBusService::OnBUSMessage);
  372. m_pBlobReceiver->OnBLOB.Push(CCOS_HW_CHANNEL + 1, this, &eBusService::OnBlobMessage);
  373. m_pBlobReceiver->OnServerOnline.Push(this, &eBusService::OnlineMessage);
  374. m_pBlobReceiver->OnServerOffline.Push(this, &eBusService::OnOfflineMessage);
  375. m_pBlobReceiver->OnHeartbeat1Minute.Push(this, &eBusService::OnHeartBeat);
  376. m_pBlobReceiver->SetTcpNoDelay(true);
  377. m_pBlobReceiver->StartWait();
  378. if (WaitForSingleObject(m_ConnectEvt, 10000) == WAIT_TIMEOUT)
  379. {
  380. if (m_pBlobReceiver)
  381. {
  382. m_pBlobReceiver->StopWait();
  383. delete m_pBlobReceiver;
  384. m_pBlobReceiver = NULL;
  385. }
  386. printf("EBUS Connect Timeout.IP:%s!!!!!\n",(const char*)m_Ipaddress);
  387. Thread_UnLock();
  388. return false;
  389. }
  390. m_TickCount = GetTickCount();
  391. m_OfflineHit = 0;
  392. m_ConnectionStatus = true;
  393. printf("Connect Ebus Succeed.Local:%d,busId:%s,ServerIp:%s,Port:%s\n", m_Local, (const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  394. Thread_UnLock();
  395. return true;
  396. }
  397. }
  398. }
  399. }
  400. else
  401. {
  402. //do nothing
  403. }
  404. } while (0);
  405. }
  406. catch (ResDataObjectExption &exp)
  407. {
  408. //exp.what()
  409. Logger *p = GetGlobalLogger();
  410. PRINTA_DEBUG(p, exp.what());
  411. }
  412. catch (...)
  413. {
  414. Logger *p = GetGlobalLogger();
  415. PRINTA_DEBUG(p, "Unknown Exp Happened\n");
  416. }
  417. if (m_pBlobReceiver)
  418. {
  419. m_pBlobReceiver->StopWait();
  420. delete m_pBlobReceiver;
  421. m_pBlobReceiver = NULL;
  422. }
  423. m_TickCount =0;
  424. m_ConnectionStatus = false;
  425. Thread_UnLock();
  426. return false;
  427. }
  428. void eBusService::DisConnect()
  429. {
  430. //need to confirm multiple connection
  431. //BUSC::BLOBClient::Quit();
  432. Thread_Lock();
  433. if (m_pBlobReceiver)
  434. {
  435. m_pBlobReceiver->StopWait();
  436. delete m_pBlobReceiver;
  437. m_pBlobReceiver = NULL;
  438. }
  439. m_TickCount = 0;
  440. m_ConnectionStatus = false;
  441. if (m_pCTMap)
  442. {
  443. m_pCTMap->Disconnected();
  444. }
  445. printf("Disconnct Ebus.Local:%d,busId:%s,ServerIp:%s,Port:%s\n",m_Local,(const char*)m_busId, (const char*)m_Ipaddress, (const char*)m_Port);
  446. Thread_UnLock();
  447. }
  448. void eBusService::SetLogPath(const char *pPath)
  449. {
  450. //DString dstrPath = pPath;
  451. //BUSC::MessageClient::Prepare();
  452. //BUSC::MessageClient::SetLogFileName(dstrPath);
  453. //m_pBlobReceiver->Prepare();
  454. //m_pBlobReceiver->SetLogFileName(dstrPath);
  455. if (m_pBusLog == NULL)
  456. {
  457. m_pBusLog = CreateLogger();
  458. }
  459. if (m_pBusLog)
  460. {
  461. if (m_pBusLog->IsLogFilePathExist() == false)
  462. {
  463. m_pBusLog->SetLogFilepath(pPath);
  464. }
  465. }
  466. }
  467. void DebugPrintPacket(ResDataObject &packet)
  468. {
  469. string Context = packet.encode();
  470. printf("ResDataObject:%s\n", Context.c_str());
  471. }
  472. bool eBusService::SendSMPacket(const char *pTargetID, const char *pContext, unsigned long long nShareMemID, DWORD ChannelId)
  473. {
  474. bool ret = true;
  475. if (m_ConnectionStatus == false)
  476. {
  477. return false;
  478. }
  479. LPVOID pData = NULL;
  480. DWORD DataLen = 0;
  481. CShareMemory_Block smBlock;
  482. if (smBlock.ShareMemoryBlockSet((DWORD)nShareMemID, SM_READWRITE))
  483. {
  484. if (smBlock.GetBlockDataAddr(pData, DataLen))
  485. {
  486. ret = ((eBus*)m_Parent)->SendPacket(pTargetID, pContext, (const char *)pData, DataLen);
  487. //ret = SendPacket(pTargetID, pContext, pData, DataLen, ChannelId);
  488. if (ret)
  489. {
  490. m_OfflineHit = 0;
  491. }
  492. else
  493. {
  494. if (m_OfflineHit)
  495. {
  496. //Disconnected
  497. TPRINTA_ERROR("SendSMPacket Failed.offline callback Hited");
  498. DisConnect();
  499. }
  500. }
  501. }
  502. else
  503. {
  504. ret = false;
  505. TPRINTA_ERROR("CShareMemory_Block::GetBlockDataAddr Failed.");
  506. }
  507. }
  508. else
  509. {
  510. ret = false;
  511. TPRINTA_ERROR("CShareMemory_Block::ShareMemoryBlockSet Failed.");
  512. }
  513. return ret;
  514. }
  515. bool eBusService::SendPacket(const char *pTargetID, const char *pContext, DWORD ChannelId)
  516. {
  517. bool ret = false;
  518. TPRINTA_DEBUG("Send Msg Entry");
  519. if (m_ConnectionStatus == false)
  520. {
  521. return ret;
  522. }
  523. //for test
  524. //if (strcmp(pTargetID, "ccosAlpha") != 0)
  525. //{
  526. // ret = ret;
  527. //}
  528. DString dstrDestID = pTargetID;
  529. DString strMsg = pContext;
  530. //BUSC::MessageClient LocalClient;
  531. //LocalClient.ConnectTo(m_Ipaddress);
  532. //LocalClient.SetTcpNoDelay(true);
  533. //LocalClient.SetSourceID(m_busId);
  534. //PRINTA_DEBUG(m_Logger, "<eBusMessenger Send> %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
  535. BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
  536. BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
  537. //if (m_Local == false)
  538. {
  539. if (Wrapper->IsConnected() == false)
  540. {
  541. pClient->ConnectTo(m_Ipaddress);
  542. pClient->SetTcpNoDelay(true);
  543. pClient->SetSourceID(m_busId);
  544. Wrapper->SetConnection(true);//suppose connected
  545. }
  546. }
  547. //if (Wrapper->IsConnected() == false)
  548. //{
  549. // pClient->SetSourceID(m_busId);
  550. //}
  551. /*
  552. //旧版 begin
  553. BUSC::BLOBClient client;
  554. if (m_Local == false)
  555. {
  556. client.ConnectTo(m_Ipaddress, m_Port);
  557. client.SetTcpNoDelay(true);
  558. }
  559. client.SetSourceID(m_busId);
  560. //旧版 end
  561. */
  562. //string strBase64;
  563. //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64);
  564. //return client.Send(dstrDestID, ChannelId, strBase64.data());
  565. SYSTEMTIME st = { 0 };
  566. #ifdef _DEBUG
  567. //GetLocalTime(&st);
  568. //printf("time-%02d:%02d:%02d:%03d,packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
  569. #endif
  570. //ret = client.Send(dstrDestID, ChannelId, strMsg.data());
  571. ret = pClient->Post(dstrDestID, ChannelId, strMsg);
  572. //ret = LocalClient.Post(dstrDestID, ChannelId, strMsg);
  573. TPRINTA_DEBUG("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  574. if (ret)
  575. {
  576. m_OfflineHit = 0;
  577. }
  578. else
  579. {
  580. TPRINTA_ERROR("From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  581. //try again
  582. Wrapper->SetConnection(false);
  583. pClient = Wrapper->GetBusClient();
  584. if (Wrapper->IsConnected() == false)
  585. {
  586. pClient->ConnectTo(m_Ipaddress);
  587. pClient->SetTcpNoDelay(true);
  588. pClient->SetSourceID(m_busId);
  589. Wrapper->SetConnection(true);//suppose connected
  590. }
  591. ret = pClient->Post(dstrDestID, ChannelId, strMsg);
  592. TPRINTA_DEBUG("Post Try Again: From %s,Send Msg To %s,Channel:%d,OffLine:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)dstrDestID, ChannelId, m_OfflineHit, ret);
  593. if (ret)
  594. {
  595. m_OfflineHit = 0;
  596. }
  597. else
  598. {
  599. if (m_OfflineHit)
  600. {
  601. //Disconnected
  602. DisConnect();
  603. }
  604. }
  605. }
  606. #ifdef _DEBUG
  607. //GetLocalTime(&st);
  608. //printf("time-%02d:%02d:%02d:%03d,after packet send result:%d\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
  609. #endif
  610. return ret;
  611. }
  612. bool eBusService::SendPacket(const char *pTargetID, const char *pContext, const char *pBlock, DWORD BlockSize, DWORD ChannelId)
  613. {
  614. bool ret = false;
  615. TPRINTA_DEBUG("Send BLOCK Entry");
  616. if (m_ConnectionStatus == false)
  617. {
  618. return ret;
  619. }
  620. DString dstrDestID = pTargetID;
  621. DString strMsg = pContext;
  622. //PRINTA_DEBUG(m_Logger, "<eBusMessenger Send> %s -> %s [%d]%s", m_pBlobReceiver->GetSourceID(), dstrDestID, Command, strMsg.substr(0, min(strMsg.length(), 1024)));
  623. BusClientWrapper *Wrapper = (*m_pCTMap)[GetCurrentThreadId()];
  624. BUSC::BLOBStreamClient* pClient = Wrapper->GetBusClient();
  625. //if (m_Local == false)
  626. {
  627. if (Wrapper->IsConnected() == false)
  628. {
  629. pClient->ConnectTo(m_Ipaddress);
  630. pClient->SetTcpNoDelay(true);
  631. pClient->SetSourceID(m_busId);
  632. Wrapper->SetConnection(true);//suppose connected
  633. }
  634. }
  635. //if (Wrapper->IsConnected() == false)
  636. //{
  637. // pClient->SetSourceID(m_busId);
  638. //}
  639. //Wrapper->SetConnection(true);//suppose connected
  640. //void * pData = vr->GetData();
  641. //int len = vr->GetLength();
  642. //BUSC::BLOBStreamClient Client;
  643. /*
  644. //旧版begin
  645. BUSC::BLOBClient client;
  646. if (m_Local == false)
  647. {
  648. client.ConnectTo(m_Ipaddress, m_Port);
  649. client.SetTcpNoDelay(true);
  650. }
  651. client.SetSourceID(m_busId);
  652. //旧版end
  653. */
  654. //string strBase64;
  655. //CBase64::Encode((const unsigned char *)strMsg.c_str(), strMsg.size(), strBase64);
  656. //BlockBuffer bb;
  657. //DString msgAck;
  658. //BlockBuffer bbAck;
  659. //bb.Attach((UINT8*)pBlock, BlockSize);//add data??
  660. //return client.SendBLOB(dstrDestID, ChannelId, strBase64.data(), bb, msgAck);//sync mode yes
  661. SYSTEMTIME st = { 0 };
  662. #ifdef _DEBUG
  663. //GetLocalTime(&st);
  664. //printf("time-%02d:%02d:%02d:%03d,block packet send to %s:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, pTargetID, pContext);
  665. #endif
  666. //ret = client.SendBLOB(dstrDestID, ChannelId, strMsg.data(), bb, msgAck);//sync mode yes
  667. //ret = client.PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb);
  668. //ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg.data(), bb);
  669. auto fun = [&](BUSC::BUSOutStream & stream)
  670. {
  671. stream.Write(pBlock, BlockSize);
  672. };
  673. ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
  674. TPRINTA_DEBUG("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  675. if (ret)
  676. {
  677. m_OfflineHit = 0;
  678. }
  679. else
  680. {
  681. TPRINTA_ERROR("From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  682. //try again
  683. Wrapper->SetConnection(false);
  684. pClient = Wrapper->GetBusClient();
  685. if (Wrapper->IsConnected() == false)
  686. {
  687. pClient->ConnectTo(m_Ipaddress);
  688. pClient->SetTcpNoDelay(true);
  689. pClient->SetSourceID(m_busId);
  690. Wrapper->SetConnection(true);//suppose connected
  691. }
  692. ret = pClient->PostBLOB(dstrDestID, ChannelId, strMsg, fun);
  693. TPRINTA_DEBUG("Post Tray Again:From %s,Send BLOCK To %s,Channel:%d,Offline:%d,Res:%d", (const char*)(pClient->GetSourceID()), (const char*)pTargetID, ChannelId, m_OfflineHit, ret);
  694. if (ret)
  695. {
  696. m_OfflineHit = 0;
  697. }
  698. else
  699. {
  700. if (m_OfflineHit)
  701. {
  702. //Disconnected
  703. DisConnect();
  704. }
  705. }
  706. }
  707. #ifdef _DEBUG
  708. //GetLocalTime(&st);
  709. //printf("time-%02d:%02d:%02d:%03d,after block packet send result:%d-------------\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, ret);
  710. #endif
  711. //bb.Detach();
  712. return ret;
  713. }
  714. void eBusService::OnHeartBeat(const void *sender, EventArgs_Null * arg)
  715. {
  716. //printf("OnlineMessage\n");
  717. if (sender)
  718. {
  719. //printf("OnlineMessage:%s\n", p);
  720. }
  721. m_OfflineHit = 0;
  722. m_TickCount = GetTickCount();
  723. }
  724. void eBusService::OnlineMessage(const void *sender, EventArgs_Null * arg)
  725. {
  726. if (arg != NULL)
  727. {
  728. printf("OnlineMessage\n");
  729. }
  730. //DisConnect();
  731. //SetEvent(m_ConnectEvt);
  732. //just tag it
  733. m_OfflineHit = 0;
  734. }
  735. void eBusService::OnOfflineMessage(const void *sender, EventArgs_Null * arg)
  736. {
  737. if (arg != NULL)
  738. {
  739. printf("OnOfflineMessage\n");
  740. }
  741. //DisConnect();
  742. //just tag it
  743. m_OfflineHit = 1;
  744. }
  745. void eBusService::Quit()
  746. {
  747. //BUSC::Client::Quit();
  748. if (m_pCTMap)
  749. {
  750. m_pCTMap->Clear();
  751. }
  752. }
  753. RET_STATUS eBusService::BufferToNotify(void * pOemImage, DWORD ImageSize, ImgDataInfo* PImgDataInfo)
  754. {
  755. RET_STATUS Ret = RET_SUCCEED;
  756. //for memory leak test
  757. //PImgDataInfo->nShareMemID = 1;
  758. //return Ret;
  759. const char *pSmName = Get_Circle_SM_Object(ImageSize);
  760. CShareMemory_Circle SM;
  761. if (SM.Open(pSmName, SM_READWRITE) >= 0)
  762. {
  763. ShareMemoryBlockID smid = SM.Push(pOemImage, ImageSize, true);
  764. if (smid != 0xffffffff)
  765. {
  766. PImgDataInfo->nShareMemID = smid;
  767. }
  768. else
  769. {
  770. //MessageBox(NULL, "Push failed", "Sys Lib", MB_OK);
  771. printf("\n Push failed \n");
  772. Ret = RET_FAILED;
  773. }
  774. SM.Close();
  775. }
  776. else
  777. {
  778. MessageBox(NULL, "Open failed", "Sys Lib", MB_OK);
  779. Ret = RET_FAILED;
  780. }
  781. return Ret;
  782. }
  783. void eBusService::BlobDataArrived(const char *pMsg, unsigned char *pBlockData, DWORD BlockDataLen)
  784. {
  785. SYSTEMTIME st = { 0 };
  786. #ifdef _DEBUG
  787. //GetLocalTime(&st);
  788. //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  789. //printf("time-%02d:%02d:%02d:%03d,Block packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
  790. #endif
  791. //-----got packet---------------
  792. ResDataObject packet;
  793. try {
  794. PRINTA_DEBUG(m_pBusLog, "BlobMsg:Got Packet.BlockLen is %d", BlockDataLen);
  795. PRINTA_TRACE(m_pBusLog, "Blob Packet Context:{\n%s}", pMsg);
  796. if (packet.decode(pMsg))
  797. {
  798. PRINTA_DEBUG(m_pBusLog, "BlobMsg:Decode Packet Succeed");
  799. if (m_Parent)
  800. {
  801. //check block
  802. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
  803. if (cmd == PACKET_CMD_DATA)
  804. {
  805. ImgDataInfo ImgData;
  806. ResDataObject resImg;
  807. PacketAnalizer::GetPacketContext(&packet, resImg);
  808. ImgData.SetResDataObject(resImg);
  809. //Need Block Data to Share Memory
  810. Thread_Lock();
  811. if (BufferToNotify(pBlockData, BlockDataLen, &ImgData) == RET_SUCCEED)
  812. {
  813. Thread_UnLock();
  814. //BufferToNotify ok
  815. //And Update the Id of memory
  816. PRINTA_DEBUG(m_pBusLog, "BlobMsg:BufferToNotify Succeed");
  817. resImg.clear();
  818. ImgData.GetResDataObject(resImg);
  819. if (PacketAnalizer::UpdatePacketContext(packet, resImg))
  820. {
  821. PRINTA_DEBUG(m_pBusLog, "BlobMsg:Dispatch packet to BusThread");
  822. ((eBus*)m_Parent)->PacketArrived(packet);
  823. }
  824. }
  825. else
  826. {
  827. Thread_UnLock();
  828. }
  829. }
  830. else
  831. {
  832. ((eBus*)m_Parent)->PacketArrived(packet);
  833. }
  834. }
  835. }
  836. else
  837. {
  838. PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed");
  839. }
  840. }
  841. catch (ResDataObjectExption &exp)
  842. {
  843. //exp.what()
  844. Logger *p = GetGlobalLogger();
  845. PRINTA_ERROR(p, exp.what());
  846. }
  847. catch (...)
  848. {
  849. Logger *p = GetGlobalLogger();
  850. PRINTA_ERROR(p, "Unknown Exp Happened\n");
  851. }
  852. }
  853. void eBusService::OnBlobMessage(const void *sender, EventArgs_BUSBLOB * arg)
  854. {
  855. EventArgs_BUSBLOB * e = (EventArgs_BUSBLOB*)(arg);
  856. if (!arg) return;
  857. //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  858. m_OfflineHit = 0;
  859. m_TickCount = GetTickCount();
  860. DWORD Command = arg->m_Command;
  861. DString SourceID = arg->m_SourceID;
  862. DString Message = arg->MsgAs <DString>();
  863. DWORD MessageID = arg->m_MessageID;//???
  864. if (Command == CCOS_HW_CHANNEL + 1)
  865. {
  866. UINT8 *pBlockData = (UINT8*)arg->m_BLOB;//block data buff
  867. DWORD BlockDataLen = arg->m_BLOB.GetCount();//block data len??
  868. string strMessage = Message.constBuffer();
  869. BlobDataArrived(strMessage.c_str(), pBlockData, BlockDataLen);
  870. }
  871. else
  872. {
  873. //put some log here
  874. }
  875. //return acks-------------------
  876. //message ack
  877. //e->m_AckMessage = "";
  878. //blob ack,for what??
  879. //BlockBuffer bb;
  880. //e->m_AckBLOB = bb;
  881. //Thread_UnLock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  882. }
  883. void eBusService::OnBUSMessage(const void *sender, EventArgs_BUSMessage * arg)
  884. {
  885. if (!arg) return;
  886. //Thread_Lock();这个可能带来风险,Disconnect的时候,eBus内部的Callback线程有崩溃情况
  887. m_OfflineHit = 0;
  888. m_TickCount = GetTickCount();
  889. DWORD Command = arg->m_Command;
  890. DString SourceID = arg->m_SourceID;
  891. DString Message = arg->MsgAs <DString>();
  892. DWORD MessageID = arg->m_MessageID;
  893. if (Command == BUSCMD_BUSServerConnected)
  894. {
  895. SetEvent(m_ConnectEvt);
  896. }
  897. else if (Command == CCOS_HW_CHANNEL)
  898. {
  899. string strMessage = Message.constBuffer();
  900. SYSTEMTIME st = { 0 };
  901. #ifdef _DEBUG
  902. //GetLocalTime(&st);
  903. //printf("time-%02d:%02d:%02d:%03d,packet arrived\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  904. //printf("time-%02d:%02d:%02d:%03d,packet arrived:%s\n", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, strMessage.c_str());
  905. #endif
  906. //CBase64::Decode(Message.constBuffer(), strMessage);
  907. //-----got packet---------------
  908. ResDataObject packet;
  909. try {
  910. PRINTA_DEBUG(m_pBusLog, "Got Msg Packet.TID:%d",GetCurrentThreadId());
  911. PRINTA_TRACE(m_pBusLog, "Packet Context:{\n%s}",strMessage.c_str());
  912. if (packet.decode(strMessage.c_str()))
  913. {
  914. PRINTA_DEBUG(m_pBusLog, "Decode Packet Succeed");
  915. if (m_Parent)
  916. {
  917. ((eBus*)m_Parent)->PacketArrived(packet);
  918. }
  919. }
  920. else
  921. {
  922. PRINTA_DEBUG(m_pBusLog, "Decode Packet Failed");
  923. }
  924. }
  925. catch (ResDataObjectExption &exp)
  926. {
  927. //exp.what()
  928. Logger *p = GetGlobalLogger();
  929. PRINTA_ERROR(p, "OnBUSMessage:\n");
  930. PRINTA_ERROR(p, exp.what());
  931. PRINTA_ERROR(p, strMessage.c_str());
  932. }
  933. catch (...)
  934. {
  935. Logger *p = GetGlobalLogger();
  936. PRINTA_ERROR(p, "OnBUSMessage:Unknown Exp Happened\n");
  937. PRINTA_ERROR(p, strMessage.c_str());
  938. }
  939. }
  940. //arg->m_AckMessage = "";
  941. //Thread_UnLock();这个可能带来风险,
  942. }
  943. void eBusService::UnRegistThread(DWORD Tid)
  944. {
  945. if (m_pCTMap)
  946. {
  947. (*m_pCTMap).UnRegistThread(Tid);
  948. }
  949. }