LogicDevice.cpp 71 KB


  1. // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include "StdAfx.h"
  4. #include "objbase.h"
  5. #include "LogicDevice.h"
  6. #include "PacketAnalizer.h"
  7. #include "MessageInfo.h"
  8. #include "common_api.h"
  9. #include "LocalConfig.h"
  10. #include "Base64.h"
  11. #include "SystemLogger.hpp"
  12. #include <iostream>
  13. #include<chrono>
  14. //#include "matt/async_client.h"
  15. //#include "mqtt/async_client.h"
  16. //#include "MQTTAsync.h"
  17. //#include "MQTTAsync.h"
  18. Log4CPP::Logger* //mLog::gLogger = nullptr;
  19. //#include "mqttclient.h"
  20. //void msgarrivd(void* client, message_data_t* message);
  21. std::string CurrentDateTime();
  22. //-------------Logic Device SysIF--------------------------
  23. LogicDeviceSysIF::LogicDeviceSysIF(void)
  24. {
  25. }
  26. LogicDeviceSysIF::~LogicDeviceSysIF(void)
  27. {
  28. }
  29. //init
  30. void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
  31. {
  32. m_pLogicDev = p;
  33. p->SetSysLogicDevice(this);
  34. };
  35. LogicDevice* LogicDeviceSysIF::GetLogicDevice()
  36. {
  37. return m_pLogicDev;
  38. }
  39. //Command In and Out
  40. //notify from lower layer
  41. RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  42. {
  43. return RET_FAILED;
  44. };
  45. //notify to lower layer
  46. RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
  47. {
  48. if (m_pLogicDev)
  49. {
  50. return m_pLogicDev->CmdToLogicDev(pCmd);
  51. }
  52. return RET_NOSUPPORT;
  53. };
  54. const std::string SERVER_ADDRESS("127.0.0.1");
  55. /*
  56. string DEVICE_ID; //AS CLIENT_ID
  57. //const std::string CLIENT_ID("paho_cpp_async_subcribe");
  58. //const std::string TOPIC("hello");
  59. mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
  60. const int QOS = 1;
  61. const int N_RETRY_ATTEMPTS = 5;*/
  62. using namespace std;
  63. LogicDevice* g_pDPCDeviceObject = NULL;
  64. string LogHost = "";
  65. //string DEVICE_ID;
  66. //-------------Data Logic Device--------------------------
  67. LogicDevice::LogicDevice(void)
  68. {
  69. m_pLogger = NULL;
  70. m_pSysLogic = NULL;
  71. m_pDevInstance = new char[40];
  72. m_pResErrorList = new ResDataObject();
  73. GUID guid;
  74. string strDevInstance;
  75. CoCreateGuid(&guid);
  76. guid_2_string(guid, strDevInstance);
  77. sprintf_s(m_pDevInstance,40, "%s", strDevInstance.c_str());
  78. m_EvtNotify = CreateEvent(0, 0, 0, 0);
  79. m_SemphRequest = CreateSemaphore(NULL, 0, 100, "LogicDevice_Reqquest");
  80. m_pDrvDPC = NULL;
  81. g_pDPCDeviceObject = this;
  82. m_pMqttConntion = nullptr;
  83. //m_strServer = "tcp://localhost:1883";
  84. m_strServer = SERVER_ADDRESS;// "192.168.2.225";
  85. m_strServerPort = "1883";
  86. m_bMqttUseSSL = false;
  87. m_strMqttUser = "";
  88. m_strMqttPassword = "";
  89. m_strEBusRoot = "";
  90. m_strCCOSRoot = "";
  91. m_pParent = nullptr;
  92. m_topicFilter = nullptr;
  93. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  94. int x = 0;
  95. for ( x = 0; x < sizeof(szPad); x++)
  96. szPad[x] = 'A' + x % 26;
  97. szPad[x-1] = 0;
  98. memset(szPad2, 0, sizeof(szPad2));
  99. }
  100. LogicDevice::~LogicDevice(void)
  101. {
  102. delete []m_pDevInstance;
  103. delete m_pResErrorList;
  104. CloseHandle(m_EvtNotify);
  105. CloseHandle(m_SemphRequest);
  106. m_EvtNotify = NULL;
  107. delete m_pPacketReceivedQue;
  108. }
  109. void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
  110. if (pszEBusRoot[0] == '/') {
  111. m_strEBusRoot = pszEBusRoot + 1;
  112. }
  113. else {
  114. m_strEBusRoot = pszEBusRoot;
  115. }
  116. char szKeys[256];
  117. strcpy(szKeys, pszEBusRoot);
  118. char* pt = szKeys;
  119. while (*pt != 0)
  120. {
  121. if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
  122. *pt = '_';
  123. pt++;
  124. }
  125. m_strClientID = CCOS_CLIENT_ID_PREFIX;
  126. m_strClientID += szKeys;
  127. if(pszCCOSRoot != nullptr)
  128. m_strCCOSRoot = pszCCOSRoot;
  129. //mLog::FINFO("Set MQTT ClientName {$} ", m_strClientID);
  130. SetName(m_strClientID.c_str());
  131. OnSetClientID();
  132. }
  133. void LogicDevice::OnSetClientID() {
  134. }
  135. void LogicDevice::SubscribeSelf() {
  136. if (m_strDevicePath.length() > 0)
  137. if (m_strDevicePath.c_str()[0] != '/')
  138. m_strDevicePath = "/" + m_strDevicePath;
  139. //mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
  140. //std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl;
  141. if (m_strEBusRoot.length() > 0) {
  142. SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str());
  143. }
  144. if (m_strCCOSRoot.length() > 0)
  145. {
  146. SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot + m_strDevicePath).c_str());
  147. //订阅
  148. }
  149. SubscribeActions();
  150. }
  151. void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  152. {
  153. if (bSubscribe)
  154. SubscribeTopic(m_pMqttConntion, pszTopic);
  155. else
  156. UnSubscribe(m_pMqttConntion, pszTopic);
  157. }
  158. void LogicDevice::NotifyDrvThread()
  159. {
  160. if (m_EvtNotify)
  161. {
  162. SetEvent(m_EvtNotify);
  163. }
  164. }
  165. HANDLE LogicDevice::GetEvtHandle()
  166. {
  167. return m_EvtNotify;
  168. }
  169. //1. init part
  170. void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
  171. {
  172. m_pSysLogic = pLogic;
  173. }
  174. void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
  175. {
  176. m_pLogger = pLogger;
  177. }
  178. Logger *LogicDevice::GetLogHandle()
  179. {
  180. return m_pLogger;
  181. }
  182. void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
  183. {
  184. m_pDrvDPC = pDPC;
  185. }
  186. DriverDPC *LogicDevice::GetDrvDPC()
  187. {
  188. return m_pDrvDPC;
  189. }
  190. RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
  191. {
  192. RET_STATUS ret = RET_FAILED;
  193. if (szEbusDevPath != nullptr) {
  194. auto search = GetEbusChild(szEbusDevPath);
  195. if (search == nullptr)
  196. {
  197. m_subDevices.push_back(pChild);
  198. std::sort(m_subDevices.begin(), m_subDevices.end(), [](LogicDevice* p1, LogicDevice* p2) {
  199. return p1->GetRootPath() < p2->GetRootPath();
  200. });
  201. }
  202. ret = RET_SUCCEED;
  203. //return RET_SUCCEED;
  204. }
  205. return ret;
  206. }
  207. RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
  208. {
  209. RET_STATUS ret = RET_FAILED;
  210. if (szCcosDevPath != nullptr)
  211. {
  212. if (strncmp(m_strCCOSRoot.c_str(), szCcosDevPath, m_strCCOSRoot.length()) == 0)
  213. {
  214. auto search = GetCcosChild(szCcosDevPath);
  215. if (search == nullptr) {
  216. m_subCcosDevices.push_back(pChild);
  217. std::sort(m_subCcosDevices.begin(), m_subCcosDevices.end(), [](LogicDevice* p1, LogicDevice* p2) {
  218. return p1->GetCcosRootPath() < p2->GetCcosRootPath();
  219. });
  220. }
  221. return RET_SUCCEED;
  222. }
  223. }
  224. return ret;
  225. }
  226. LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
  227. {
  228. //std::binary_search(m_subDevices.begin(), m_subDevices.end(), szEbusDevPath, [szEbusDevPath]()->bool {
  229. // return
  230. // });
  231. for each (LogicDevice* var in m_subDevices)
  232. {
  233. if (var->GetRootPath().compare(szEbusDevPath) == 0)
  234. return var;
  235. }
  236. return nullptr;
  237. }
  238. LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
  239. {
  240. for each (LogicDevice * var in m_subDevices)
  241. {
  242. if (var->GetCcosRootPath().compare(szCcosDevPath) == 0)
  243. return var;
  244. }
  245. return nullptr;
  246. }
  247. void SYSTEM_CALL LogicDevice::CompleteInit()
  248. {
  249. if (//mLog::gLogger == nullptr)
  250. {
  251. string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  252. LogHost = ((string)getLogRootpath()).c_str();
  253. if (LogHost.length() <= 1)
  254. {
  255. char szName[256];
  256. sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
  257. LogHost = szName;
  258. }
  259. Log4CPP::ThreadContext::Map::Set("LogFileName", "LogicDevice");
  260. //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  261. Log4CPP::ThreadContext::Map::Set("LogHost", LogHost.c_str() + 1);
  262. auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  263. //mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  264. }
  265. else
  266. {
  267. string strRoot = ((string)getLogRootpath()).c_str();
  268. if (strRoot.length() > 1 && strRoot != LogHost)
  269. {
  270. string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  271. LogHost = strRoot;
  272. Log4CPP::ThreadContext::Map::Set("LogFileName", "LogicDevice");
  273. //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  274. Log4CPP::ThreadContext::Map::Set("LogHost", LogHost.c_str() + 1);
  275. auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  276. //mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  277. }
  278. }
  279. //string version;
  280. //if (GetVersion(version, hMyModule))
  281. // //mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
  282. //else
  283. //mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
  284. //mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
  285. if (m_strClientID.length() <= 0)
  286. {
  287. //mLog::FINFO("No Client name ......" );
  288. //std::cout << "No Client name ......." << endl;
  289. return;
  290. }
  291. m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
  292. [this](ResDataObject* req, const char* topic, void* conn) {
  293. //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
  294. //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
  295. // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
  296. // 消息处理如果耗时较长,则需要开线程来处理
  297. if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
  298. strncmp(topic, m_strCCOSRoot.c_str(), m_strCCOSRoot.length()) == 0)
  299. {
  300. //主题以ebus或者ccos开头,给我发的消息,进行处理
  301. ProcessSubscribeRequest(req);
  302. }
  303. else
  304. {
  305. //我主动订阅的外部模块的消息
  306. ProcessSubscribeMsg(req);
  307. }
  308. CmdToLogicDev(req);
  309. });
  310. if (m_pMqttConntion != nullptr)
  311. {
  312. StartThread(); //启动Request线程
  313. SubscribeSelf();
  314. }
  315. }
  316. RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
  317. PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
  318. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
  319. switch (type) {
  320. case PACKET_TYPE_REQ:
  321. ProcessRequest(req, cmd);//请求
  322. break;
  323. case PACKET_TYPE_RES:
  324. ProcessResponse(req, cmd);//应答
  325. break;
  326. case PACKET_TYPE_NOTIFY:
  327. ProcessNotify(req, cmd);
  328. //通知
  329. break;
  330. }
  331. return RET_FAILED;
  332. }
  333. RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
  334. {
  335. ProcessSubscribeRequest(pCmd);
  336. return RET_SUCCEED;
  337. }
  338. RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
  339. {
  340. //Open命令
  341. if (cmd == PACKET_CMD_OPEN) {
  342. /* {
  343. "IDX": "40",
  344. "TYPE" : "0",
  345. "CMD" : "0",
  346. "HANDLE" :
  347. {
  348. "ROUTE": "1",
  349. "FLAGS" : "63",
  350. "LANG" : "en-US",
  351. "HANDLEID" : "0",
  352. "OWNERID" :
  353. {
  354. "EBUSID": "ImageSave",
  355. "MACHINEID" : "DESKTOP-FVD53H8",
  356. "PROCID" : "35408",
  357. "ADDR" : "2631366957696"
  358. },
  359. "DEVID":
  360. {
  361. "EBUSID": "ccosChannel",
  362. "MACHINEID" : "",
  363. "PROCID" : "0",
  364. "ADDR" : "0"
  365. }
  366. },
  367. "KEY": "\/ccosChannel"
  368. ResDataObject resRes, resResponse;
  369. ResDataObject resTopic;
  370. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  371. if (cmd == PACKET_CMD_OPEN )
  372. {
  373. //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
  374. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  375. return RET_SUCCEED;
  376. }
  377. if (cmd == PACKET_CMD_CLOSE)
  378. {
  379. //CLOSE 客户端主动断开
  380. }
  381. } */
  382. ResDataObject resRes, resResponse;
  383. GetDeviceResource(&resRes);
  384. LogicDevice::GetDeviceResource(&resRes);
  385. //std::cout << "For Open Result" << resRes.encode() << endl;
  386. PacketAnalizer::MakeOpenResponse(*pCmd, resResponse, resRes);
  387. PacketAnalizer::UpdateDeviceNotifyResponse(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)m_pMqttConntion);
  388. ResDataObject resTopic;
  389. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  390. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  391. //这里先直接给1,后面取真实状态
  392. ResDataObject NotifyData;
  393. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE, "ConnectionStatus", "1");
  394. //CmdFromLogicDev(&NotifyData);
  395. PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  396. return RET_SUCCEED;
  397. }
  398. else if (cmd == PACKET_CMD_CLOSE)
  399. {
  400. }
  401. else {
  402. PacketArrived(pCmd);
  403. }
  404. return RET_SUCCEED;
  405. }
  406. RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
  407. {
  408. return RET_SUCCEED;
  409. }
  410. RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
  411. {
  412. return RET_SUCCEED;
  413. }
  414. void LogicDevice::PacketArrived(ResDataObject* pRequest)
  415. {
  416. //m_pPacketReceivedQue->Lock();
  417. m_pPacketReceivedQue->InQueue(*pRequest);
  418. //SetEvent(m_EvtNotify);//notify to user
  419. ReleaseSemaphore(m_SemphRequest, 1, NULL);
  420. //m_pPacketReceivedQue->UnLock();
  421. }
  422. bool LogicDevice::OnStartThread()
  423. {
  424. return true;
  425. }
  426. bool LogicDevice::OnEndThread()
  427. {
  428. return true;
  429. }
  430. bool LogicDevice::Exec(void)
  431. {
  432. HANDLE hWait[2];
  433. hWait[0] = m_ExitFlag;
  434. hWait[1] = m_SemphRequest;
  435. DWORD dwRet = WaitForMultipleObjects(2, hWait, FALSE, 3);
  436. if (dwRet == WAIT_OBJECT_0)
  437. {
  438. return false;
  439. }
  440. //else if (dwRet == WAIT_OBJECT_0 + 1)
  441. {
  442. //请求到了
  443. ResDataObject req;
  444. while (m_pPacketReceivedQue->DeQueue(req))
  445. {
  446. ResDataObject resResource, resResponse, resPacket;
  447. ResDataObject resTopic;
  448. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
  449. string keystr = PacketAnalizer::GetPacketKey(&req);
  450. RET_STATUS ret = RET_FAILED;
  451. PacketAnalizer::MakeResponseByReq(resPacket, req, ret);
  452. //mLog::FINFO(" {$} Got {$} Request by {$}", GetCurrentThreadId(), keystr, m_strClientID);
  453. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Got " << keystr << " Request by " << m_strClientID << endl;
  454. if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource")
  455. {
  456. ResDataObject devRes;
  457. if ((ret = GetDeviceResource(&devRes)) == RET_SUCCEED)
  458. {
  459. LogicDevice::GetDeviceResource(&devRes);
  460. PacketAnalizer::UpdatePacketContext(resPacket, devRes);
  461. PacketAnalizer::MakeRetCode(RET_SUCCEED, &resPacket);
  462. }
  463. else
  464. {
  465. PacketAnalizer::MakeRetCode(RET_FAILED, &resPacket);
  466. }
  467. }
  468. else
  469. {
  470. ret = Request(&req, &resPacket);
  471. PacketAnalizer::MakeRetCode(ret, &resPacket);
  472. PacketAnalizer::GetPacketRetCode(&resPacket, ret);
  473. }
  474. //PacketAnalizer::MakeResponseByReq(resPacket, req, ret);
  475. //PacketAnalizer::UpdatePacketContext(resPacket, resResponse);
  476. //PacketAnalizer::MakeRetCode(ret, &resPacket);
  477. PacketAnalizer::GetContextTopic(&req, resTopic);
  478. //mLog::FINFO("Request result {$}", resPacket.encode());
  479. //std::cout << " Request(pCmd, &resResponse) result" << resPacket.encode() << endl;
  480. PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
  481. //mLog::FINFO(" {$} Send {$} Resp {$}", GetCurrentThreadId(), keystr, m_strClientID);
  482. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Send "<< keystr <<" Resp" << m_strClientID << endl;
  483. }
  484. return true;
  485. }
  486. return true;
  487. }
  488. void SYSTEM_CALL LogicDevice::CompleteUnInit()
  489. {
  490. StopThread();
  491. }
  492. int LogicDevice::GetDevice_Thread_Priority()
  493. {
  494. return THREAD_PRIORITY_NONE;
  495. }
  496. RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  497. {
  498. //Get Unit Type (Unit GUID)
  499. if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
  500. {
  501. pDeviceResource->add("LogicDevInstance", m_pDevInstance);
  502. }
  503. ////
  504. size_t idx = (*pDeviceResource)["Attribute"].size();
  505. if (idx > 0)
  506. {
  507. int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
  508. if (erroridx < 0)
  509. {
  510. (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
  511. }
  512. else
  513. {
  514. (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
  515. }
  516. }
  517. else
  518. {
  519. ResDataObject Attribute;
  520. Attribute.add("ErrorList", *m_pResErrorList);
  521. pDeviceResource->add("Attribute", Attribute);
  522. }
  523. return RET_SUCCEED;
  524. }
  525. //notify from lower layer
  526. RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
  527. {
  528. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
  529. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  530. if (type == PACKET_TYPE_NOTIFY)
  531. {
  532. PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  533. }
  534. return RET_SUCCEED;
  535. /*
  536. if (pCmd && m_pSysLogic)
  537. {
  538. return m_pSysLogic->CmdFromLogicDev(pCmd);
  539. }
  540. //put log here
  541. return RET_FAILED;*/
  542. }
  543. std::wstring mb2wc_a(const char* mbstr)
  544. {
  545. std::wstring strVal;
  546. int size = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, NULL, 0);
  547. wchar_t* wcstr = new wchar_t[size + 1];
  548. if (wcstr)
  549. {
  550. memset(wcstr, 0, size * sizeof(wchar_t));
  551. int ret = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, wcstr, size);
  552. if (ret != 0) // MultiByteToWideChar returns 0 if it does not succeed.
  553. {
  554. strVal = wcstr;
  555. }
  556. delete[] wcstr;
  557. wcstr = NULL;
  558. }
  559. return strVal;
  560. }
  561. RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
  562. {
  563. string ResBase64, DesBase64;
  564. wstring wResUTF = ResInfo;
  565. wstring wDesUTF = Description;
  566. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  567. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  568. return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
  569. }
  570. RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  571. {
  572. string ResBase64,DesBase64;
  573. wstring wResUTF = mb2wc_a(ResInfo);
  574. wstring wDesUTF = mb2wc_a(Description);
  575. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  576. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  577. return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
  578. }
  579. RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  580. {
  581. //int ret = 1;
  582. if (Code == 0 || (string)ResInfo == "")
  583. {
  584. //mLog::FERROR("Code or ResInfo is empty");
  585. return RET_FAILED;
  586. }
  587. MessageInfo info;
  588. info.CodeID = Code;
  589. info.Type = nMessageType;
  590. info.Level = Level;
  591. info.Resouceinfo = ResInfo;
  592. info.Description = Description;
  593. string strDevInstanceCode = DevInstance;
  594. strDevInstanceCode += Code;
  595. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  596. {
  597. string strInstancekey = m_pResErrorList->GetKey(i);
  598. if (strInstancekey == strDevInstanceCode)
  599. {
  600. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  601. //{
  602. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  603. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  604. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  605. // {
  606. //ret = 0;
  607. //mLog::FDEBUG("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
  608. return RET_SUCCEED;
  609. // }
  610. //}
  611. //ret = 2;
  612. //break;
  613. }
  614. }
  615. //if (ret==1)
  616. {
  617. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  618. //info.GetResDataObject(tempInfo);
  619. //ErrorInfo.update(Code, tempInfo);
  620. info.GetResDataObject(ErrorInfo);
  621. SYSTEMTIME st;
  622. GetLocalTime(&st);
  623. string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  624. ErrorInfo.add("CreationTime", TimeTag.c_str());
  625. ErrorInfo.add("AppId", pAppId);
  626. ErrorInfo.add("InstanceId", DevInstance);
  627. if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  628. {
  629. m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
  630. }
  631. ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
  632. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  633. //mLog::FDEBUG( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
  634. //CmdFromLogicDev(&NotifyData);
  635. PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  636. }
  637. //else if (ret == 2)
  638. //{
  639. // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
  640. // info.GetResDataObject(tempInfo);
  641. // ErrorInfo.update(Code, tempInfo);
  642. // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  643. // {
  644. // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
  645. // }
  646. // ResNotify.update(DevInstance, ErrorInfo);
  647. // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  648. // RES_//mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
  649. // CmdFromLogicDev(&NotifyData);
  650. //}
  651. //put log here
  652. return RET_SUCCEED;
  653. }
  654. RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
  655. {
  656. AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
  657. //put log here
  658. return RET_FAILED;
  659. }
  660. RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
  661. {
  662. int index = -1;
  663. bool m_bClearAll = false;
  664. //trim empty code string
  665. string CodeStr = Code;
  666. if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
  667. {
  668. CodeStr = " ";
  669. Code = CodeStr.c_str();
  670. m_bClearAll = true;
  671. }
  672. //if ((string)Code == "0" || (string)Code == "")
  673. //{
  674. // m_bClearAll = true;
  675. //}
  676. string strDevInstanceCode = DevInstance;
  677. strDevInstanceCode += Code;
  678. if (m_bClearAll)
  679. {
  680. m_pResErrorList->clear();
  681. }
  682. else
  683. {
  684. MessageInfo info;
  685. info.CodeID = Code;
  686. info.Type = nMessageType;
  687. info.Level = Level;
  688. info.Resouceinfo = ResInfo;
  689. info.Description = Description;
  690. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  691. {
  692. string strInstancekey = m_pResErrorList->GetKey(i);
  693. if (strInstancekey == strDevInstanceCode)
  694. {
  695. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  696. //{
  697. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  698. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  699. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  700. // {
  701. // index = (INT)j;
  702. // break;
  703. // }
  704. //}
  705. index = (INT)i;
  706. break;
  707. }
  708. }
  709. }
  710. MessageInfo info;
  711. info.CodeID = Code;
  712. info.Type = nMessageType;
  713. info.Level = Level;
  714. info.Resouceinfo = ResInfo;
  715. info.Description = Description;
  716. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  717. //info.GetResDataObject(tempInfo);
  718. //ErrorInfo.add(Code, tempInfo);
  719. info.GetResDataObject(ErrorInfo);
  720. ErrorInfo.add("InstanceId", DevInstance);
  721. bool result = false;
  722. if (index>=0)
  723. {
  724. //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
  725. result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
  726. }
  727. if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
  728. {
  729. ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
  730. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
  731. //mLog::FDEBUG( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
  732. //CmdFromLogicDev(&NotifyData);
  733. PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  734. }
  735. return RET_SUCCEED;
  736. }
  737. RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
  738. {
  739. DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
  740. //put log here
  741. return RET_FAILED;
  742. }
  743. RET_STATUS LogicDevice::EvtProcedure()
  744. {
  745. return RET_FAILED;
  746. }
  747. bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
  748. {
  749. //mLog::FERROR("NOT FINISHED YET");
  750. return false;
  751. }
  752. RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
  753. {
  754. std::string strResult = "";
  755. //组Context包
  756. if (m_pLogger)
  757. {
  758. wstring wContect = mb2wc_a(pContext);
  759. CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
  760. }
  761. //Thread_Lock();
  762. //if (NULL != fmt)
  763. //{
  764. // va_list marker = NULL;
  765. // va_start(marker, fmt);
  766. // size_t nLength = _vscprintf(fmt, marker) + 1;
  767. // std::vector<char> vBuffer(nLength, '\0');
  768. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  769. // if (nWritten > 0)
  770. // {
  771. // strResult = &vBuffer[0];
  772. // }
  773. // va_end(marker);
  774. //}
  775. //Thread_UnLock();
  776. ResDataObject SysLogNode;
  777. string guidstr;
  778. GUID DeviceGuid;
  779. //组Log包
  780. if (GetDeviceType(DeviceGuid))
  781. {
  782. guid_2_string(DeviceGuid, guidstr);
  783. SysLogNode.add("Module", guidstr.c_str());
  784. SysLogNode.add("AppId", pAppId);
  785. SysLogNode.add("ThreadId", GetCurrentThreadId());
  786. if (pCode)
  787. {
  788. SysLogNode.add("BusinessKey", pCode);
  789. }
  790. else
  791. {
  792. SysLogNode.add("BusinessKey", "");
  793. }
  794. SysLogNode.add("IP", (const char*)getLocalIpAddress());
  795. SYSTEMTIME st;
  796. GetLocalTime(&st);
  797. string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  798. SysLogNode.add("CreationTime", TimeTag.c_str());
  799. string strLevel = SysLogLevel2str(Level);
  800. SysLogNode.add("Level", strLevel.c_str());
  801. SysLogNode.add("HostName", (const char*)getLocalMachineId());
  802. SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
  803. SysLogNode.add("FreeText", strResult.c_str());
  804. SysLogNode.add("Context", pCode);
  805. ResDataObject NotifyData;
  806. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  807. //CmdFromLogicDev(&NotifyData);
  808. PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  809. }
  810. else
  811. {
  812. PRINTA_ERROR(m_pLogger, "no Guid??");
  813. return RET_FAILED;
  814. }
  815. //打印LOG
  816. switch (Level)
  817. {
  818. case Syslog_Debug:
  819. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  820. //mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
  821. break;
  822. case Syslog_Information:
  823. ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  824. //mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  825. break;
  826. case Syslog_Warning:
  827. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  828. //mLog::Warn("SysLog {$} ", SysLogNode.encode());
  829. break;
  830. case Syslog_Error:
  831. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  832. //mLog::FERROR("SysLog {$} ", SysLogNode.encode());
  833. break;
  834. case Syslog_Fatal:
  835. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  836. //mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  837. break;
  838. default:
  839. //mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
  840. break;
  841. }
  842. return RET_SUCCEED;
  843. }
  844. RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
  845. {
  846. std::string strResult = "";
  847. //组Context包
  848. if (m_pLogger)
  849. {
  850. m_pLogger->Thread_Lock();
  851. if (NULL != fmt)
  852. {
  853. va_list marker = NULL;
  854. va_start(marker, fmt);
  855. size_t nLength = _vscprintf(fmt, marker) + 1;
  856. std::vector<char> vBuffer(nLength, '\0');
  857. int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  858. if (nWritten > 0)
  859. {
  860. strResult = &vBuffer[0];
  861. }
  862. va_end(marker);
  863. }
  864. m_pLogger->Thread_UnLock();
  865. }
  866. else
  867. {
  868. Thread_Lock();
  869. if (NULL != fmt)
  870. {
  871. va_list marker = NULL;
  872. va_start(marker, fmt);
  873. size_t nLength = _vscprintf(fmt, marker) + 1;
  874. std::vector<char> vBuffer(nLength, '\0');
  875. int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  876. if (nWritten > 0)
  877. {
  878. strResult = &vBuffer[0];
  879. }
  880. va_end(marker);
  881. }
  882. Thread_UnLock();
  883. }
  884. ResDataObject SysLogNode;
  885. string guidstr;
  886. GUID DeviceGuid;
  887. //组Log包
  888. if (GetDeviceType(DeviceGuid))
  889. {
  890. guid_2_string(DeviceGuid, guidstr);
  891. SysLogNode.add("Module", guidstr.c_str());
  892. SysLogNode.add("AppId", "");
  893. SysLogNode.add("ThreadId", GetCurrentThreadId());
  894. if (pCode)
  895. {
  896. SysLogNode.add("BusinessKey", pCode);
  897. }
  898. else
  899. {
  900. SysLogNode.add("BusinessKey", "");
  901. }
  902. SysLogNode.add("IP", (const char *)getLocalIpAddress());
  903. SYSTEMTIME st;
  904. GetLocalTime(&st);
  905. string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
  906. SysLogNode.add("CreationTime", TimeTag.c_str());
  907. SysLogNode.add("Level", Level);
  908. SysLogNode.add("HostName", (const char *)getLocalMachineId());
  909. SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
  910. SysLogNode.add("FreeText", strResult.c_str());
  911. ResDataObject NotifyData;
  912. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  913. //CmdFromLogicDev(&NotifyData);
  914. PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  915. if(m_pParent != nullptr)
  916. NotifyParent(&NotifyData, "/Notify");
  917. }
  918. else
  919. {
  920. PRINTA_ERROR(m_pLogger,"no Guid??");
  921. return RET_FAILED;
  922. }
  923. //打印LOG
  924. switch (Level)
  925. {
  926. case Syslog_Debug:
  927. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  928. //mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
  929. break;
  930. case Syslog_Information:
  931. //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  932. //mLog::FINFO("SysLog {$}", SysLogNode.encode());
  933. break;
  934. case Syslog_Warning:
  935. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  936. //mLog::Warn("SysLog {$}", SysLogNode.encode());
  937. break;
  938. case Syslog_Error:
  939. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  940. //mLog::FERROR("SysLog {$}", SysLogNode.encode());
  941. break;
  942. case Syslog_Fatal:
  943. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  944. //mLog::FINFO("SysLog {$}", SysLogNode.encode());
  945. break;
  946. default:
  947. //mLog::FINFO( "SysLog {$}", SysLogNode.encode());
  948. break;
  949. }
  950. return RET_SUCCEED;
  951. }
  952. /////////////////////////////////////////////////////////////////////////////
  953. ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  954. {
  955. return nullptr;
  956. }
  957. std::string CurrentDateTime()
  958. {
  959. // 获取当前时间点
  960. auto now = std::chrono::system_clock::now();
  961. // 将时间长度转换为微秒数
  962. auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
  963. // 再转成tm格式
  964. auto now_time_t = std::chrono::system_clock::to_time_t(now);
  965. auto now_tm = std::localtime(&now_time_t);
  966. // 可以直接输出到标准输出
  967. // std::cout << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S.") << std::setfill('0') << std::setw(6) << now_us.count() % 1000000 << std::endl;
  968. // 格式化字符,年月日时分秒
  969. std::string now_time_str;
  970. char buf[64];
  971. std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", now_tm);
  972. now_time_str += buf;
  973. // 格式化微秒
  974. snprintf(buf, sizeof(buf), ".%06lld ", now_us.count() % 1000000);
  975. now_time_str += buf;
  976. //printf("%s\n", now_time_str.c_str());
  977. return now_time_str;
  978. }
  979. static string CurrentDateTime2()
  980. {
  981. std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  982. char buf[100] = { 0 };
  983. std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
  984. return buf;
  985. }
  986. bool LogicDevice::GetActions(ResDataObject& resAction)
  987. {
  988. for (int x = 0; x < m_Actions.size(); x++)
  989. {
  990. resAction.update(m_Actions.GetKey(x), "");
  991. }
  992. return true;
  993. }
  994. void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
  995. {
  996. if (m_pParent != nullptr)
  997. {
  998. if (hConnection == nullptr)
  999. {
  1000. hConnection = m_pParent->m_pMqttConntion;
  1001. }
  1002. PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
  1003. }
  1004. }
  1005. const mqtt_qos_t MQTT_QOS = QOS2;
  1006. void LogicDevice::SubscribeActions()
  1007. {
  1008. if (nullptr == m_pMqttConntion)
  1009. return;
  1010. int num = m_Actions.size();
  1011. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
  1012. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
  1013. //if (!pMqttClient->is_connected()) {
  1014. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1015. // return ;
  1016. //}
  1017. if (num <= 0) {
  1018. ResDataObject res;
  1019. GetDeviceResource(&res);
  1020. //std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
  1021. m_Actions = res["Action"];
  1022. num = m_Actions.size();
  1023. }
  1024. std::string pszAction;
  1025. int ret = 0;
  1026. //MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1027. //*/
  1028. for (size_t i = 0; i < num; i++)
  1029. {
  1030. pszAction = m_strEBusRoot;
  1031. pszAction += "/Action/";
  1032. pszAction += (m_Actions.GetKey(i));
  1033. //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1034. //pMqttClient->subscribe(pszAction, 1, opts);
  1035. pTopicList->push_back(pszAction);
  1036. ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1037. //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
  1038. if (m_strEBusRoot != m_strCCOSRoot)
  1039. {
  1040. pszAction = m_strCCOSRoot + m_strDevicePath;
  1041. pszAction += "/Action/";
  1042. pszAction += (m_Actions.GetKey(i));
  1043. //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1044. //pMqttClient->subscribe(pszAction, 1, opts);
  1045. pTopicList->push_back(pszAction);
  1046. ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1047. //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
  1048. }
  1049. }
  1050. pszAction = m_strEBusRoot;
  1051. pszAction += "/Action/UpdateDeviceResource";
  1052. //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1053. pTopicList->push_back(pszAction);
  1054. mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1055. //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
  1056. //*/
  1057. //不能订阅#,会造成收两遍请求
  1058. //pszAction = m_strEBusRoot;
  1059. //pszAction += "/Action/#";
  1060. ////pTopicList->push_back(pszAction);
  1061. ////mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
  1062. //SubScribeTopic(pszAction.c_str(), true);
  1063. //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1064. //if (m_strEBusRoot != m_strCCOSRoot)
  1065. //{
  1066. // pszAction = m_strCCOSRoot + m_strDevicePath;
  1067. // pszAction += "/Action/#";
  1068. // //pTopicList->push_back(pszAction);
  1069. // //mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
  1070. // SubScribeTopic(pszAction.c_str(), true);
  1071. // std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1072. //}
  1073. }
  1074. /*
  1075. void connlost(void* context, char* cause)
  1076. {
  1077. //连接中断
  1078. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1079. std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
  1080. printf("\nConnection lost\n");
  1081. printf(" cause: %s\n", cause);
  1082. std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
  1083. }
  1084. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  1085. {
  1086. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1087. std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
  1088. }
  1089. */
  1090. //发送成功
  1091. //void delivered(void* context, MQTTAsync_token dt)
  1092. //{
  1093. // printf("Message with token value %d delivery confirmed\n", dt);
  1094. // //deliveredtoken = dt;
  1095. //}
  1096. //void connlost(void* context, char* cause)
  1097. //{
  1098. // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
  1099. // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
  1100. //
  1101. // //连接中断
  1102. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1103. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1104. // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
  1105. // return;
  1106. //
  1107. // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1108. // int rc;
  1109. //
  1110. // printf("Reconnecting\n");
  1111. // conn_opts.keepAliveInterval = 20;
  1112. // conn_opts.cleansession = 1;
  1113. // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  1114. // {
  1115. // printf("Failed to start connect, return code %d\n", rc);
  1116. // //finished = 1;
  1117. // }
  1118. //}
  1119. //重连成功
  1120. //void onReconnected(void* context, char* cause)
  1121. //{
  1122. // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
  1123. // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
  1124. //
  1125. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1126. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1127. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1128. // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1129. // const char* client_id = std::get<CLINET_ID_ID>(*connection);
  1130. // int rc;
  1131. //
  1132. // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
  1133. // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
  1134. // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
  1135. // // 重新订阅,暂不重新订阅,看看
  1136. //
  1137. // ///*
  1138. // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1139. // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1140. // //opts.onSuccess = onSubscribe;
  1141. // //opts.onFailure = onSubscribeFailure;
  1142. // //opts.context = client;
  1143. // auto it = pTopicList->begin();
  1144. // while(it != pTopicList->end())
  1145. // {
  1146. // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
  1147. // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
  1148. // it++;
  1149. // }
  1150. // //*/
  1151. //
  1152. //}
  1153. //
  1154. ////MQTT 连接主动断开连接失败
  1155. //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1156. //{
  1157. // printf("Disconnect failed, rc %d\n", response->code);
  1158. // //disc_finished = 1;
  1159. //}
  1160. //
  1161. ////MQTT 连接主动断开连接成功
  1162. //void onDisconnect(void* context, MQTTAsync_successData* response)
  1163. //{
  1164. // //printf("Successful disconnection\n");
  1165. // //disc_finished = 1;
  1166. //}
  1167. //
  1168. //void onSubscribe(void* context, MQTTAsync_successData* response)
  1169. //{
  1170. // printf("Subscribe succeeded\n");
  1171. // //subscribed = 1;
  1172. //}
  1173. //
  1174. //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1175. //{
  1176. // printf("Subscribe failed, rc %d\n", response->code);
  1177. // //finished = 1;
  1178. //}
  1179. //
  1180. //
  1181. //void onConnectFailure(void* context, MQTTAsync_failureData* response)
  1182. //{
  1183. // printf("Connect failed, rc %d\n", response->code);
  1184. // //finished = 1;
  1185. //}
  1186. //void onConnect(void* context, MQTTAsync_successData* response)
  1187. //{
  1188. //
  1189. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1190. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1191. //
  1192. // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
  1193. //
  1194. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1195. // int rc;
  1196. //
  1197. // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
  1198. // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
  1199. // if (hConnected != NULL)
  1200. // SetEvent(hConnected);
  1201. //
  1202. // /*
  1203. // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1204. // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1205. // opts.onSuccess = onSubscribe;
  1206. // opts.onFailure = onSubscribeFailure;
  1207. // opts.context = client;
  1208. // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  1209. // {
  1210. // printf("Failed to start subscribe, return code %d\n", rc);
  1211. // finished = 1;
  1212. // }*/
  1213. //}
  1214. //MQTT消息抵达
  1215. //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1216. void msgarrivd(void* client, message_data_t* message)
  1217. {
  1218. if (client == nullptr)
  1219. {
  1220. printf("************************ somthing happend...");
  1221. return;
  1222. }
  1223. //消息抵达
  1224. ResDataObject req;
  1225. string topic = message->topic_name; //topicName; //msg->get_topic().c_str();
  1226. mqtt_client* pClient = (mqtt_client*)client;
  1227. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1228. string client_id = std::get<CLINET_ID_ID>(*connection);
  1229. std::string payload((const char*)message->message->payload, message->message->payloadlen);
  1230. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
  1231. //std::cout << " msg Body " << payload << endl;
  1232. //mLog::FINFO("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload);
  1233. const char* pclient_id = client_id.c_str();
  1234. req.decode(payload.c_str());
  1235. //取消息钩子
  1236. void* pHook = std::get<MSG_HOOK_ID>(*connection);
  1237. if (pHook != nullptr) {
  1238. ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
  1239. ccos_mqtt_msg_filter_func func = std::get<FILTER_FUNC_ID>(*filter);
  1240. //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1241. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
  1242. //mLog::FINFO("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
  1243. //消息钩子函数存在
  1244. if (func != nullptr) {
  1245. //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
  1246. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
  1247. //mLog::FINFO("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
  1248. ResDataObject* resObj = std::get<FILTER_RES_OBJ_ID>(*filter);
  1249. HANDLE hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1250. if (func(&req)) {
  1251. //勾住了,是该钩子的消息,则通知
  1252. std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1253. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
  1254. //mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} ", GetCurrentThreadId(), client_id, topic);
  1255. PacketAnalizer::GetPacketContext(&req, *resObj);
  1256. SetEvent(hEvent);
  1257. //MQTTAsync_freeMessage(&message);
  1258. //MQTTAsync_free(topicName);
  1259. return ;
  1260. }
  1261. }
  1262. }
  1263. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
  1264. ccos_mqtt_callback onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
  1265. if (onmsg != nullptr)
  1266. (onmsg)(&req, topic.c_str(), connection);
  1267. else
  1268. {
  1269. //mLog::FINFO("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get<CLINET_ID_ID>(*connection), topic);
  1270. //cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get<CLINET_ID_ID>(*connection) << " When processing " << topic << endl;
  1271. }
  1272. //MQTTAsync_freeMessage(&message);
  1273. //MQTTAsync_free(topicName);
  1274. return ;
  1275. }
  1276. ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
  1277. {
  1278. //Logger* pDevLog = GetLogHandle();
  1279. //std::cout << CurrentDateTime() << "======= LogicDevice::NewConnection ============ " << pszClientID << std::endl;
  1280. //mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
  1281. //连接MQTT broker
  1282. DWORD dwTick = GetTickCount();
  1283. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  1284. HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  1285. //int rc = MQTTAsync_create(&pMqttClient, pszServerUri, pszClientID, MQTTCLIENT_PERSISTENCE_NONE, NULL); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5));
  1286. //if (MQTTASYNC_SUCCESS != rc) {
  1287. // std::cout << "MQTTAsync_create failed. code: " << rc << " with ClientID " << pszClientID << endl;
  1288. // delete connection;
  1289. // return nullptr;
  1290. //}
  1291. mqtt_client* pMqttClient = nullptr;
  1292. pMqttClient = mqtt_lease();
  1293. //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT " << endl;
  1294. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  1295. mqtt_topic_list* pTopicList = new std::list<string>;
  1296. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  1297. CcosLock* pLock = new CcosLock();
  1298. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  1299. pLock->Thread_Lock();
  1300. pLock->Thread_UnLock();
  1301. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  1302. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  1303. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  1304. std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
  1305. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  1306. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  1307. //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd;
  1308. std::get<CONNECTED_HANDLE_ID>(*connection) = hConneted;
  1309. pMqttClient->mqtt_conn_context = connection;
  1310. //设置MQTT连接参数
  1311. //MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1312. mqtt_set_port(pMqttClient, (char*)pszServerPort);
  1313. mqtt_set_host(pMqttClient, (char*)pszServer);
  1314. //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
  1315. mqtt_set_client_id(pMqttClient, (char*)pszClientID);
  1316. mqtt_set_user_name(pMqttClient, (char*)pszUserName);
  1317. mqtt_set_password(pMqttClient, (char*)pszPassword);
  1318. mqtt_set_clean_session(pMqttClient, 1);
  1319. //mqtt_set_version(pMqttClient, 5);
  1320. //mqtt_set_write_buf_size(pMqttClient, 8192);
  1321. //mqtt_set_read_buf_size(pMqttClient, 8192);
  1322. mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1323. mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1324. //if (pszUserName != nullptr && strlen(pszUserName) > 0)
  1325. // conn_opts.username = pszUserName;
  1326. //if (pszPassword != nullptr && strlen(pszPassword) > 0)
  1327. // conn_opts.password = pszPassword;
  1328. //设置回调函数
  1329. //MQTTAsync_setCallbacks(pMqttClient, connection, connlost, msgarrvd, delivered);
  1330. //if ((rc = MQTTAsync_setConnected(pMqttClient, connection, onReconnected)) != MQTTASYNC_SUCCESS)
  1331. //{
  1332. //}
  1333. //conn_opts.keepAliveInterval = 20;
  1334. //conn_opts.cleansession = 1;
  1335. //conn_opts.onSuccess = onConnect;
  1336. //conn_opts.onFailure = onConnectFailure;
  1337. //conn_opts.context = connection;
  1338. //conn_opts.automaticReconnect = 1;
  1339. //conn_opts.minRetryInterval = 2; //seconds
  1340. //conn_opts.maxRetryInterval = 365 * 24 * 60 * 60;
  1341. //连接Broker
  1342. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Connecting 1 to the MQTT server..." << pszClientID << endl;
  1343. /*if ((rc = MQTTAsync_connect(pMqttClient, &conn_opts)) != MQTTASYNC_SUCCESS)*/
  1344. int rc = 0;
  1345. dwTick = GetTickCount();
  1346. while (true)
  1347. {
  1348. int rc = mqtt_connect(pMqttClient);
  1349. if (rc != MQTT_SUCCESS_ERROR)
  1350. {
  1351. //默认10s内尝试
  1352. if (GetTickCount() - dwTick > 10000)
  1353. {
  1354. //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  1355. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server...【" << pszClientID << "】error : " << rc << endl;
  1356. //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode);
  1357. delete connection;
  1358. delete pfilter;
  1359. return nullptr;
  1360. }
  1361. else
  1362. {
  1363. //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  1364. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl;
  1365. Sleep(2000);
  1366. }
  1367. }
  1368. else
  1369. {
  1370. break;
  1371. }
  1372. }
  1373. //DWORD dwWaitTick = GetTickCount();
  1374. //DWORD ret = WaitForSingleObject(hConneted, 20000);
  1375. DWORD dwWaitTick = GetTickCount() - dwTick;
  1376. //if (ret == WAIT_OBJECT_0)
  1377. //{
  1378. //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ******[" << GetTickCount()-dwTick << "ms]*** wait:["<< dwWaitTick <<"ms]*** TID [" << GetCurrentThreadId() << std::endl;
  1379. //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  1380. return connection;
  1381. //}
  1382. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
  1383. //delete connection;
  1384. //delete pfilter;
  1385. //return nullptr;
  1386. }
  1387. //创建额外连接,需要提供回调函数
  1388. LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg,
  1389. DWORD dwOpenTimeout, bool async)
  1390. {
  1391. //Logger* pDevLog = GetLogHandle();
  1392. //std::cout << CurrentDateTime() << "======= MQTT NewConnection ============" << pszClientID << std::endl;
  1393. DWORD dwTick = GetTickCount();
  1394. //PRINTA_INFO(pDevLog, "=======Driver Module Init ============");
  1395. //连接MQTT broker
  1396. // 开启接受通知 线程
  1397. //mqtt_client* pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID, mqtt::create_options(MQTTVERSION_5));
  1398. //ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  1399. //mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
  1400. BUSC::MessageReceiver* pMqttClient = nullptr;
  1401. pMqttClient = new BUSC::MessageReceiver(BUSID::HashFrom(eSTR::DString(string(pszClientID))));
  1402. //MQTTClient_createOptions cropts = MQTTClient_createOptions_initializer;
  1403. //cropts.MQTTVersion = MQTTVERSION_5;
  1404. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  1405. HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  1406. //int rc = MQTTAsync_create(&pMqttClient, SERVER_ADDRESS.c_str(), pszClientID, MQTTCLIENT_PERSISTENCE_NONE,
  1407. // connection); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5));
  1408. //if (MQTTASYNC_SUCCESS != rc) {
  1409. // std::cout << "MQTTAsync_create failed. code: " << rc << " with ClientID " << pszClientID << endl;
  1410. // delete connection;
  1411. // return nullptr;
  1412. //}
  1413. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  1414. //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT 2" << endl;
  1415. mqtt_topic_list* pTopicList = new std::list<string>;
  1416. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  1417. CcosLock* pLock = new CcosLock();
  1418. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  1419. pLock->Thread_Lock();
  1420. pLock->Thread_UnLock();
  1421. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  1422. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  1423. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  1424. std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
  1425. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  1426. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  1427. //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd;
  1428. std::get<CONNECTED_HANDLE_ID>(*connection) = hConneted;
  1429. //pMqttClient->mqtt_conn_context = connection;
  1430. //设置MQTT连接参数
  1431. //MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
  1432. //conn_opts.keepAliveInterval = 20;
  1433. //conn_opts.cleansession = 1;
  1434. //conn_opts.MQTTVersion = 5;
  1435. //设置回调函数
  1436. /*
  1437. mqtt_set_port(pMqttClient, (char*)"1883");
  1438. mqtt_set_host(pMqttClient, (char*)SERVER_ADDRESS.c_str());
  1439. //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
  1440. mqtt_set_client_id(pMqttClient, (char*)pszClientID);
  1441. //mqtt_set_user_name(client, user_name);
  1442. // mqtt_set_password(client, password);
  1443. mqtt_set_clean_session(pMqttClient, 1);
  1444. //mqtt_set_write_buf_size(pMqttClient, 8192);
  1445. //mqtt_set_read_buf_size(pMqttClient, 8192);
  1446. mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1447. mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1448. //mqtt_set_version(pMqttClient, 5);
  1449. */
  1450. //连接Broker
  1451. //std::cout << CurrentDateTime() << " TID ["<< GetCurrentThreadId() << "] Connecting 2 to the MQTT server..." << pszClientID << endl;
  1452. int rc = 0;
  1453. dwTick = GetTickCount();
  1454. while (true)
  1455. {
  1456. rc = mqtt_connect(pMqttClient);
  1457. if (rc != MQTT_SUCCESS_ERROR)
  1458. {
  1459. if (GetTickCount() - dwTick > dwOpenTimeout)
  1460. {
  1461. //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  1462. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
  1463. //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode);
  1464. delete connection;
  1465. delete pfilter;
  1466. return nullptr;
  1467. }
  1468. else
  1469. {
  1470. //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  1471. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
  1472. Sleep(2000);
  1473. }
  1474. }
  1475. else
  1476. {
  1477. break;
  1478. }
  1479. }
  1480. //dwWaitTick = GetTickCount();
  1481. //DWORD ret = WaitForSingleObject(hConneted, dwOpenTimeout);
  1482. DWORD dwWaitTick = GetTickCount() - dwTick;
  1483. //if (ret == WAIT_OBJECT_0)
  1484. //{
  1485. //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  1486. //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ***[" << GetTickCount()-dwTick << "ms]*** wait:[" << dwWaitTick << "ms]*** TID [" << GetCurrentThreadId() << std::endl;
  1487. return connection;
  1488. //}
  1489. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
  1490. //delete connection;
  1491. //delete pfilter;
  1492. //return nullptr;
  1493. }
  1494. //关闭并释放连接
  1495. LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
  1496. {
  1497. if (hConnection == nullptr)
  1498. {
  1499. return;
  1500. }
  1501. //mLog::FINFO(" Close Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  1502. //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
  1503. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1504. if (pconn != nullptr) {
  1505. //pconn->disconnect();
  1506. //MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  1507. //disc_opts.onSuccess = onDisconnect;
  1508. //disc_opts.onFailure = onDisconnectFailure;
  1509. mqtt_disconnect(pconn);
  1510. //delete pconn;
  1511. }
  1512. //Sleep(300);
  1513. //Sleep(300);
  1514. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  1515. std::cout << "Free Connection [" << (UINT64)hConnection << "] filter [" << (UINT64)pfilter << "] .." << endl;
  1516. if (pfilter != nullptr)
  1517. delete pfilter;
  1518. //未知问题导致 释放出现异常,先注释调,后面加上,避免内存泄漏
  1519. delete hConnection;
  1520. mqtt_release(pconn);
  1521. }
  1522. //主动订阅主题
  1523. LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
  1524. {
  1525. if (hConnection == nullptr)
  1526. {
  1527. return 0;
  1528. }
  1529. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1530. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  1531. //if (!pMqttClient->is_connected()) {
  1532. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1533. // return 0;
  1534. //}
  1535. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  1536. //MQTTProperties prop = MQTTProperties_initializer;
  1537. //MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1538. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  1539. pLock->Thread_Lock();
  1540. //pMqttClient->subscribe(pszTopic, 1, opts);
  1541. if (pTopicList->size() < 1)
  1542. {
  1543. int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  1544. //mLog::FINFO("mqtt {$} Subscribe First {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  1545. //std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe First " << pszTopic << endl;
  1546. }
  1547. else
  1548. {
  1549. int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  1550. //mLog::FINFO("mqtt {$} Subscribe ReUse {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  1551. //std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe ReUse " << pszTopic << endl;
  1552. }
  1553. pTopicList->push_back(pszTopic);
  1554. pLock->Thread_UnLock();
  1555. return 0;
  1556. }
  1557. //主题订阅取消
  1558. LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  1559. {
  1560. if (hConnection == nullptr)
  1561. {
  1562. return 0;
  1563. }
  1564. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1565. //if (!pMqttClient->is_connected()) {
  1566. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1567. // return 0;
  1568. //}
  1569. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  1570. //auto ret = pMqttClient->unsubscribe(pszTopic);
  1571. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  1572. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  1573. pLock->Thread_Lock();
  1574. //MQTTAsync_responseOptions resp;
  1575. pTopicList->remove(pszTopic);
  1576. int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
  1577. //mLog::FINFO("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  1578. pLock->Thread_UnLock();
  1579. return 2;
  1580. }
  1581. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  1582. LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName)
  1583. {
  1584. char pszClientID[256];
  1585. sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
  1586. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
  1587. });
  1588. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
  1589. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  1590. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  1591. const char* pLoad = pCmd->encode();
  1592. mqtt_message_t msg;
  1593. memset(&msg, 0, sizeof(msg));
  1594. msg.payload = (void*)pLoad;
  1595. msg.qos = MQTT_QOS;
  1596. int rc = mqtt_publish(pConn, pszTopic, &msg );
  1597. //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  1598. //mLog::FINFO("CLT {$} Publish to {$} result {$}", pszClientID, pszTopic, rc);
  1599. CloseConnection(connObj);
  1600. return 0;
  1601. }
  1602. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  1603. LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection)
  1604. {
  1605. if (hConnection == nullptr)
  1606. {
  1607. //mLog::FERROR("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode());
  1608. //std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  1609. return 0;
  1610. }
  1611. //mLog::FINFO("{$} Publish Action to {$} Action Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction->encode() );
  1612. //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  1613. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1614. //if (!pMqttClient->is_connected()) {
  1615. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1616. // return 0;
  1617. //}
  1618. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  1619. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  1620. pLock->Thread_Lock();
  1621. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
  1622. //std::cout << "try publish " << pAction->encode() << endl;
  1623. //pMqttClient->publish(pszTopic, pAction->encode());
  1624. //pConn->publish(pszTopic, pCmd->encode());
  1625. //std::cout << "try publish " << pAction->encode() << endl;
  1626. const char* pLoad = pAction->encode();
  1627. int len = strlen(pLoad);
  1628. //MQTTAsync_responseOptions resp;
  1629. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  1630. mqtt_message_t msg;
  1631. memset(&msg, 0, sizeof(msg));
  1632. msg.payload = (void*)pLoad;
  1633. msg.qos = MQTT_QOS;
  1634. msg.payloadlen = len;
  1635. int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  1636. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  1637. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
  1638. if (rc < 0)
  1639. {
  1640. //mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  1641. //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
  1642. }
  1643. //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
  1644. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
  1645. pLock->Thread_UnLock();
  1646. return 2;
  1647. }
  1648. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  1649. LOGICDEVICE_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection)
  1650. {
  1651. if (hConnection == nullptr)
  1652. {
  1653. //mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, pAction, pContext->encode());
  1654. //std::cout << CurrentDateTime() << "Who ????? " << "Publish2 to " << pszTopic << " Action Body: " << pAction << endl; //"\n context : " << pContext->encode() <<
  1655. return 0;
  1656. }
  1657. //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action 2 to " << pszTopic << " Action Body: " << endl; //<< pAction << "\n context : " << pContext->encode()
  1658. //mLog::FINFO("{$} Publish Action 2 to {$} Action {$} Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction , pContext->encode());
  1659. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1660. //if (!pMqttClient->is_connected()) {
  1661. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1662. // return 0;
  1663. //}
  1664. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  1665. pLock->Thread_Lock();
  1666. ResDataObject req;
  1667. //TODO 这里应该添加 组包逻辑,待完善
  1668. //PacketAnalizer::MakeActionRequest(req, );
  1669. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  1670. const char* pLoad = req.encode();
  1671. int len = strlen(pLoad);
  1672. mqtt_message_t msg;
  1673. memset(&msg, 0, sizeof(msg));
  1674. msg.payload = (void*)pLoad;
  1675. msg.qos = MQTT_QOS;
  1676. int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  1677. pLock->Thread_UnLock();
  1678. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  1679. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  1680. if (rc < 0)
  1681. {
  1682. //mLog::FERROR("{$} PublishAction failed {$} body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, rc, pLoad);
  1683. //std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish 2 to [" << pszTopic << "] Send result " << rc << endl;
  1684. }
  1685. return 2;
  1686. }
  1687. /*
  1688. /// <summary>
  1689. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  1690. /// 超时没收到应答返回失败,
  1691. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  1692. /// </summary>
  1693. /// <param name="pAction">要发送的命令Action名</param>
  1694. /// <param name="pContext">命令携带的参数</param>
  1695. /// <param name="pszTopic">要发送的目标topic</param>
  1696. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  1697. /// <param name="resObj">应答返回的参数结果</param>
  1698. /// <param name="dwWaitTime">等待超时时间</param>
  1699. /// <param name="hConnection">复用的MQTT链接句柄</param>
  1700. /// <param name="onmsg">复用的链接的消息处理函数</param>
  1701. /// <returns>成功返回2,其他返回错误码</returns>
  1702. LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  1703. ResDataObject& resObj, DWORD dwWaitTime)
  1704. {
  1705. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  1706. if (pszRespTopic != nullptr)
  1707. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  1708. if (hConnection == nullptr)
  1709. {
  1710. return 0;
  1711. }
  1712. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1713. //if (!pMqttClient->is_connected()) {
  1714. // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  1715. // return 0;
  1716. //}
  1717. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  1718. //SubscribeTopic()
  1719. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
  1720. string topic = topicName;//msg->get_topic().c_str();
  1721. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  1722. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
  1723. //应答到了,处理
  1724. ResDataObject req;
  1725. req.decode((const char*)message->payload);
  1726. PacketAnalizer::GetPacketContext(&req, resObj);
  1727. SetEvent(hEvent);
  1728. //处理结束 将函数指针换回去
  1729. void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  1730. //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  1731. MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
  1732. }
  1733. else
  1734. {
  1735. //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  1736. (msgarrvd)(context, topicName, topicLen, message);
  1737. }
  1738. };
  1739. //接管回调
  1740. // pMqttClient->set_message_callback(action_func);
  1741. MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
  1742. //pMqttClient->subscribe(pszRespTopic, 1);
  1743. MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  1744. MQTTProperties prop = MQTTProperties_initializer;
  1745. std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
  1746. //pMqttClient->subscribe(pszTopic, 1, opts);
  1747. MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
  1748. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  1749. //pMqttClient->publish(pszTopic, req.encode());
  1750. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  1751. MQTTClient_message* m = NULL;
  1752. MQTTClient_deliveryToken dt;
  1753. MQTTProperty property;
  1754. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  1755. property.value.data.data = "test user property";
  1756. property.value.data.len = (int)strlen(property.value.data.data);
  1757. property.value.value.data = "test user property value";
  1758. property.value.value.len = (int)strlen(property.value.value.data);
  1759. MQTTProperties properties = MQTTProperties_initializer;
  1760. MQTTProperties_add(&properties, &property);
  1761. //pConn->publish(pszTopic, pCmd->encode());
  1762. const char* pLoad = req.encode();
  1763. MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
  1764. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
  1765. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  1766. if (ret == WAIT_OBJECT_0) {
  1767. //等到应答了
  1768. return 2;
  1769. }
  1770. return 0;
  1771. }
  1772. */
  1773. /// <summary>
  1774. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  1775. /// </summary>
  1776. /// <param name="hConnection"></param>
  1777. /// <param name="pAction"></param>
  1778. /// <param name="pContext"></param>
  1779. /// <param name="pszTopic"></param>
  1780. /// <param name="resObj"></param>
  1781. /// <param name="hEvent"></param>
  1782. /// <param name="dwWaitTime"></param>
  1783. /// <returns></returns>
  1784. LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
  1785. const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
  1786. {
  1787. Sleep(1);
  1788. //mLog::FINFO("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic, pContext->encode());
  1789. //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  1790. if (hConnection == nullptr)
  1791. {
  1792. return 0;
  1793. }
  1794. DWORD dwTick = GetTickCount();
  1795. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1796. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  1797. ResDataObject resContext;
  1798. string strResObject;
  1799. char* pszPad = 0;
  1800. char* pszPad2 = 0;
  1801. auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2](ResDataObject* rsp) -> bool {
  1802. //mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
  1803. //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
  1804. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  1805. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  1806. {
  1807. //mLog::FINFO("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  1808. //std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
  1809. resObj = *rsp;
  1810. return true;
  1811. }
  1812. //mLog::FINFO("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  1813. //std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
  1814. return false;
  1815. };
  1816. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  1817. pLock->Thread_Lock();
  1818. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  1819. std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
  1820. std::get<FILTER_RES_OBJ_ID>(*pfilter) = &resContext;
  1821. std::get<FILTER_FUNC_ID>(*pfilter) = func;
  1822. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  1823. const char* pLoad = req.encode();
  1824. mqtt_message_t msg;
  1825. memset(&msg, 0, sizeof(msg));
  1826. msg.payload = (void*)pLoad;
  1827. msg.qos = MQTT_QOS;
  1828. dwTick = GetTickCount();
  1829. int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  1830. pLock->Thread_UnLock();
  1831. dwTick = GetTickCount() - dwTick;
  1832. //mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
  1833. //std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
  1834. dwTick = GetTickCount();
  1835. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  1836. if (ret == WAIT_OBJECT_0) {
  1837. //等到应答了
  1838. dwTick = GetTickCount() - dwTick;
  1839. //std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
  1840. //mLog::FINFO("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
  1841. std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  1842. CloseHandle(hEvent);
  1843. return 2;
  1844. }
  1845. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  1846. //resObj.decode(strResObject.c_str());
  1847. //mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
  1848. //std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
  1849. std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  1850. CloseHandle(hEvent);
  1851. return 0;
  1852. }
  1853. /// <summary>
  1854. /// 新建MQTT连接发送Ation并等待应答
  1855. /// </summary>
  1856. /// <param name="pAction"></param>
  1857. /// <param name="pContext"></param>
  1858. /// <param name="pszTopic"></param>
  1859. /// <param name="pszRespTopic"></param>
  1860. /// <param name="resObj"></param>
  1861. /// <param name="dwWaitTime"></param>
  1862. /// <param name="pszSenderName"></param>
  1863. /// <returns></returns>
  1864. LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  1865. DWORD dwWaitTime, const char* pszSenderName)
  1866. {
  1867. ResDataObject req;
  1868. if (pszRespTopic != nullptr)
  1869. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  1870. //临时创建连接并发送和接收应答
  1871. char pszClientID[256];
  1872. sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
  1873. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  1874. if (hEvent == NULL)
  1875. return 0;
  1876. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* req, const char* topic, void* conn) {
  1877. //应答到了,处理
  1878. PacketAnalizer::GetPacketContext(req, resObj);
  1879. SetEvent(hEvent);
  1880. });
  1881. //发布消息,并等待应答
  1882. PublishAction(&req, pszTopic, connObj);
  1883. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  1884. if (ret == WAIT_OBJECT_0) {
  1885. //等到应答了
  1886. CloseConnection(connObj);
  1887. CloseHandle(hEvent);
  1888. return 2;
  1889. }
  1890. CloseConnection(connObj);
  1891. CloseHandle(hEvent);
  1892. return 0;
  1893. }
  1894. LOGICDEVICE_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection)
  1895. {
  1896. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  1897. return 0;
  1898. }