SysIF.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
  1. #include "StdAfx.h"
  2. #include "SysIF.h"
  3. #include "CDInterface.h"
  4. #include "CcosLock.h"
  5. #include "LocalConfig.h"
  6. #include "CcosFileHandle.h"
  7. #include "PacketAnalizer.h"
  8. #include "Logger.h"
  9. #include "mqtt/async_client.h"
  10. SysIF::SysIF(void)
  11. {
  12. m_Lock = (HANDLE)new CcosLock();
  13. m_InternalLock = (HANDLE)new CcosLock();
  14. }
  15. SysIF::~SysIF(void)
  16. {
  17. delete (CcosLock*)m_Lock;
  18. m_Lock = NULL;
  19. delete (CcosLock*)m_InternalLock;
  20. m_InternalLock = NULL;
  21. }
  22. bool SysIF::Lock(DWORD timeout)
  23. {
  24. if (((CcosLock*)m_Lock)->Thread_Lock(timeout) == WAIT_OBJECT_0)
  25. {
  26. return true;
  27. }
  28. return false;
  29. }
  30. void SysIF::UnLock()
  31. {
  32. ((CcosLock*)m_Lock)->Thread_UnLock();
  33. }
  34. bool SysIF::InternalLock(DWORD timeout)
  35. {
  36. if (((CcosLock*)m_InternalLock)->Thread_Lock(timeout) == WAIT_OBJECT_0)
  37. {
  38. return true;
  39. }
  40. return false;
  41. }
  42. void SysIF::InternalUnLock()
  43. {
  44. ((CcosLock*)m_InternalLock)->Thread_UnLock();
  45. }
  46. RET_STATUS HW_ACTION SysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  47. {
  48. assert(0);//supposed not call in
  49. return RET_NOSUPPORT;
  50. }
  51. /*
  52. //-----------------------------Client SYSIF----------------------------------------
  53. ClientSysIF::ClientSysIF(void)
  54. {
  55. m_pRequestList = new MsgVector<DWORD>();
  56. }
  57. ClientSysIF::~ClientSysIF(void)
  58. {
  59. delete m_pRequestList;
  60. }
  61. void ClientSysIF::RegistClientToCDI()
  62. {
  63. CDInterface *pcdi = CDInterface::GetCDI();
  64. if (pcdi)
  65. {
  66. LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this;
  67. //pcdi->RegistClient((UINT64)pSys);
  68. }
  69. }
  70. void ClientSysIF::UnRegistClientFromCDI()
  71. {
  72. CDInterface *pcdi = CDInterface::GetCDI();
  73. if (pcdi)
  74. {
  75. LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this;
  76. //pcdi->UnRegistClient((UINT64)pSys);
  77. }
  78. }
  79. RET_STATUS HW_ACTION ClientSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  80. {
  81. //kick notify&response
  82. PACKET_TYPE Type = PacketAnalizer::GetPacketType(pCmd);
  83. if (Type != PACKET_TYPE_REQ)
  84. {
  85. return RET_FAILED;
  86. }
  87. //regist packet as req
  88. DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd);
  89. InternalLock();
  90. m_pRequestList->PushBack(Idx);
  91. //GPRINTA_DEBUG("FromClient Idx: %d", Idx);
  92. InternalUnLock();
  93. //find CDI
  94. CDInterface *pcdi = CDInterface::GetCDI();
  95. if (pcdi)
  96. {
  97. //if (pcdi->ReceivedFromClient(*pCmd))
  98. {
  99. return RET_SUCCEED;
  100. }
  101. }
  102. #
  103. return RET_FAILED;
  104. }
  105. //notify to lower layer
  106. RET_STATUS ClientSysIF::CmdToLogicDev(ResDataObject *pCmd)
  107. {
  108. if (m_pLogicDev)
  109. {
  110. PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd);
  111. if (type1 == PACKET_TYPE_RES)
  112. {
  113. DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd);
  114. InternalLock();
  115. if (m_pRequestList->FindAndClear(Idx) == false)
  116. {
  117. //some log here
  118. InternalUnLock();
  119. return RET_FAILED;
  120. }
  121. InternalUnLock();
  122. //GPRINTA_DEBUG("ToClient Idx: %d", Idx);
  123. }
  124. else if (type1 == PACKET_TYPE_REQ)
  125. {
  126. //some log here
  127. //GPRINTA_DEBUG("ToClient Idx is REQ");
  128. return RET_FAILED;
  129. }
  130. //GPRINTA_DEBUG("ToClient Cmd OK");
  131. return m_pLogicDev->CmdToLogicDev(pCmd);
  132. }
  133. return RET_NOSUPPORT;
  134. }
  135. bool ClientSysIF::IsAllRequestFinished()
  136. {
  137. InternalLock();
  138. bool ret = (m_pRequestList->Size() == 0);
  139. InternalUnLock();
  140. return ret;
  141. }
  142. void ClientSysIF::Clear()
  143. {
  144. InternalLock();
  145. m_pRequestList->Clear();
  146. InternalUnLock();
  147. }
  148. */
  149. //-----------------------------Server SYSIF----------------------------------------
  150. ServerSysIF::ServerSysIF(void)
  151. {
  152. m_pDeviceReqThread = NULL;
  153. m_pDeviceResThread = NULL;
  154. }
  155. ServerSysIF::~ServerSysIF(void)
  156. {
  157. }
  158. RET_STATUS HW_ACTION ServerSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  159. {
  160. //all notify
  161. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  162. if (type == PACKET_TYPE_NOTIFY)
  163. {
  164. LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this;
  165. //fill up the device info
  166. if (PacketAnalizer::GetPacketHandleExistance(pCmd) == false)
  167. {
  168. if (PacketAnalizer::UpdateDeviceNotifyResponse((*pCmd),
  169. getLocalMachineId(),
  170. getLocalEbusId(),
  171. (UINT64)GetCurrentProcessId(),
  172. (UINT64)pSys) == false)
  173. {
  174. return RET_FAILED;
  175. }
  176. }
  177. //send Packet
  178. if (m_pDeviceResThread)
  179. {
  180. if (m_pDeviceResThread->PushResDataObject(*pCmd))
  181. {
  182. return RET_SUCCEED;
  183. }
  184. }
  185. /*
  186. if (m_pDeviceThread)
  187. {
  188. if (m_pDeviceThread->PushResDataObject(*pCmd))
  189. {
  190. return RET_SUCCEED;
  191. }
  192. }*/
  193. }
  194. return RET_FAILED;
  195. }
  196. //notify to lower layer
  197. RET_STATUS ServerSysIF::CmdToLogicDev(ResDataObject *pCmd)
  198. {
  199. if (m_pLogicDev)
  200. {
  201. //do not use CmdToLogicDev,it's use less for the device
  202. //only act request packet
  203. PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd);
  204. if (type1 == PACKET_TYPE_REQ)
  205. {
  206. //do request
  207. //ResDataObject response;
  208. //RET_STATUS ret = m_pLogicDev->Request(pCmd, &response);
  209. //send Packet
  210. if (m_pDeviceReqThread->PushReqDataObject(*pCmd))
  211. {
  212. return RET_SUCCEED;
  213. }
  214. /*
  215. if (m_pDeviceThread->PushReqDataObject(*pCmd))
  216. {
  217. return RET_SUCCEED;
  218. }*/
  219. }
  220. }
  221. return RET_FAILED;
  222. }
  223. /*
  224. void ServerSysIF::SetWorkThread(Dual_Driver_Thread *p)
  225. {
  226. m_pDeviceThread = p;
  227. }*/
  228. void ServerSysIF::SetWorkThread(Work_Thread* pDeviceReqThread, Work_Thread* pDeviceResThread)
  229. {
  230. m_pDeviceReqThread = pDeviceReqThread;
  231. m_pDeviceResThread = pDeviceResThread;
  232. }
  233. /*
  234. string SERVER_ADDRESS;
  235. //创建额外连接,需要提供回调函数
  236. SYSIF_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  237. {
  238. //Logger* pDevLog = GetLogHandle();
  239. std::cout << "=======Driver Module Init ============" << std::endl;
  240. //PRINTA_INFO(pDevLog, "=======Driver Module Init ============");
  241. //连接MQTT broker
  242. // 开启接受通知 线程
  243. mqtt::async_client *pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID);
  244. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  245. std::get<0>(*connection) = pMqttClient;
  246. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  247. std::get<0>(*pfilter) = nullptr;
  248. std::get<2>(*connection) = pfilter;
  249. mqtt::connect_options connOpts;
  250. connOpts.set_clean_session(false);
  251. connOpts.set_automatic_reconnect(true);
  252. // Install the callback(s) before connecting.
  253. //callback *pCallback = new callback(*pMqttClient, connOpts, onmsg);
  254. //pMqttClient->set_callback(*pCallback);
  255. auto func = [onmsg,&connection] (mqtt::const_message_ptr msg) -> void {
  256. ResDataObject req;
  257. req.decode(msg->get_payload_str().c_str());
  258. string topic = msg->get_topic().c_str();
  259. //取消息钩子
  260. const void* pHook = std::get<2>(*connection);
  261. if (pHook != nullptr) {
  262. ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
  263. ccos_mqtt_msg_filter_func func = std::get<0>(*filter);
  264. //消息钩子函数存在
  265. if (func != nullptr) {
  266. ResDataObject* resObj = std::get<2>(*filter);
  267. HANDLE hEvent = std::get<1>(*filter);
  268. if (func(&req)) {
  269. //勾住了,是该钩子的消息,则通知
  270. PacketAnalizer::GetPacketContext(&req, *resObj);
  271. SetEvent(hEvent);
  272. return ;
  273. }
  274. }
  275. }
  276. onmsg(&req, topic.c_str());
  277. };
  278. std::get<1>(*connection) = &func;
  279. pMqttClient->set_message_callback( func );
  280. //std::function<void(const string& cause)>
  281. pMqttClient->set_connected_handler([](const string& cause) {
  282. std::cout << "Connecting to the MQTT server Succeeded.." << endl;
  283. });
  284. //std::function<void(const properties&, ReasonCode)>
  285. pMqttClient->set_disconnected_handler([](const mqtt::properties&, mqtt::ReasonCode) {
  286. std::cout << "MQTT Server Connection lost...." << endl;
  287. });
  288. // Start the connection.
  289. // When completed, the callback will subscribe to topic.
  290. std::cout << "=======Connecting to the MQTT ============" << std::endl;
  291. try {
  292. std::cout << "Connecting to the MQTT server..." << std::flush;
  293. //PRINTA_INFO(pDevLog, "Connecting to the MQTT server...");
  294. //
  295. pMqttClient->connect(connOpts);
  296. return connection;
  297. }
  298. catch (const mqtt::exception& exc) {
  299. //PRINTA_ERROR(pDevLog, " Unable to connect to MQTT server: %s ", SERVER_ADDRESS);
  300. std::cerr << "\nERROR: Unable to connect to MQTT server: '"
  301. << SERVER_ADDRESS << "'" << exc << std::endl;
  302. }
  303. delete pfilter;
  304. delete connection;
  305. delete pMqttClient;
  306. return nullptr;
  307. }
  308. //关闭并释放连接
  309. SYSIF_API void CloseConnection(ccos_mqtt_connection* hConnection)
  310. {
  311. if (hConnection == nullptr)
  312. {
  313. return;
  314. }
  315. mqtt::async_client* pconn = (mqtt::async_client *)std::get<0>(*hConnection);
  316. if (pconn != nullptr) {
  317. pconn->disconnect();
  318. delete pconn;
  319. }
  320. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection);
  321. if(pfilter != nullptr)
  322. delete pfilter;
  323. delete hConnection;
  324. }
  325. //主动订阅主题
  326. SYSIF_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare )
  327. {
  328. if (hConnection == nullptr)
  329. {
  330. return 0;
  331. }
  332. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  333. if (!pMqttClient->is_connected()) {
  334. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  335. return 0;
  336. }
  337. mqtt::subscribe_options opts;
  338. pMqttClient->subscribe(pszTopic, 1, opts);
  339. return 0;
  340. }
  341. //主题订阅取消
  342. SYSIF_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  343. {
  344. if (hConnection == nullptr)
  345. {
  346. return 0;
  347. }
  348. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  349. if (!pMqttClient->is_connected()) {
  350. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  351. return 0;
  352. }
  353. mqtt::subscribe_options opts;
  354. auto ret = pMqttClient->unsubscribe(pszTopic);
  355. return 2;
  356. }
  357. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  358. SYSIF_API int PublishMsg(ResDataObject *pCmd, const char* pszTopic, const char* pszSenderName)
  359. {
  360. char pszClientID[256];
  361. sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X",pszSenderName==nullptr?"ANONYMOUS":pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
  362. ccos_mqtt_connection *connObj = NewConnection(pszClientID, []( ResDataObject*, const char*) {
  363. });
  364. mqtt::async_client* pConn = (mqtt::async_client*)std::get<0>(*connObj);
  365. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  366. pConn->publish(pszTopic, pCmd->encode());
  367. CloseConnection(connObj);
  368. return 0;
  369. }
  370. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  371. SYSIF_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection )
  372. {
  373. if (hConnection == nullptr)
  374. {
  375. return 0;
  376. }
  377. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  378. if (!pMqttClient->is_connected()) {
  379. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  380. return 0;
  381. }
  382. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, pMqttClient->get_client_id().c_str());
  383. pMqttClient->publish(pszTopic, pAction->encode());
  384. return 2;
  385. }
  386. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  387. SYSIF_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection )
  388. {
  389. if (hConnection == nullptr)
  390. {
  391. return 0;
  392. }
  393. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  394. if (!pMqttClient->is_connected()) {
  395. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  396. return 0;
  397. }
  398. ResDataObject req;
  399. //TODO 这里应该添加 组包逻辑,待完善
  400. //PacketAnalizer::MakeActionRequest(req, );
  401. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
  402. pMqttClient->publish(pszTopic, req.encode());
  403. return 2;
  404. }
  405. /// <summary>
  406. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  407. /// 超时没收到应答返回失败,
  408. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  409. /// </summary>
  410. /// <param name="pAction">要发送的命令Action名</param>
  411. /// <param name="pContext">命令携带的参数</param>
  412. /// <param name="pszTopic">要发送的目标topic</param>
  413. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  414. /// <param name="resObj">应答返回的参数结果</param>
  415. /// <param name="dwWaitTime">等待超时时间</param>
  416. /// <param name="hConnection">复用的MQTT链接句柄</param>
  417. /// <param name="onmsg">复用的链接的消息处理函数</param>
  418. /// <returns>成功返回2,其他返回错误码</returns>
  419. SYSIF_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection,const char* pAction,ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  420. ResDataObject& resObj, DWORD dwWaitTime)
  421. {
  422. if (pszRespTopic != nullptr)
  423. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  424. if (hConnection == nullptr)
  425. {
  426. return 0;
  427. }
  428. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  429. if (!pMqttClient->is_connected()) {
  430. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  431. return 0;
  432. }
  433. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  434. //SubscribeTopic()
  435. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](mqtt::const_message_ptr msg) {
  436. string topic = msg->get_topic().c_str();
  437. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  438. //应答到了,处理
  439. ResDataObject req;
  440. req.decode(msg->get_payload_str().c_str());
  441. PacketAnalizer::GetPacketContext(&req, resObj);
  442. SetEvent(hEvent);
  443. //处理结束 将函数指针换回去
  444. auto oldFuncPointer = std::get<1>(*hConnection);
  445. pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  446. }
  447. else
  448. {
  449. mqtt::async_client::message_handler orgfunc = *(mqtt::async_client::message_handler*)std::get<1>(*hConnection);
  450. orgfunc(msg);
  451. }
  452. };
  453. pMqttClient->set_message_callback(action_func);
  454. pMqttClient->subscribe(pszRespTopic, 1);
  455. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
  456. pMqttClient->publish(pszTopic, req.encode());
  457. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  458. if (ret == WAIT_OBJECT_0) {
  459. //等到应答了
  460. return 2;
  461. }
  462. return 0;
  463. }
  464. /// <summary>
  465. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  466. /// </summary>
  467. /// <param name="hConnection"></param>
  468. /// <param name="pAction"></param>
  469. /// <param name="pContext"></param>
  470. /// <param name="pszTopic"></param>
  471. /// <param name="resObj"></param>
  472. /// <param name="hEvent"></param>
  473. /// <param name="dwWaitTime"></param>
  474. /// <returns></returns>
  475. SYSIF_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection,const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime )
  476. {
  477. if (hConnection == nullptr)
  478. {
  479. return 0;
  480. }
  481. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  482. if (!pMqttClient->is_connected()) {
  483. //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  484. return 0;
  485. }
  486. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  487. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection);
  488. std::get<1>(*pfilter) = hEvent;
  489. std::get<2>(*pfilter) = &resObj;
  490. std::get<0>(*pfilter) = [pAction]( ResDataObject* rsp) -> bool {
  491. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  492. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  493. return true;
  494. return false;
  495. };
  496. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
  497. pMqttClient->publish(pszTopic, req.encode());
  498. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  499. if (ret == WAIT_OBJECT_0) {
  500. //等到应答了
  501. return 2;
  502. }
  503. return 0;
  504. }
  505. /// <summary>
  506. /// 新建MQTT连接发送Ation并等待应答
  507. /// </summary>
  508. /// <param name="pAction"></param>
  509. /// <param name="pContext"></param>
  510. /// <param name="pszTopic"></param>
  511. /// <param name="pszRespTopic"></param>
  512. /// <param name="resObj"></param>
  513. /// <param name="dwWaitTime"></param>
  514. /// <param name="pszSenderName"></param>
  515. /// <returns></returns>
  516. SYSIF_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  517. DWORD dwWaitTime, const char* pszSenderName)
  518. {
  519. ResDataObject req;
  520. if (pszRespTopic != nullptr)
  521. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  522. //临时创建连接并发送和接收应答
  523. char pszClientID[256];
  524. sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
  525. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  526. if(hEvent == NULL)
  527. return 0;
  528. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* req, const char* topic) {
  529. //应答到了,处理
  530. PacketAnalizer::GetPacketContext(req, resObj);
  531. SetEvent(hEvent);
  532. });
  533. //发布消息,并等待应答
  534. PublishAction(&req, pszTopic, connObj);
  535. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  536. if (ret == WAIT_OBJECT_0) {
  537. //等到应答了
  538. CloseConnection(connObj);
  539. return 2;
  540. }
  541. CloseConnection(connObj);
  542. return 0;
  543. }
  544. SYSIF_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection )
  545. {
  546. mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
  547. return 0;
  548. }
  549. */
  550. /////////////////////////////////////////////////////////////////////////////
  551. // Callbacks for the success or failures of requested actions.
  552. // This could be used to initiate further action, but here we just log the
  553. // results to the console.
  554. class action_listener : public virtual mqtt::iaction_listener
  555. {
  556. std::string name_;
  557. void on_failure(const mqtt::token& tok) override {
  558. std::cout << name_ << " failure";
  559. if (tok.get_message_id() != 0)
  560. std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
  561. std::cout << std::endl;
  562. }
  563. void on_success(const mqtt::token& tok) override {
  564. std::cout << name_ << " success";
  565. if (tok.get_message_id() != 0)
  566. std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
  567. auto top = tok.get_topics();
  568. if (top && !top->empty())
  569. std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
  570. std::cout << std::endl;
  571. }
  572. public:
  573. action_listener(const std::string& name) : name_(name) {}
  574. };
  575. /////////////////////////////////////////////////////////////////////////////
  576. /**
  577. * Local callback & listener class for use with the client connection.
  578. * This is primarily intended to receive messages, but it will also monitor
  579. * the connection to the broker. If the connection is lost, it will attempt
  580. * to restore the connection and re-subscribe to the topic.
  581. */
  582. /*
  583. class callback : public virtual mqtt::callback,
  584. public virtual mqtt::iaction_listener
  585. {
  586. // Counter for the number of connection retries
  587. int nretry_;
  588. // The MQTT client
  589. mqtt::async_client& cli_;
  590. // Options to use if we need to reconnect
  591. mqtt::connect_options& connOpts_;
  592. // An action listener to display the result of actions.
  593. action_listener subListener_;
  594. bool bConnected;
  595. std::function<void(const ResDataObject*, char*)> onmsg_;
  596. // This deomonstrates manually reconnecting to the broker by calling
  597. // connect() again. This is a possibility for an application that keeps
  598. // a copy of it's original connect_options, or if the app wants to
  599. // reconnect with different options.
  600. // Another way this can be done manually, if using the same options, is
  601. // to just call the async_client::reconnect() method.
  602. void reconnect() {
  603. std::this_thread::sleep_for(std::chrono::milliseconds(2500));
  604. try {
  605. cli_.connect(connOpts_, nullptr, *this);
  606. }
  607. catch (const mqtt::exception& exc) {
  608. std::cerr << "Error: " << exc.what() << std::endl; {
  609. //exit(1);
  610. }
  611. }
  612. catch (...) {
  613. std::cout << "Connect Some Error " << std::endl;
  614. }
  615. }
  616. // Re-connection failure
  617. void on_failure(const mqtt::token& tok) override {
  618. std::cout << "Connection attempt failed" << std::endl;
  619. if (++nretry_ > N_RETRY_ATTEMPTS) {
  620. //exit(1);
  621. }
  622. reconnect();
  623. }
  624. // (Re)connection success
  625. // Either this or connected() can be used for callbacks.
  626. void on_success(const mqtt::token& tok) override {}
  627. // (Re)connection success
  628. void connected(const std::string& cause) override {
  629. bConnected = true;
  630. std::cout << "\nConnection success" << std::endl;
  631. std::cout << "\nSubscribing to topic '" << "'\n"
  632. << "\tfor client " << DEVICE_ID
  633. << " using QoS" << QOS << "\n"
  634. << "\nPress Q<Enter> to quit\n" << std::endl;
  635. //cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
  636. //g_pDPCDeviceObject->ReSubscribe();
  637. }
  638. // Callback for when the connection is lost.
  639. // This will initiate the attempt to manually reconnect.
  640. void connection_lost(const std::string& cause) override {
  641. std::cout << "\nConnection lost" << std::endl;
  642. if (!cause.empty())
  643. std::cout << "\tcause: " << cause << std::endl;
  644. std::cout << "Reconnecting..." << std::endl;
  645. nretry_ = 0;
  646. bConnected = false;
  647. reconnect();
  648. }
  649. // Callback for when a message arrives.
  650. void message_arrived(mqtt::const_message_ptr msg) override {
  651. std::cout << "Message arrived" << std::endl;
  652. std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
  653. std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
  654. //订阅的主题到了
  655. ResDataObject req;
  656. req.decode(msg->get_payload_str().c_str());
  657. //onmsg_(req, msg->get_topic().c_str());
  658. //g_pDPCDeviceObject->NOTIFY(msg->get_topic(), msg->to_string());
  659. }
  660. void delivery_complete(mqtt::delivery_token_ptr token) override {
  661. }
  662. public:
  663. callback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::function<void(const ResDataObject*, char*)> onmsg)
  664. : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), onmsg_(onmsg) {
  665. bConnected = false;
  666. }
  667. };*/
  668. /*
  669. callback* g_pCallback = NULL;
  670. string TOPIC_PREFIX;
  671. string ACTION_TOPIC_PREFIX;*/
  672. /*
  673. void LogicDevice::NOTIFY(string topic, string context) {
  674. Logger* pDevLog = GetLogHandle();
  675. PRINTA_INFO(pDevLog, "NotifyTopic Come %s :%s", topic, context);
  676. if (ACTION_TOPIC_PREFIX.length() > 0 && topic._Starts_with(ACTION_TOPIC_PREFIX)) {
  677. // 本设备的Action请求
  678. std::cout << "Action from Topic " << topic << "request " << context << std::endl;
  679. // TODO ... 这里应该改成 一个工作线程来处理Action请求
  680. DispatchAction(topic, context);
  681. }
  682. else {
  683. ResDataObject notify;
  684. notify.add("topic", topic.c_str());
  685. notify.add("context", context.c_str());
  686. string out;
  687. //m_pMidObject->Action("NotifyTopic", notify.encode(), out);
  688. }
  689. }*/
  690. /*
  691. void LogicDevice::ReSubscribe() {
  692. Logger* pDevLog = GetLogHandle();
  693. if (m_pMqttClient->is_connected()) {
  694. for each (string var in m_strSubTopics)
  695. {
  696. PRINTA_INFO(pDevLog, "ReSubscribe topic %s", var);
  697. m_pMqttClient->subscribe(var, QOS, nullptr, g_pCallback->subListener_);
  698. }
  699. }
  700. else {
  701. PRINTA_ERROR(pDevLog, " MQTT connection is not available ");
  702. std::cout << " LogicDevice::ReSubscribe() MQTT connection is not available " << std::endl;
  703. }
  704. }*/
  705. /*
  706. void LogicDevice::SubscribeOne(string topic, bool unSubscribe) {
  707. Logger* pDevLog = GetLogHandle();
  708. if (!m_pMqttClient->is_connected()) {
  709. PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
  710. return;
  711. }
  712. bool bFind = false;
  713. for each (string var in m_strSubTopics)
  714. {
  715. if (var == topic) {
  716. bFind = true;
  717. break;
  718. }
  719. }
  720. if (!bFind && !unSubscribe) {
  721. PRINTA_INFO(pDevLog, "Subscribe topic %s", topic);
  722. m_strSubTopics.push_back(topic);
  723. m_pMqttClient->subscribe(topic, QOS, nullptr, g_pCallback->subListener_);
  724. }
  725. if (bFind && unSubscribe) {
  726. PRINTA_INFO(pDevLog, "UnSubscribe topic %s", topic);
  727. m_strSubTopics.erase(std::remove(m_strSubTopics.begin(), m_strSubTopics.end(), topic), m_strSubTopics.end());
  728. m_pMqttClient->unsubscribe(topic);
  729. }
  730. }
  731. */
  732. /*
  733. void LogicDevice::SubscribeAction()
  734. {
  735. if (ACTION_TOPIC_PREFIX.length() <= 0) {
  736. ostringstream ostopic;
  737. ostopic << TOPIC_PREFIX << "Action/";
  738. ACTION_TOPIC_PREFIX = ostopic.str();
  739. }
  740. Logger* pDevLog = GetLogHandle();
  741. if (m_pMqttClient->is_connected()) {
  742. // Action Topic
  743. // Detector/1234xx/Action
  744. string DevRes = m_pMidObject->GetResource();
  745. //if (m_pMidObject->GetDeviceResource(DevRes))
  746. {
  747. ResDataObject LowLayerRes;
  748. if (LowLayerRes.decode(DevRes.c_str()))
  749. {
  750. //attr
  751. int Idx = LowLayerRes.GetFirstOf("Action");
  752. if (Idx >= 0)
  753. {
  754. ResDataObject actions = LowLayerRes[Idx];
  755. for (size_t i = 0; i < actions.size(); i++)
  756. {
  757. ostringstream ostopic;
  758. ostopic << ACTION_TOPIC_PREFIX << (const char*)actions.GetKey(i);
  759. SubscribeOne(ostopic.str());
  760. }
  761. }
  762. else
  763. {
  764. PRINTA_INFO(pDevLog, "There is No Action to Subscribe ???? .");
  765. }
  766. }
  767. }
  768. //m_pMidObject->Action
  769. }
  770. }*/
  771. /////////////////////////////////////////////////////////////////////////////