BusThread.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. // BusThread.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include "BusThread.h"
  4. //#include "Logger.h"
  5. //#include "CDInterface.h"
  6. //#include <iostream>
  7. //#include "PacketAnalizer.h"
  8. //#include "LogicClient.h"
  9. //#include "LocalConfig.h"
  10. //#include "DevBusManager.h"
  11. //#include "ShareMemWR.h"
  12. //#include "ClientManager.h"
  13. /*
  14. inline string CurrentDateTime()
  15. {
  16. std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  17. char buf[100] = { 0 };
  18. std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
  19. return buf;
  20. }
  21. BusThread::BusThread(void)
  22. {
  23. m_bLocal = true;
  24. }
  25. BusThread::~BusThread(void)
  26. {
  27. }
  28. bool BusThread::ReceivedFromLocalBus(ResDataObject& packet)
  29. {
  30. bool ret = true;
  31. //mLog::FINFO("Packet from Local Bus {$} ", packet.encode());
  32. //correct it's direction
  33. try {
  34. PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_LOCAL);
  35. //do the normal dispatch
  36. ret = ReceivedFromBus(packet);
  37. }
  38. catch (...)
  39. {
  40. ret = false;
  41. }
  42. return ret;
  43. }
  44. bool BusThread::ReceivedFromEthBus(ResDataObject& packet)
  45. {
  46. bool ret = true;
  47. //mLog::FINFO( "Packet from Eth Bus {$}",packet.encode());
  48. //correct it's direction
  49. try {
  50. PacketAnalizer::UpdatePacketRoute(packet, CCOS_PACKET_ROUTE_ETH);
  51. //do the normal dispatch
  52. ret = ReceivedFromBus(packet);
  53. }
  54. catch (...)
  55. {
  56. ret = false;
  57. }
  58. return ret;
  59. }
  60. //1.command comes from client
  61. //2.dispatch command comes from device
  62. bool BusThread::ReceivedFromBus(ResDataObject& packet)
  63. {
  64. bool ret = true;
  65. //RES_//mLog::FINFO(packet,"Packet from Bus");
  66. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
  67. PACKET_TYPE type = PacketAnalizer::GetPacketType(&packet);
  68. if (PACKET_TYPE_REQ == type)
  69. {
  70. switch (cmd) {
  71. case PACKET_CMD_OPEN:
  72. case PACKET_CMD_CLOSE:
  73. ret = HandleOpenClose(packet, cmd);
  74. break;
  75. case PACKET_CMD_GET:
  76. case PACKET_CMD_ADD:
  77. case PACKET_CMD_DEL:
  78. case PACKET_CMD_UPDATE://更新对象
  79. case PACKET_CMD_EXE:
  80. case PACKET_CMD_DATA:
  81. case PACKET_CMD_MSG:
  82. case PACKET_CMD_ONLINE://设备掉线后重新注册到Root
  83. case PACKET_CMD_PART_UPDATE://只更新对象的子节点
  84. ret = HandleOthers(packet, cmd);
  85. break;
  86. }
  87. }
  88. return ret;
  89. }
  90. CMD_ECHO DoSendPacket(ResDataObject& packet, bool Local, bool Block, string& TargetbusId)
  91. {
  92. //do send
  93. //block or packet
  94. if (Block)
  95. {
  96. //get context
  97. ResDataObject resImg;
  98. ImgDataInfo ImgData;
  99. string MachineId;
  100. if (PacketAnalizer::GetDestinationMachineId(packet, MachineId) == false)
  101. {
  102. return CMD_ECHO_IGNORE;
  103. }
  104. printf("Got Req of PACKET_CMD_DATA!!!---------------------\n");
  105. if (MachineId != (const char*)getLocalMachineId())
  106. {
  107. PacketAnalizer::GetPacketContext(&packet, resImg);
  108. ImgData.SetResDataObject(resImg);
  109. printf("Going to send SMPacket!!!---------------------\n");
  110. //target is not in this machine
  111. if (BusSendSMPacket(packet, Local, TargetbusId, (ImgData.nShareMemID)) == false)
  112. {
  113. //failed to send
  114. return CMD_ECHO_NOTARGET;
  115. }
  116. return CMD_ECHO_OK;
  117. }
  118. //same machineId,just send it as normal packet
  119. }
  120. if (BusSendPacket(packet, Local, TargetbusId) == false)
  121. {
  122. //failed to send
  123. return CMD_ECHO_NOTARGET;
  124. }
  125. return CMD_ECHO_OK;
  126. }
  127. bool BusThread::SendCcosPacket(ResDataObject& packet)
  128. {
  129. //std::cout << CurrentDateTime << " BusThread::SendCcosPacket Send Packet " << packet.encode() << endl;
  130. CCOS_PACKET_ROUTE Route;
  131. PacketAnalizer::GetPacketRoute(&packet, Route);
  132. bool sendtoLocalMachine = false;
  133. bool sendBlockData = false;
  134. //packet cmd
  135. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&packet);
  136. if (cmd == PACKET_CMD_DATA)
  137. {
  138. sendBlockData = true;
  139. }
  140. string DesBusId;
  141. if (!PacketAnalizer::GetDestinationBusId(packet, DesBusId))
  142. {
  143. return false;
  144. }
  145. //Precaculate direction
  146. if (Route == CCOS_PACKET_ROUTE_ANY)
  147. {
  148. //choose one
  149. Route = g_BusManager.GetBusIdDirection(DesBusId);
  150. }
  151. //caculate again
  152. if (Route == CCOS_PACKET_ROUTE_LOCAL)
  153. {
  154. //target in this machine
  155. sendtoLocalMachine = true;
  156. }
  157. else if (Route == CCOS_PACKET_ROUTE_ETH)
  158. {
  159. //target is not in this machine
  160. }
  161. else if (Route == CCOS_PACKET_ROUTE_ANY)
  162. {
  163. //if still any direction,then try local and try eth
  164. //local first
  165. std::cout << "Send local first...... to " << DesBusId << endl;
  166. if (DoSendPacket(packet, true, sendBlockData, DesBusId) == CMD_ECHO_OK)
  167. {
  168. return CMD_ECHO_OK;
  169. }
  170. std::cout << "Send Eth......to " << DesBusId << endl;
  171. //try eth
  172. return DoSendPacket(packet, false, sendBlockData, DesBusId);
  173. }
  174. else
  175. {
  176. //ignore
  177. return CMD_ECHO_IGNORE;
  178. }
  179. //do send
  180. std::cout << "Send ?????.....to " << DesBusId << " ? local " << sendtoLocalMachine << endl;
  181. return DoSendPacket(packet, sendtoLocalMachine, sendBlockData, DesBusId);
  182. }
  183. static string ProcessKey(const char* pszKey) {
  184. char szKeys[256];
  185. strcpy(szKeys, pszKey);
  186. char* pt = szKeys;
  187. while (*pt != 0)
  188. {
  189. if (*pt == '/' || *pt == '{' || *pt == '}' || *pt == '-')
  190. *pt = '_';
  191. pt++;
  192. }
  193. return szKeys;
  194. }
  195. bool BusThread::HandleOpenClose(ResDataObject& packet, PACKET_CMD cmd)
  196. {
  197. // stringstream stream;
  198. //stream << "LogicClient_" << szClientName << "_" << GetCurrentProcessId();
  199. //m_strClientName = stream.str();
  200. if (cmd == PACKET_CMD_OPEN)
  201. {
  202. char szClientID[512];
  203. UINT64 cltid = 0, addr = 0;
  204. string openKey = PacketAnalizer::GetPacketKey(&packet);
  205. //if(openKey.length() > 48)
  206. // openKey.erase(48,openKey.length()-48);
  207. string findkey = ProcessKey(openKey.c_str());
  208. PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false);
  209. PacketAnalizer::GetPacketHandleAddr(&packet, addr, false);
  210. LogicClient* pClient = g_ClientManager.GetClient(cltid, addr, findkey.c_str());
  211. sprintf_s(szClientID, "BUSCLT_%d_%d_%d_%s", cltid, addr, GetCurrentProcessId(), findkey.c_str());
  212. std::cout << CurrentDateTime << "Get Open Request From Ebus Open Key " << openKey << endl;
  213. std::cout << "Use Client id to connect mqtt " << szClientID << " gened by findkey " << findkey << endl;
  214. ResDataObject ebusHandle;
  215. PacketAnalizer::GetPacketHandle(&packet, ebusHandle);
  216. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  217. pHandle->SetResDataObject(ebusHandle);
  218. if (pClient != nullptr)
  219. {
  220. std::cout << "Connect seconed times .Reuse Connection ......." << endl;
  221. ////直接复用连接,考虑过一段时间重新关闭打开
  222. //std::cout << szClientID << " Had alread opend reuse it" << endl;
  223. //ResDataObject resRes, resResponse;
  224. //pClient->GetDeviceResource(&resRes);
  225. //PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes);
  226. //PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
  227. ////ResDataObject resTopic;
  228. ////PacketAnalizer::GetContextTopic(pCmd, resTopic);
  229. //std::cout << "Transfer packet org " << packet.encode() << endl;
  230. //std::cout << "Dest packet : " << resResponse.encode() << endl;
  231. //SendCcosPacket(resResponse);
  232. //return true;
  233. pClient->Close();
  234. }
  235. else
  236. {
  237. std::cout << "Connect First times .crate Connection ......." << endl;
  238. pClient = new LogicClient(szClientID, ("EBUS"+(string)getRootpath()).c_str());
  239. }
  240. std::cout << "" << CurrentDateTime() << "EUBS-CLIENT Set LogicClient Ebus handle [" << pClient->GetClientName() << "] Handle " << ebusHandle.encode() << endl;
  241. pClient->SetEbusFileHandle(pHandle);
  242. if (pClient->Open(openKey.c_str(), ALL_ACCESS, 1002) == RET_SUCCEED)
  243. {
  244. std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Succeed.." << endl;
  245. //把客户端对象存进去
  246. g_ClientManager.RegistClient((UINT64)pClient);
  247. g_ClientManager.RegistClient(cltid, addr, findkey.c_str(), pClient);
  248. //PacketAnalizer::UpdateOpenRequest(packet, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)pClient);
  249. //CcosDevFileHandle handle;
  250. //ResDataObject handleRes;
  251. //PacketAnalizer::GetPacketHandle(&packet, handleRes);
  252. //handle.SetResDataObject(handleRes);
  253. ResDataObject resRes, resResponse;
  254. pClient->GetDeviceResource(&resRes);
  255. PacketAnalizer::MakeOpenResponse(packet, resResponse, resRes);
  256. PacketAnalizer::UpdateOpenRequest(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
  257. PacketAnalizer::UpdateHandleId(&resResponse,(UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去
  258. //ResDataObject resTopic;
  259. //PacketAnalizer::GetContextTopic(pCmd, resTopic);
  260. std::cout << "" << CurrentDateTime() << " Open Success .. Transfer packet org " << endl;//<< packet.encode()
  261. //std::cout << "Dest packet : " << resResponse.encode() << endl;
  262. SendCcosPacket(resResponse);
  263. std::cout << "###################### " << pClient->GetClientName() << " Make Use Point as HandleID " << (UINT64)pClient << endl;
  264. pClient->SetPacketArrivedCallback([this, pClient](ResDataObject* res, const char* topic, void* conn_void) {
  265. ccos_mqtt_connection* conn = (ccos_mqtt_connection*)conn_void;
  266. //std::cout << "Send back to Ebus packet " << res->encode() << endl;
  267. UINT64 addr, proc;
  268. addr = proc = 0;
  269. PacketAnalizer::GetPacketHandleAddr(res, addr, false);
  270. PacketAnalizer::GetPacketHandleProcId(res, proc, false);
  271. //g_ClientManager.GetClient((UINT32)proc, (UINT32)addr, )
  272. if (addr == 0 && proc == 0)
  273. {
  274. //这里是Notify
  275. //ResDataObject resEbusHandle;
  276. CcosDevFileHandle* pHandle = pClient->GetEbusFileHanle();
  277. //pHandle->GetResDataObject(resEbusHandle);
  278. PacketAnalizer::UpdateNotifyHandle(*res, *pHandle);
  279. PacketAnalizer::UpdateOpenRequest(*res, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pClient);
  280. PacketAnalizer::UpdateHandleId(res, (UINT64)pClient); //把LogicClient 的对象指针作为HnaleID 传回去
  281. std::cout << "###################### " << pClient->GetClientName() << " Used Point as HandleID" << (UINT64)pClient << endl;
  282. }
  283. SendCcosPacket(*res);
  284. });
  285. }
  286. else {
  287. std::cout << "" << CurrentDateTime() << szClientID << " EUBS-CLIENT Open Mqtt Server Failed.." << endl;
  288. return false;
  289. }
  290. }
  291. else if (cmd == PACKET_CMD_CLOSE)
  292. {
  293. UINT64 conn;
  294. UINT64 cltid = 0, addr = 0;
  295. PacketAnalizer::GetPacketHandleAddr(&packet, conn, true);
  296. PacketAnalizer::GetPacketHandleProcId(&packet, cltid, false);
  297. PacketAnalizer::GetPacketHandleAddr(&packet, addr, false);
  298. //要判断下指针是否我分配过
  299. if (g_ClientManager.FindClient(conn))
  300. {
  301. g_ClientManager.UnRegistClient(conn);
  302. LogicClient* pClient = (LogicClient*)conn;
  303. delete pClient;
  304. }
  305. //CloseConnection((ccos_mqtt_connection*)conn);
  306. }
  307. return true;
  308. }
  309. bool BusThread::HandleOthers(ResDataObject& packet, PACKET_CMD cmd)
  310. {
  311. std::cout << "BusThread::HandleOthers " << cmd << endl;
  312. UINT64 conn = 0;
  313. //PacketAnalizer::GetPacketHandleAddr(&packet, conn, true);
  314. PacketAnalizer::GetHandleId(&packet, conn);
  315. string key;
  316. ResDataObject resParam, resHandle;
  317. //要判断下指针是否我分配过
  318. if (g_ClientManager.FindClient(conn))
  319. {
  320. LogicClient* pClient = (LogicClient*)conn;
  321. switch (cmd) {
  322. case PACKET_CMD_GET:
  323. break;
  324. case PACKET_CMD_ADD:
  325. case PACKET_CMD_DEL:
  326. break;
  327. case PACKET_CMD_UPDATE://更新对象
  328. break;
  329. case PACKET_CMD_EXE:
  330. //std::cout << pClient->get
  331. //todo 将Owner透传过去
  332. pClient->Action_Trans(&packet);
  333. break;
  334. case PACKET_CMD_DATA:
  335. case PACKET_CMD_MSG:
  336. break;
  337. case PACKET_CMD_ONLINE://设备掉线后重新注册到Root
  338. case PACKET_CMD_PART_UPDATE://只更新对象的子节点
  339. break;
  340. }
  341. }
  342. return false;
  343. }
  344. //work
  345. bool BusThread::Exec()
  346. {
  347. while (WaitForInQue())
  348. {
  349. bool status = true;
  350. //pump In&Out que
  351. while (status)
  352. {
  353. ResDataObject Packet;
  354. status = false;
  355. //check req first
  356. if (PopReqDataObject(Packet))
  357. {
  358. status = true;
  359. //send it to cdi
  360. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&Packet);
  361. PACKET_TYPE type = PacketAnalizer::GetPacketType(& Packet);
  362. string key = PacketAnalizer::GetPacketKey(&Packet);
  363. std::cout << "------------ From Ebus type: [" << type << "] cmd : [" << cmd << "] key [" << key << "]" << endl;
  364. //CDInterface *p = CDInterface::GetCDI();
  365. if (m_bLocal)
  366. {
  367. ReceivedFromLocalBus(Packet);
  368. }
  369. else
  370. {
  371. ReceivedFromEthBus(Packet);
  372. }
  373. //p->ReceivedFromBus(Packet);
  374. }
  375. }
  376. }
  377. return true;
  378. }
  379. bool BusThread::OnStartThread(void)
  380. {
  381. printf("========== BusThread::OnStartThread 0X%08X \n", GetCurrentThreadId());
  382. #ifndef TEMP_COMP
  383. //mLog::FINFO("Bus Thread Begin Entry.....");
  384. #endif
  385. return true;
  386. }
  387. bool BusThread::OnEndThread(void)
  388. {
  389. printf("========== BusThread::OnEndThread 0X%08X \n", GetCurrentThreadId());
  390. #ifndef TEMP_COMP
  391. Logger *pLog = (Logger*)m_pThreadLogger;
  392. //mLog::FINFO( "Bus Thread End Exit.....");
  393. #endif
  394. return true;
  395. }
  396. */