LogicDevice.cpp 107 KB


  1. // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include <cstring>
  4. #include <iostream>
  5. #include <chrono>
  6. #include "LogicDevice.h"
  7. #include <stddef.h> // 用于 offsetof
  8. #include "PacketAnalizer.h"
  9. #include "MessageInfo.h"
  10. #include "common_api.h"
  11. #include "LocalConfig.h"
  12. #include "Base64.h"
  13. #include "SystemLogger.hpp"
  14. #include "mqttclient.h"
  15. #include "LinuxEvent.h"
  16. //Log4CPP::Logger* ////mLog::gLogger = nullptr;
  17. void msgarrivd(void* client, message_data_t* message);
  18. std::string CurrentDateTime();
  19. //-------------Logic Device SysIF--------------------------
  20. LogicDeviceSysIF::LogicDeviceSysIF(void)
  21. {
  22. }
  23. LogicDeviceSysIF::~LogicDeviceSysIF(void)
  24. {
  25. }
  26. //init
  27. void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
  28. {
  29. m_pLogicDev = p;
  30. //p->SetSysLogicDevice(this);
  31. };
  32. LogicDevice* LogicDeviceSysIF::GetLogicDevice()
  33. {
  34. return m_pLogicDev;
  35. }
  36. //Command In and Out
  37. //notify from lower layer
  38. RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  39. {
  40. return RET_FAILED;
  41. };
  42. //notify to lower layer
  43. RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
  44. {
  45. if (m_pLogicDev)
  46. {
  47. return m_pLogicDev->CmdToLogicDev(pCmd);
  48. }
  49. return RET_NOSUPPORT;
  50. };
  51. const std::string SERVER_ADDRESS("127.0.0.1");
  52. /*
  53. string DEVICE_ID; //AS CLIENT_ID
  54. //const std::string CLIENT_ID("paho_cpp_async_subcribe");
  55. //const std::string TOPIC("hello");
  56. mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
  57. const int QOS = 1;
  58. const int N_RETRY_ATTEMPTS = 5;*/
  59. using namespace std;
  60. LogicDevice* g_pDPCDeviceObject = NULL;
  61. //string DEVICE_ID;
  62. //-------------Data Logic Device--------------------------
  63. LogicDevice::LogicDevice(void)
  64. {
  65. m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  66. uuid_t uuid;
  67. char uuid_str[37];
  68. uuid_generate_random(uuid);
  69. uuid_unparse(uuid, uuid_str);
  70. m_pDevInstance = new char[40];
  71. strncpy(m_pDevInstance, uuid_str, 40);
  72. m_pResErrorList = new ResDataObject();
  73. sem_init(&m_SemphRequest, 0, 0);
  74. sem_init(&m_SemphPublish, 0, 0);
  75. m_pDrvDPC = NULL;
  76. g_pDPCDeviceObject = this;
  77. m_pMqttConntion = nullptr;
  78. //m_strServer = "tcp://localhost:1883";
  79. m_strServer = SERVER_ADDRESS;// "192.168.2.225";
  80. m_strServerPort = "1883";
  81. m_bMqttUseSSL = false;
  82. m_strMqttUser = "";
  83. m_strMqttPassword = "";
  84. m_strEBusRoot = "";
  85. m_strCCOSDevicePath = "";
  86. m_pParent = nullptr;
  87. m_topicFilter = nullptr;
  88. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  89. m_pPacketSendingQue = new MsgQueue<ResDataObject>();
  90. int x = 0;
  91. for ( x = 0; x < sizeof(szPad); x++)
  92. szPad[x] = 'A' + x % 26;
  93. szPad[x-1] = 0;
  94. memset(szPad2, 0, sizeof(szPad2));
  95. m_dwLastPacket = GetTickCount();
  96. }
  97. LogicDevice::~LogicDevice(void)
  98. {
  99. delete []m_pDevInstance;
  100. delete m_pResErrorList;
  101. sem_destroy(&m_SemphRequest);
  102. sem_destroy(&m_SemphPublish);
  103. //m_EvtNotify = NULL;
  104. delete m_pPacketReceivedQue;
  105. delete m_pPacketSendingQue;
  106. }
  107. void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
  108. if (m_strClientID.length() > 0)
  109. {
  110. ////mLog::FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID);
  111. return;
  112. }
  113. ////mLog::FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot);
  114. if (pszEBusRoot[0] == '/') {
  115. m_strEBusRoot = pszEBusRoot + 1;
  116. }
  117. else {
  118. m_strEBusRoot = pszEBusRoot;
  119. }
  120. char szKeys[256];
  121. strcpy(szKeys, pszEBusRoot);
  122. char* pt = szKeys;
  123. while (*pt != 0)
  124. {
  125. if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
  126. *pt = '_';
  127. pt++;
  128. }
  129. m_strClientID = CCOS_CLIENT_ID_PREFIX;
  130. if (szKeys[0] == '_')
  131. m_strClientID += (szKeys + 1);
  132. else
  133. m_strClientID += szKeys;
  134. if (pszCCOSRoot != nullptr)
  135. {
  136. std::cout << "LogicDevice::SetClientRootID [ Set CCOS path is" << pszCCOSRoot << "]" << std::endl;
  137. ////mLog::FINFO("Set CCOS path: {$}", pszCCOSRoot);
  138. m_strCCOSDevicePath = pszCCOSRoot;
  139. int nPos = m_strCCOSDevicePath.find('/');
  140. if (nPos != string::npos)
  141. {
  142. //CCOS/
  143. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  144. //CCOS/DEVICE/
  145. if (nPos != string::npos)
  146. {
  147. m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos);
  148. //CCOS/DEVICE/Generator
  149. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  150. if (nPos != string::npos)
  151. {
  152. m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos);
  153. }
  154. }
  155. }
  156. ////mLog::FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath);
  157. }
  158. ////mLog::FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath);
  159. SetName(m_strClientID.c_str());
  160. OnSetClientID();
  161. }
  162. void LogicDevice::OnSetClientID() {
  163. }
  164. void LogicDevice::SubscribeSelf() {
  165. if (m_strDevicePath.length() > 0 && m_strDevicePath[0] != '/') {
  166. m_strDevicePath = "/" + m_strDevicePath;
  167. }
  168. ////mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
  169. std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl;
  170. if (m_strEBusRoot.length() > 0) {
  171. SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str());
  172. }
  173. if (m_strCCOSDevicePath.length() > 0)
  174. {
  175. ////mLog::FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath);
  176. SubscribeTopic(m_pMqttConntion, (m_strCCOSDevicePath + m_strDevicePath).c_str());
  177. //订阅
  178. }
  179. ////mLog::FINFO("AbstractPath [{$}] ", m_strAbstractPath);
  180. if (m_strAbstractPath.length() > 0)
  181. {
  182. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath).c_str());
  183. if (m_strDevicePath.length() > 0)
  184. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath + m_strDevicePath).c_str());
  185. }
  186. ////mLog::FINFO("CCOSRoot [{$}] ", m_strCCOSRoot);
  187. if (m_strCCOSRoot.length() > 0)
  188. {
  189. SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot).c_str());
  190. //订阅
  191. }
  192. SubscribeActions();
  193. }
  194. void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  195. {
  196. cout << "SubScribeTopic IN" << endl;
  197. if (bSubscribe)
  198. SubscribeTopic(m_pMqttConntion, pszTopic);
  199. else
  200. UnSubscribe(m_pMqttConntion, pszTopic);
  201. }
  202. void LogicDevice::NotifyDrvThread()
  203. {
  204. m_EvtNotify->SetEvent();
  205. }
  206. std::shared_ptr<LinuxEvent> LogicDevice::GetEvtHandle()
  207. {
  208. return m_EvtNotify;
  209. }
  210. //1. init part
  211. //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
  212. //{
  213. // m_pSysLogic = pLogic;
  214. //}
  215. //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
  216. //{
  217. // m_pLogger = pLogger;
  218. //}
  219. //
  220. //Logger *LogicDevice::GetLogHandle()
  221. //{
  222. // return m_pLogger;
  223. //}
  224. void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
  225. {
  226. m_pDrvDPC = pDPC;
  227. }
  228. DriverDPC *LogicDevice::GetDrvDPC()
  229. {
  230. return m_pDrvDPC;
  231. }
  232. RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
  233. {
  234. if (szEbusDevPath == nullptr)
  235. return RET_FAILED;
  236. for (auto dev : m_subDevices) {
  237. if (dev->GetRootPath() == szEbusDevPath)
  238. return RET_SUCCEED;
  239. }
  240. m_subDevices.push_back(pChild);
  241. return RET_SUCCEED;
  242. }
  243. RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
  244. {
  245. if (szCcosDevPath == nullptr)
  246. return RET_FAILED;
  247. for (auto dev : m_subCcosDevices) {
  248. if (dev->GetCcosRootPath() == szCcosDevPath)
  249. return RET_SUCCEED;
  250. }
  251. m_subCcosDevices.push_back(pChild);
  252. return RET_SUCCEED;
  253. }
  254. LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
  255. {
  256. for (auto dev : m_subDevices) {
  257. if (dev->GetRootPath() == szEbusDevPath)
  258. return dev;
  259. }
  260. return nullptr;
  261. }
  262. LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
  263. {
  264. for (auto dev : m_subCcosDevices) {
  265. if (dev->GetCcosRootPath() == szCcosDevPath)
  266. return dev;
  267. }
  268. return nullptr;
  269. }
  270. void SYSTEM_CALL LogicDevice::CompleteInit()
  271. {
  272. //if (//mLog::gLogger == nullptr)
  273. //{
  274. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  275. // LogHost = ((string)getLogRootpath()).c_str();
  276. // if (LogHost.length() <= 1)
  277. // {
  278. // char szName[256];
  279. // sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
  280. // LogHost = szName;
  281. // }
  282. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  283. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  284. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  285. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  286. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  287. // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
  288. //}
  289. //else
  290. //{
  291. // string strRoot = ((string)getLogRootpath()).c_str();
  292. // if (strRoot.length() > 1 && strRoot != LogHost)
  293. // {
  294. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  295. // LogHost = strRoot;
  296. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  297. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  298. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  299. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  300. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  301. // }
  302. //}
  303. //string version;
  304. //if (GetVersion(version, hMyModule))
  305. // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
  306. //else
  307. ////mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
  308. ////mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
  309. if (m_strClientID.length() <= 0)
  310. {
  311. ////mLog::FWARN("No Client name ......" );
  312. std::cout << "No Client name ......." << endl;
  313. return;
  314. }
  315. std::cout << "CompleteInit->NewConnection ......." << endl;
  316. m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
  317. [this](ResDataObject* req, const char* topic, void* conn) {
  318. //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
  319. //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
  320. // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
  321. // 消息处理如果耗时较长,则需要开线程来处理
  322. if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
  323. strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0)
  324. {
  325. //主题以ebus或者ccos开头,给我发的消息,进行处理
  326. ProcessSubscribeRequest(req);
  327. }
  328. else
  329. {
  330. //我主动订阅的外部模块的消息
  331. ProcessSubscribeMsg(req);
  332. }
  333. //CmdToLogicDev(req);
  334. });
  335. if (m_pMqttConntion != nullptr)
  336. {
  337. StartThread(); //启动Request线程
  338. SubscribeSelf();
  339. //DWORD dwThreadID = 0;
  340. //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID);
  341. //////mLog::FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID);
  342. }
  343. }
  344. RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
  345. PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
  346. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
  347. switch (type) {
  348. case PACKET_TYPE_REQ:
  349. PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction);
  350. ProcessRequest(req, cmd);//请求
  351. break;
  352. case PACKET_TYPE_RES:
  353. ProcessResponse(req, cmd);//应答
  354. break;
  355. case PACKET_TYPE_NOTIFY:
  356. ProcessNotify(req, cmd);
  357. //通知
  358. break;
  359. }
  360. return RET_FAILED;
  361. }
  362. RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
  363. {
  364. ProcessSubscribeRequest(pCmd);
  365. return RET_SUCCEED;
  366. }
  367. RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
  368. {
  369. //Open命令
  370. if (cmd == PACKET_CMD_OPEN) {
  371. /* {
  372. "IDX": "40",
  373. "TYPE" : "0",
  374. "CMD" : "0",
  375. "HANDLE" :
  376. {
  377. "ROUTE": "1",
  378. "FLAGS" : "63",
  379. "LANG" : "en-US",
  380. "HANDLEID" : "0",
  381. "OWNERID" :
  382. {
  383. "EBUSID": "ImageSave",
  384. "MACHINEID" : "DESKTOP-FVD53H8",
  385. "PROCID" : "35408",
  386. "ADDR" : "2631366957696"
  387. },
  388. "DEVID":
  389. {
  390. "EBUSID": "ccosChannel",
  391. "MACHINEID" : "",
  392. "PROCID" : "0",
  393. "ADDR" : "0"
  394. }
  395. },
  396. "KEY": "\/ccosChannel"
  397. ResDataObject resRes, resResponse;
  398. ResDataObject resTopic;
  399. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  400. if (cmd == PACKET_CMD_OPEN )
  401. {
  402. //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
  403. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  404. return RET_SUCCEED;
  405. }
  406. if (cmd == PACKET_CMD_CLOSE)
  407. {
  408. //CLOSE 客户端主动断开
  409. }
  410. } */
  411. PacketArrived(pCmd);
  412. return RET_SUCCEED;
  413. }
  414. else if (cmd == PACKET_CMD_CLOSE)
  415. {
  416. ResDataObject resp;
  417. InnerOpenClose(*pCmd, resp, false);
  418. }
  419. else {
  420. PacketArrived(pCmd);
  421. }
  422. return RET_SUCCEED;
  423. }
  424. RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
  425. {
  426. return RET_SUCCEED;
  427. }
  428. RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
  429. {
  430. PacketArrived(pCmd);
  431. return RET_SUCCEED;
  432. }
  433. void LogicDevice::PacketArrived(ResDataObject* pRequest)
  434. {
  435. //m_pPacketReceivedQue->Lock();
  436. ////mLog::FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  437. //if (Thread_Lock(5000) != WAIT_OBJECT_0)
  438. //{
  439. // ////mLog::FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  440. // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  441. // return ;
  442. //}
  443. m_pPacketReceivedQue->InQueue(*pRequest);
  444. ////mLog::FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  445. //SetEvent(m_EvtNotify);//notify to user
  446. long nLast = 0;
  447. int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程
  448. if (released <= 0)
  449. {
  450. ////mLog::FERROR("PacketArrived ReleaseSemaphore failed {$} ", GetLastError());
  451. }
  452. //Thread_UnLock();
  453. //m_pPacketReceivedQue->UnLock();
  454. }
  455. bool LogicDevice::OnStartThread()
  456. {
  457. return true;
  458. }
  459. bool LogicDevice::OnEndThread()
  460. {
  461. return true;
  462. }
  463. //单一发送线程,暂时未用
  464. DWORD LogicDevice::Thread_Publish_Thread(void* pPara)
  465. {
  466. INT ret = 0;
  467. int waitevent = 0;
  468. LogicDevice* handle = (LogicDevice*)pPara;
  469. //prev work usleep(30000);
  470. ////mLog::FINFO("Thread Start [{$}]", handle->m_strClientID);
  471. while (!handle->m_ExitFlag->Wait(1))
  472. {
  473. //do work
  474. ResDataObject resSend;
  475. DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100);
  476. if (wait != WAIT_OBJECT_0)
  477. continue;
  478. bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend);
  479. if (bHasPacket)
  480. {
  481. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend);
  482. PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend);
  483. ////mLog::FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd);
  484. if (PACKET_CMD_NONE == cmd)
  485. {
  486. ////mLog::FDEBUG(" [{$}] Publish what packet {$} ", handle->m_strClientID, resSend.encode());
  487. }
  488. if (type == PACKET_TYPE_NOTIFY)
  489. {
  490. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  491. PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle);
  492. ////mLog::FINFO("Notify Transaction: {$}", handle->m_strCurTransaction);
  493. PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction);
  494. PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion);
  495. ////mLog::FDEBUG("Notify: {$}", resSend.encode());
  496. //printf("--> %s \n", pCmd->encode());
  497. PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion);
  498. PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion);
  499. delete pHandle;
  500. }
  501. else
  502. {
  503. ResDataObject resTopic;
  504. PacketAnalizer::GetPacketTopic(&resSend, resTopic);
  505. //PacketAnalizer::GetPacketTopicGetPacketTopic
  506. //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic);
  507. ////mLog::FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const char*)resTopic);
  508. PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion);
  509. }
  510. }
  511. }
  512. return 0;
  513. }
  514. RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  515. {
  516. cout << "LogicDevice::DevOpen - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  517. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  518. ResDataObject req, reqParam;
  519. reqParam = pszGroup;
  520. cout << "LogicDevice::DevOpen - Creating OPEN request. Command: PACKET_CMD_OPEN" << endl;
  521. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam);
  522. RET_STATUS ret = InnerOpenClose(req, resRespons, true);
  523. cout << "LogicDevice::DevOpen - InnerOpenClose returned: " << ret << endl;
  524. return ret;
  525. }
  526. RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  527. {
  528. cout << "LogicDevice::DevClose - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  529. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  530. ResDataObject req, reqParam;
  531. reqParam = pszGroup;
  532. cout << "LogicDevice::DevClose - Creating CLOSE request. Command: PACKET_CMD_CLOSE" << endl;
  533. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam);
  534. RET_STATUS ret = InnerOpenClose(req, resRespons, false);
  535. cout << "LogicDevice::DevClose - InnerOpenClose returned: " << ret << endl;
  536. return ret;
  537. }
  538. /// <summary>
  539. /// Get device property
  540. /// </summary>
  541. RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons)
  542. {
  543. cout << "LogicDevice::DevGet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  544. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr") << endl;
  545. ResDataObject req, reqParam;
  546. cout << "LogicDevice::DevGet - Creating GET request. Command: PACKET_CMD_GET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  547. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam);
  548. RET_STATUS ret = Request(&req, &resRespons);
  549. cout << "LogicDevice::DevGet - Request returned: " << ret << endl;
  550. return ret;
  551. }
  552. /// <summary>
  553. /// Set device property
  554. /// </summary>
  555. RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons)
  556. {
  557. cout << "LogicDevice::DevSet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  558. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  559. << ", pszValueSet: " << (pszValueSet ? pszValueSet : "nullptr") << endl;
  560. ResDataObject req, reqParam;
  561. if (pszValueSet != nullptr && pszValueSet[0] != 0) {
  562. if (pszValueSet[0] == '{') {
  563. cout << "LogicDevice::DevSet - Decoding JSON value for property" << endl;
  564. reqParam.decode(pszValueSet);
  565. }
  566. else {
  567. cout << "LogicDevice::DevSet - Using raw value for property" << endl;
  568. reqParam = pszValueSet;
  569. }
  570. }
  571. else {
  572. cout << "LogicDevice::DevSet - No value provided for property" << endl;
  573. }
  574. cout << "LogicDevice::DevSet - Creating SET request. Command: PACKET_CMD_SET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  575. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam);
  576. RET_STATUS ret = Request(&req, &resRespons);
  577. cout << "LogicDevice::DevSet - Request returned: " << ret << endl;
  578. return ret;
  579. }
  580. /// <summary>
  581. /// Update device property
  582. /// </summary>
  583. RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons)
  584. {
  585. cout << "LogicDevice::DevUpdate - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  586. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  587. << ", pszValueUpdate: " << (pszValueUpdate ? pszValueUpdate : "nullptr") << endl;
  588. ResDataObject req, reqParam;
  589. if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) {
  590. if (pszValueUpdate[0] == '{') {
  591. cout << "LogicDevice::DevUpdate - Decoding JSON value for update" << endl;
  592. reqParam.decode(pszValueUpdate);
  593. }
  594. else {
  595. cout << "LogicDevice::DevUpdate - Using raw value for update" << endl;
  596. reqParam = pszValueUpdate;
  597. }
  598. }
  599. else {
  600. cout << "LogicDevice::DevUpdate - No update value provided" << endl;
  601. }
  602. cout << "LogicDevice::DevUpdate - Creating UPDATE request. Command: PACKET_CMD_UPDATE, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  603. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam);
  604. RET_STATUS ret = Request(&req, &resRespons);
  605. cout << "LogicDevice::DevUpdate - Request returned: " << ret << endl;
  606. return ret;
  607. }
  608. /// <summary>
  609. /// Add device property
  610. /// </summary>
  611. RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* pszValueAdd, ResDataObject& resRespons)
  612. {
  613. cout << "LogicDevice::DevAdd - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  614. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  615. << ", pszValueAdd: " << (pszValueAdd ? pszValueAdd : "nullptr") << endl;
  616. ResDataObject req, reqParam;
  617. if (pszValueAdd != nullptr && pszValueAdd[0] != 0) {
  618. if (pszValueAdd[0] == '{') {
  619. cout << "LogicDevice::DevAdd - Decoding JSON value for addition" << endl;
  620. reqParam.decode(pszValueAdd);
  621. }
  622. else {
  623. cout << "LogicDevice::DevAdd - Using raw value for addition" << endl;
  624. reqParam = pszValueAdd;
  625. }
  626. }
  627. else {
  628. cout << "LogicDevice::DevAdd - No value provided for addition" << endl;
  629. }
  630. cout << "LogicDevice::DevAdd - Creating ADD request. Command: PACKET_CMD_ADD, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  631. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam);
  632. RET_STATUS ret = Request(&req, &resRespons);
  633. cout << "LogicDevice::DevAdd - Request returned: " << ret << endl;
  634. return ret;
  635. }
  636. /// <summary>
  637. /// Delete device property
  638. /// </summary>
  639. RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons)
  640. {
  641. cout << "LogicDevice::DevDel - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  642. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  643. << ", pszValueDel: " << (pszValueDel ? pszValueDel : "nullptr") << endl;
  644. ResDataObject req, reqParam;
  645. if (pszValueDel != nullptr && pszValueDel[0] != 0) {
  646. if (pszValueDel[0] == '{') {
  647. cout << "LogicDevice::DevDel - Decoding JSON value for deletion" << endl;
  648. reqParam.decode(pszValueDel);
  649. }
  650. else {
  651. cout << "LogicDevice::DevDel - Using raw value for deletion" << endl;
  652. reqParam = pszValueDel;
  653. }
  654. }
  655. else {
  656. cout << "LogicDevice::DevDel - No value provided for deletion" << endl;
  657. }
  658. cout << "LogicDevice::DevDel - Creating DEL request. Command: PACKET_CMD_DEL, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  659. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam);
  660. RET_STATUS ret = Request(&req, &resRespons);
  661. cout << "LogicDevice::DevDel - Request returned: " << ret << endl;
  662. return ret;
  663. }
  664. /// <summary>
  665. /// Execute device action
  666. /// </summary>
  667. RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons)
  668. {
  669. cout << "LogicDevice::DevAction - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  670. << ", pszActionName: " << (pszActionName ? pszActionName : "nullptr")
  671. << ", pszParams: " << (pszParams ? pszParams : "nullptr") << endl;
  672. ResDataObject req, reqParam;
  673. if (pszParams != nullptr && pszParams[0] != 0) {
  674. if (pszParams[0] == '{') {
  675. cout << "LogicDevice::DevAction - Decoding JSON parameters for action" << endl;
  676. reqParam.decode(pszParams);
  677. }
  678. else {
  679. cout << "LogicDevice::DevAction - Using raw parameters for action" << endl;
  680. reqParam = pszParams;
  681. }
  682. }
  683. else {
  684. cout << "LogicDevice::DevAction - No parameters provided for action" << endl;
  685. }
  686. cout << "LogicDevice::DevAction - Creating EXE request. Command: PACKET_CMD_EXE, Action: " << (pszActionName ? pszActionName : "nullptr") << endl;
  687. PacketAnalizer::MakeRequest(req, pszActionName, PACKET_CMD_EXE, &reqParam);
  688. ResDataObject resResult;
  689. RET_STATUS ret = Request(&req, &resResult);
  690. cout << "LogicDevice::DevAction - Request returned: " << ret << endl;
  691. PacketAnalizer::GetPacketContext(&resResult, resRespons);
  692. cout << "LogicDevice::DevAction - Extracted packet context to response" << endl;
  693. return ret;
  694. }
  695. /// <summary>
  696. /// Send message to device
  697. /// </summary>
  698. RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons)
  699. {
  700. cout << "LogicDevice::DevMessage - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  701. << ", pszTopic: " << (pszTopic ? pszTopic : "nullptr")
  702. << ", pszMessageValue: " << (pszMessageValue ? pszMessageValue : "nullptr") << endl;
  703. ResDataObject req, reqParam;
  704. if (pszMessageValue != nullptr && pszMessageValue[0] != 0) {
  705. if (pszMessageValue[0] == '{') {
  706. cout << "LogicDevice::DevMessage - Decoding JSON message value" << endl;
  707. reqParam.decode(pszMessageValue);
  708. }
  709. else {
  710. cout << "LogicDevice::DevMessage - Using raw message value" << endl;
  711. reqParam = pszMessageValue;
  712. }
  713. }
  714. else {
  715. cout << "LogicDevice::DevMessage - No message value provided" << endl;
  716. }
  717. cout << "LogicDevice::DevMessage - Creating MSG request. Command: PACKET_CMD_MSG, Topic: " << (pszTopic ? pszTopic : "nullptr") << endl;
  718. PacketAnalizer::MakeRequest(req, pszTopic, PACKET_CMD_MSG, &reqParam);
  719. RET_STATUS ret = Request(&req, &resRespons);
  720. cout << "LogicDevice::DevMessage - Request returned: " << ret << endl;
  721. return ret;
  722. }
  723. RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose)
  724. {
  725. if (openOrClose)
  726. {
  727. ResDataObject resContext;
  728. GetDeviceResource(&resp);
  729. LogicDevice::GetDeviceResource(&resp);
  730. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  731. if (PacketAnalizer::GetPacketContext(&req, resContext))
  732. {
  733. //如果有上线参数
  734. if (resContext.GetFirstOf("Online") >= 0)
  735. {
  736. ////mLog::FINFO("Got Online Request {$}", resContext.encode());
  737. int idx = resContext.GetFirstOf("Online");
  738. //如果有上线参数
  739. if (idx >= 0)
  740. {
  741. do
  742. {
  743. string wsName = resContext[idx].encode();
  744. ////mLog::FINFO("SubscribeGroupActions [{$}] ", wsName);
  745. SubscribeGroupActions(wsName, true);
  746. idx = resContext.GetNextOf("Online", idx);
  747. } while (idx >= 0);
  748. }
  749. resp.update("Online", m_rsOnlineGroup);
  750. }
  751. }
  752. }
  753. else
  754. {
  755. ResDataObject context;
  756. if (PacketAnalizer::GetPacketContext(&req, context))
  757. {
  758. int idx = context.GetFirstOf("Offline");
  759. //如果有上线参数
  760. if (idx >= 0)
  761. {
  762. do
  763. {
  764. string wsName = context[idx].encode();
  765. SubscribeGroupActions(wsName, false);
  766. idx = context.GetNextOf("Offline", idx);
  767. } while (idx >= 0);
  768. }
  769. resp.update("Online", m_rsOnlineGroup);
  770. }
  771. }
  772. return RET_SUCCEED;
  773. }
  774. bool LogicDevice::Exec(void)
  775. {
  776. struct timespec ts;
  777. clock_gettime(CLOCK_REALTIME, &ts);
  778. ts.tv_sec += 3;
  779. // 等待退出信号(3秒超时)
  780. DWORD dwRet = 0;
  781. dwRet = m_ExitFlag->Wait(3);
  782. if (dwRet == WAIT_OBJECT_0)
  783. {
  784. return false;
  785. }
  786. // 等待请求信号(3秒超时)
  787. int ret = sem_timedwait(&m_SemphRequest, &ts);
  788. if (ret == 0) {
  789. ////mLog::FDEBUG("[{$}] Got Packet Event ", m_strClientID);
  790. ResDataObject req;
  791. bool got = true;
  792. do {
  793. got = m_pPacketReceivedQue->DeQueue(req);
  794. if (got) {
  795. m_dwLastPacket = GetTickCount();
  796. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
  797. std::string keystr = PacketAnalizer::GetPacketKey(&req);
  798. ////mLog::FINFO(" {$} Got Request key {$} transaction {$}",
  799. // m_strClientID, keystr, m_strCurTransaction);
  800. if (cmd == PACKET_CMD_OPEN) {
  801. ResDataObject resRes, resResponse;
  802. InnerOpenClose(req, resRes, true);
  803. PacketAnalizer::MakeOpenResponse(req, resResponse, resRes);
  804. resResponse.update("Online", m_rsOnlineGroup);
  805. PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction);
  806. ResDataObject resTopic;
  807. PacketAnalizer::GetContextTopic(&req, resTopic);
  808. PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str());
  809. if (keystr.substr(0, 4) == "CCOS") {
  810. PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str());
  811. }
  812. else {
  813. PacketAnalizer::UpdateDeviceNotifyResponse(resResponse,
  814. getLocalMachineId(),
  815. getLocalEbusId(),
  816. (uint64_t)getpid(),
  817. (uint64_t)m_pMqttConntion);
  818. }
  819. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  820. // 发送连接状态通知
  821. ResDataObject NotifyData;
  822. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE,
  823. "ConnectionStatus", "1");
  824. PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction);
  825. CmdFromLogicDev(&NotifyData);
  826. }
  827. else {
  828. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  829. ResDataObject resPacket;
  830. RET_STATUS retStatus = RET_FAILED;
  831. if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") {
  832. ResDataObject devRes;
  833. if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) {
  834. LogicDevice::GetDeviceResource(&devRes);
  835. PacketAnalizer::UpdatePacketContext(resPacket, devRes);
  836. }
  837. }
  838. else {
  839. retStatus = Request(&req, &resPacket);
  840. }
  841. PacketAnalizer::MakeRetCode(retStatus, &resPacket);
  842. PacketAnalizer::CloneTransaction(&req, &resPacket);
  843. ResDataObject resTopic;
  844. PacketAnalizer::GetContextTopic(&req, resTopic);
  845. PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str());
  846. PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
  847. }
  848. }
  849. } while (got);
  850. // 检查心跳(10分钟无数据发送心跳)
  851. if (GetTickCount() - m_dwLastPacket > 600000) {
  852. m_dwLastPacket = GetTickCount();
  853. ResDataObject heartBeat;
  854. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  855. "HeartBeat", m_strClientID.c_str());
  856. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  857. }
  858. return true;
  859. }
  860. // 检查心跳(即使没有请求)
  861. if (GetTickCount() - m_dwLastPacket > 600000) {
  862. m_dwLastPacket = GetTickCount();
  863. ResDataObject heartBeat;
  864. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  865. "HeartBeat", m_strClientID.c_str());
  866. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  867. }
  868. return true;
  869. }
  870. void SYSTEM_CALL LogicDevice::CompleteUnInit()
  871. {
  872. StopThread();
  873. }
  874. int LogicDevice::GetDevice_Thread_Priority()
  875. {
  876. return THREAD_PRIORITY_NONE;
  877. }
  878. RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  879. {
  880. //pDeviceResource->update("ClientType", DPC_UnitClient);
  881. GUID guid;
  882. string name;
  883. GetDeviceType(guid);
  884. guid_2_string(guid, name);
  885. pDeviceResource->update("DeviceType", name.c_str());
  886. //Get Unit Type (Unit GUID)
  887. if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
  888. {
  889. pDeviceResource->add("LogicDevInstance", m_pDevInstance);
  890. }
  891. ////
  892. size_t idx = (*pDeviceResource)["Attribute"].size();
  893. if (idx > 0)
  894. {
  895. int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
  896. if (erroridx < 0)
  897. {
  898. (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
  899. }
  900. else
  901. {
  902. (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
  903. }
  904. }
  905. else
  906. {
  907. ResDataObject Attribute;
  908. Attribute.add("ErrorList", *m_pResErrorList);
  909. pDeviceResource->add("Attribute", Attribute);
  910. }
  911. // (*pDeviceResource)["Action"].size();
  912. if (pDeviceResource->GetFirstOf("Action") < 0)
  913. {
  914. pDeviceResource->add("Action", m_Actions);
  915. }
  916. return RET_SUCCEED;
  917. }
  918. //notify from lower layer
  919. RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
  920. {
  921. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
  922. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  923. if (type == PACKET_TYPE_NOTIFY)
  924. {
  925. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  926. PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle);
  927. ////mLog::FDEBUG("Notify Transaction: {$}", m_strCurTransaction);
  928. PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction);
  929. ;
  930. //if (m_strEBusRoot.length() <= 0)
  931. //{
  932. // //////mLog::FWARN("EBusRoot is null");
  933. //}
  934. PacketAnalizer::UpdateDeviceNotifyResponse(
  935. *pCmd,
  936. getLocalMachineId(),
  937. getLocalEbusId(),
  938. static_cast<uint64_t>(getpid()), // Linux 上获取进程 ID
  939. reinterpret_cast<uint64_t>(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型
  940. );
  941. ////mLog::FDEBUG("[{$}] Notify: {$}", m_strClientID, pCmd->encode());
  942. //printf("--> %s \n", pCmd->encode());
  943. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str());
  944. PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  945. string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd);
  946. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str());
  947. PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion);
  948. if (m_strAbstractPath.length() > 0)
  949. {
  950. PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion);
  951. string realPath,devPath;
  952. devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length());
  953. for (int x = 0; x < m_rsOnlineGroup.size(); x++)
  954. {
  955. if ((int)m_rsOnlineGroup[x] == 1)
  956. {
  957. realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath;
  958. PublishAction(pCmd, realPath.c_str(), m_pMqttConntion);
  959. }
  960. }
  961. } //m_pPacketSendingQue->InQueue(*pCmd);
  962. delete pHandle;
  963. }
  964. return RET_SUCCEED;
  965. /*
  966. if (pCmd && m_pSysLogic)
  967. {
  968. return m_pSysLogic->CmdFromLogicDev(pCmd);
  969. }
  970. //put log here
  971. return RET_FAILED;*/
  972. }
  /// Converts a NUL-terminated multibyte string to a wide string using the
  /// current C locale (mbsrtowcs).
  ///
  /// @param mbstr multibyte input; may be null.
  /// @return the converted text, or an empty string when the input is null,
  ///         empty, or contains an invalid multibyte sequence.
  std::wstring mb2wc_a(const char* mbstr)
  {
      if (mbstr == nullptr)
      {
          return std::wstring();
      }

      const size_t byteCount = strlen(mbstr);
      if (byteCount == 0)
      {
          return std::wstring();
      }

      // A multibyte string never yields more wide characters than it has bytes,
      // so byteCount elements are always enough.
      std::wstring wide(byteCount, L'\0');

      // Private conversion state: a null state pointer would make mbsrtowcs use
      // hidden state shared by all callers.
      std::mbstate_t state = std::mbstate_t();
      const char* cursor = mbstr;
      const size_t converted = mbsrtowcs(&wide[0], &cursor, wide.size(), &state);
      if (converted == static_cast<size_t>(-1))
      {
          // Invalid multibyte sequence.
          return std::wstring();
      }

      wide.resize(converted);
      return wide;
  }
  994. RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
  995. {
  996. string ResBase64, DesBase64;
  997. wstring wResUTF = ResInfo;
  998. wstring wDesUTF = Description;
  999. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  1000. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  1001. return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
  1002. }
  1003. RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  1004. {
  1005. string ResBase64,DesBase64;
  1006. wstring wResUTF = mb2wc_a(ResInfo);
  1007. wstring wDesUTF = mb2wc_a(Description);
  1008. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  1009. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  1010. return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
  1011. }
  1012. RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  1013. {
  1014. //int ret = 1;
  1015. if (Code == 0 || (string)ResInfo == "")
  1016. {
  1017. ////mLog::FERROR("Code or ResInfo is empty");
  1018. return RET_FAILED;
  1019. }
  1020. MessageInfo info;
  1021. info.CodeID = Code;
  1022. info.Type = nMessageType;
  1023. info.Level = Level;
  1024. info.Resouceinfo = ResInfo;
  1025. info.Description = Description;
  1026. string strDevInstanceCode = DevInstance;
  1027. strDevInstanceCode += Code;
  1028. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1029. {
  1030. string strInstancekey = m_pResErrorList->GetKey(i);
  1031. if (strInstancekey == strDevInstanceCode)
  1032. {
  1033. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1034. //{
  1035. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1036. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1037. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1038. // {
  1039. //ret = 0;
  1040. ////mLog::FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
  1041. return RET_SUCCEED;
  1042. // }
  1043. //}
  1044. //ret = 2;
  1045. //break;
  1046. }
  1047. }
  1048. //if (ret==1)
  1049. {
  1050. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1051. //info.GetResDataObject(tempInfo);
  1052. //ErrorInfo.update(Code, tempInfo);
  1053. info.GetResDataObject(ErrorInfo);
  1054. struct timeval tv;
  1055. struct tm* tm_info;
  1056. char TimeTag[30];
  1057. // 获取当前时间
  1058. gettimeofday(&tv, NULL);
  1059. // 转换为本地时间
  1060. tm_info = localtime(&tv.tv_sec);
  1061. // 格式化时间为字符串
  1062. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1063. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1064. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1065. ErrorInfo.add("CreationTime", TimeTag);
  1066. ErrorInfo.add("AppId", pAppId);
  1067. ErrorInfo.add("InstanceId", DevInstance);
  1068. if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1069. {
  1070. m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
  1071. }
  1072. ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
  1073. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1074. ////mLog::FWARN( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
  1075. CmdFromLogicDev(&NotifyData);
  1076. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1077. }
  1078. //else if (ret == 2)
  1079. //{
  1080. // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
  1081. // info.GetResDataObject(tempInfo);
  1082. // ErrorInfo.update(Code, tempInfo);
  1083. // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1084. // {
  1085. // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
  1086. // }
  1087. // ResNotify.update(DevInstance, ErrorInfo);
  1088. // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1089. // RES_////mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
  1090. // CmdFromLogicDev(&NotifyData);
  1091. //}
  1092. //put log here
  1093. return RET_SUCCEED;
  1094. }
  1095. RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
  1096. {
  1097. AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
  1098. //put log here
  1099. return RET_FAILED;
  1100. }
  1101. RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
  1102. {
  1103. int index = -1;
  1104. bool m_bClearAll = false;
  1105. //trim empty code string
  1106. string CodeStr = Code;
  1107. if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
  1108. {
  1109. CodeStr = "";
  1110. Code = CodeStr.c_str();
  1111. m_bClearAll = true;
  1112. }
  1113. //if ((string)Code == "0" || (string)Code == "")
  1114. //{
  1115. // m_bClearAll = true;
  1116. //}
  1117. string strDevInstanceCode = DevInstance;
  1118. strDevInstanceCode += Code;
  1119. if (m_bClearAll)
  1120. {
  1121. m_pResErrorList->clear();
  1122. }
  1123. else
  1124. {
  1125. MessageInfo info;
  1126. info.CodeID = Code;
  1127. info.Type = nMessageType;
  1128. info.Level = Level;
  1129. info.Resouceinfo = ResInfo;
  1130. info.Description = Description;
  1131. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1132. {
  1133. string strInstancekey = m_pResErrorList->GetKey(i);
  1134. if (strInstancekey == strDevInstanceCode)
  1135. {
  1136. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1137. //{
  1138. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1139. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1140. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1141. // {
  1142. // index = (INT)j;
  1143. // break;
  1144. // }
  1145. //}
  1146. index = (int)i;
  1147. break;
  1148. }
  1149. }
  1150. }
  1151. MessageInfo info;
  1152. info.CodeID = Code;
  1153. info.Type = nMessageType;
  1154. info.Level = Level;
  1155. info.Resouceinfo = ResInfo;
  1156. info.Description = Description;
  1157. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1158. //info.GetResDataObject(tempInfo);
  1159. //ErrorInfo.add(Code, tempInfo);
  1160. info.GetResDataObject(ErrorInfo);
  1161. ErrorInfo.add("InstanceId", DevInstance);
  1162. bool result = false;
  1163. if (index>=0)
  1164. {
  1165. //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
  1166. result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
  1167. }
  1168. if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
  1169. {
  1170. ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
  1171. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
  1172. ////mLog::FWARN( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
  1173. CmdFromLogicDev(&NotifyData);
  1174. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1175. }
  1176. return RET_SUCCEED;
  1177. }
  1178. RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
  1179. {
  1180. DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
  1181. //put log here
  1182. return RET_FAILED;
  1183. }
  1184. RET_STATUS LogicDevice::EvtProcedure()
  1185. {
  1186. return RET_FAILED;
  1187. }
  1188. bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
  1189. {
  1190. ////mLog::FERROR("NOT FINISHED YET");
  1191. return false;
  1192. }
  1193. RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
  1194. {
  1195. std::string strResult = "";
  1196. //组Context包
  1197. //if (m_pLogger)
  1198. {
  1199. wstring wContect = mb2wc_a(pContext);
  1200. CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
  1201. }
  1202. //Thread_Lock();
  1203. //if (NULL != fmt)
  1204. //{
  1205. // va_list marker = NULL;
  1206. // va_start(marker, fmt);
  1207. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1208. // std::vector<char> vBuffer(nLength, '\0');
  1209. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1210. // if (nWritten > 0)
  1211. // {
  1212. // strResult = &vBuffer[0];
  1213. // }
  1214. // va_end(marker);
  1215. //}
  1216. //Thread_UnLock();
  1217. ResDataObject SysLogNode;
  1218. string guidstr;
  1219. GUID DeviceGuid;
  1220. //组Log包
  1221. if (GetDeviceType(DeviceGuid))
  1222. {
  1223. guid_2_string(DeviceGuid, guidstr);
  1224. SysLogNode.add("Module", guidstr.c_str());
  1225. SysLogNode.add("AppId", pAppId);
  1226. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1227. if (pCode)
  1228. {
  1229. SysLogNode.add("BusinessKey", pCode);
  1230. }
  1231. else
  1232. {
  1233. SysLogNode.add("BusinessKey", "");
  1234. }
  1235. SysLogNode.add("IP", (const char*)getLocalIpAddress());
  1236. struct timeval tv;
  1237. struct tm* tm_info;
  1238. char TimeTag[30];
  1239. // 获取当前时间
  1240. gettimeofday(&tv, NULL);
  1241. // 转换为本地时间
  1242. tm_info = localtime(&tv.tv_sec);
  1243. // 格式化时间为字符串
  1244. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1245. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1246. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1247. SysLogNode.add("CreationTime", TimeTag);
  1248. string strLevel = SysLogLevel2str(Level);
  1249. SysLogNode.add("Level", strLevel.c_str());
  1250. SysLogNode.add("HostName", (const char*)getLocalMachineId());
  1251. SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
  1252. SysLogNode.add("FreeText", strResult.c_str());
  1253. SysLogNode.add("Context", pCode);
  1254. ResDataObject NotifyData;
  1255. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1256. CmdFromLogicDev(&NotifyData);
  1257. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1258. }
  1259. else
  1260. {
  1261. ////mLog::FWARN("no Guid??");
  1262. return RET_FAILED;
  1263. }
  1264. //打印LOG
  1265. switch (Level)
  1266. {
  1267. case Syslog_Debug:
  1268. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1269. ////mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
  1270. break;
  1271. case Syslog_Information:
  1272. ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1273. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1274. break;
  1275. case Syslog_Warning:
  1276. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1277. ////mLog::Warn("SysLog {$} ", SysLogNode.encode());
  1278. break;
  1279. case Syslog_Error:
  1280. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1281. ////mLog::FERROR("SysLog {$} ", SysLogNode.encode());
  1282. break;
  1283. case Syslog_Fatal:
  1284. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1285. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1286. break;
  1287. default:
  1288. ////mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
  1289. break;
  1290. }
  1291. return RET_SUCCEED;
  1292. }
  1293. RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
  1294. {
  1295. std::string strResult = "";
  1296. //组Context包
  1297. //if (m_pLogger)
  1298. //{
  1299. // m_pLogger->Thread_Lock();
  1300. // if (NULL != fmt)
  1301. // {
  1302. // va_list marker = NULL;
  1303. // va_start(marker, fmt);
  1304. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1305. // std::vector<char> vBuffer(nLength, '\0');
  1306. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1307. // if (nWritten > 0)
  1308. // {
  1309. // strResult = &vBuffer[0];
  1310. // }
  1311. // va_end(marker);
  1312. // }
  1313. // m_pLogger->Thread_UnLock();
  1314. //}
  1315. //else
  1316. {
  1317. Thread_Lock();
  1318. if (fmt != NULL) {
  1319. va_list marker;
  1320. va_start(marker, fmt);
  1321. // 获取格式化字符串所需的大小(不包括终止符)
  1322. int nLength = vsnprintf(NULL, 0, fmt, marker);
  1323. if (nLength >= 0) {
  1324. std::vector<char> vBuffer(nLength + 1, '\0'); // +1 为终止符
  1325. vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker);
  1326. strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串
  1327. }
  1328. va_end(marker);
  1329. }
  1330. Thread_UnLock();
  1331. }
  1332. ResDataObject SysLogNode;
  1333. string guidstr;
  1334. GUID DeviceGuid;
  1335. //组Log包
  1336. if (GetDeviceType(DeviceGuid))
  1337. {
  1338. guid_2_string(DeviceGuid, guidstr);
  1339. SysLogNode.add("Module", guidstr.c_str());
  1340. SysLogNode.add("AppId", "");
  1341. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1342. if (pCode)
  1343. {
  1344. SysLogNode.add("BusinessKey", pCode);
  1345. }
  1346. else
  1347. {
  1348. SysLogNode.add("BusinessKey", "");
  1349. }
  1350. SysLogNode.add("IP", (const char *)getLocalIpAddress());
  1351. struct timeval tv;
  1352. struct tm* tm_info;
  1353. char TimeTag[30];
  1354. // 获取当前时间
  1355. gettimeofday(&tv, NULL);
  1356. // 转换为本地时间
  1357. tm_info = localtime(&tv.tv_sec);
  1358. // 格式化时间为字符串
  1359. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1360. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1361. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1362. SysLogNode.add("CreationTime", TimeTag);
  1363. SysLogNode.add("Level", Level);
  1364. SysLogNode.add("HostName", (const char *)getLocalMachineId());
  1365. SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
  1366. SysLogNode.add("FreeText", strResult.c_str());
  1367. ResDataObject NotifyData;
  1368. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1369. CmdFromLogicDev(&NotifyData);
  1370. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1371. if(m_pParent != nullptr)
  1372. NotifyParent(&NotifyData, "/Notify");
  1373. }
  1374. else
  1375. {
  1376. ////mLog::FERROR("no Guid??");
  1377. return RET_FAILED;
  1378. }
  1379. //打印LOG
  1380. switch (Level)
  1381. {
  1382. case Syslog_Debug:
  1383. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1384. ////mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
  1385. break;
  1386. case Syslog_Information:
  1387. //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1388. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1389. break;
  1390. case Syslog_Warning:
  1391. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1392. ////mLog::Warn("SysLog {$}", SysLogNode.encode());
  1393. break;
  1394. case Syslog_Error:
  1395. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1396. ////mLog::FERROR("SysLog {$}", SysLogNode.encode());
  1397. break;
  1398. case Syslog_Fatal:
  1399. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1400. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1401. break;
  1402. default:
  1403. ////mLog::FINFO( "SysLog {$}", SysLogNode.encode());
  1404. break;
  1405. }
  1406. return RET_SUCCEED;
  1407. }
  1408. /////////////////////////////////////////////////////////////////////////////
  1409. ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  1410. {
  1411. return nullptr;
  1412. }
  /// Returns the current local time as "YYYY-MM-DD HH:MM:SS.uuuuuu"
  /// (microsecond resolution).
  std::string CurrentDateTime()
  {
      using namespace std::chrono;

      const system_clock::time_point stamp = system_clock::now();
      const std::time_t wholeSeconds = system_clock::to_time_t(stamp);
      const long long micros = static_cast<long long>(
          duration_cast<microseconds>(stamp.time_since_epoch()).count() % 1000000);

      // Thread-safe conversion to broken-down local time.
      std::tm localTime{};
      localtime_r(&wholeSeconds, &localTime);

      // Date and time first, then the microsecond fraction.
      char text[64];
      std::strftime(text, sizeof(text), "%Y-%m-%d %H:%M:%S", &localTime);
      std::string result(text);

      snprintf(text, sizeof(text), ".%06lld", micros);
      result.append(text);
      return result;
  }
  /// Returns the current local time as " YYYY-MM-DD HH:MM:SS " (second
  /// resolution, with one leading and one trailing space).
  static std::string CurrentDateTime2()
  {
      const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

      // localtime_r instead of std::localtime: the latter returns a buffer shared
      // between threads and may return null, which strftime would dereference.
      std::tm localTime{};
      localtime_r(&now, &localTime);

      char buf[100] = { 0 };
      std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", &localTime);
      return buf;
  }
  1435. bool LogicDevice::GetActions(ResDataObject& resAction)
  1436. {
  1437. for (int x = 0; x < m_Actions.size(); x++)
  1438. {
  1439. resAction.update(m_Actions.GetKey(x), "");
  1440. }
  1441. return true;
  1442. }
  1443. void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
  1444. {
  1445. if (m_pParent != nullptr)
  1446. {
  1447. if (hConnection == nullptr)
  1448. {
  1449. hConnection = m_pParent->m_pMqttConntion;
  1450. }
  1451. PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
  1452. }
  1453. }
  1454. const mqtt_qos_t MQTT_QOS = QOS2;
  1455. /// <summary>
  1456. /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector
  1457. /// CCOS/Demo/Detector
  1458. /// </summary>
  1459. /// <param name="bOnline"></param>
  1460. void LogicDevice::SubscribeGroupActions(string wsName, int bOnline)
  1461. {
  1462. if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0)
  1463. {
  1464. int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()];
  1465. if (bOnline == lastOnline)
  1466. {
  1467. ////mLog::FWARN("Group Abstract is already ? Online {$}", bOnline);
  1468. return;
  1469. }
  1470. }
  1471. if ( m_strAbstractPath.length() > 0)
  1472. {
  1473. std::string pszAction, gpPath;
  1474. int ret = 0;
  1475. int num = m_Actions.size();
  1476. ////mLog::FINFO("Group Device onLine or offLine {$}", bOnline);
  1477. gpPath = m_strAbstractPath;
  1478. gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName);
  1479. pszAction = gpPath + m_strDevicePath;
  1480. pszAction += "/Action/+";
  1481. SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1482. if (bOnline)
  1483. {
  1484. ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1485. }
  1486. else
  1487. {
  1488. ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1489. }
  1490. //for (size_t i = 0; i < num; i++)
  1491. //{
  1492. // pszAction = gpPath + m_strDevicePath;
  1493. // pszAction += "/Action/";
  1494. // pszAction += (m_Actions.GetKey(i));
  1495. // SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1496. // if (bOnline)
  1497. // {
  1498. // ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1499. // }
  1500. // else
  1501. // {
  1502. // ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1503. // }
  1504. //}
  1505. m_rsOnlineGroup.update(wsName.c_str(), bOnline);
  1506. }
  1507. else
  1508. {
  1509. ////mLog::FWARN("try Online but AbstractPath is none {$}", m_strClientID);
  1510. }
  1511. }
  1512. void SubscribeModule(ccos_mqtt_connection* pconn, string devuribBase)
  1513. {
  1514. string pszAction = devuribBase;
  1515. pszAction += "/Get/+";
  1516. SubscribeTopic(pconn, pszAction.c_str());
  1517. pszAction = devuribBase;
  1518. pszAction += "/Update/+";
  1519. SubscribeTopic(pconn, pszAction.c_str());
  1520. pszAction = devuribBase;
  1521. pszAction += "/Set/+";
  1522. SubscribeTopic(pconn, pszAction.c_str());
  1523. pszAction = devuribBase;
  1524. pszAction += "/Add/+";
  1525. SubscribeTopic(pconn, pszAction.c_str());
  1526. pszAction = devuribBase;
  1527. pszAction += "/Del/+";
  1528. SubscribeTopic(pconn, pszAction.c_str());
  1529. pszAction = devuribBase;
  1530. pszAction += "/Action/+";
  1531. SubscribeTopic(pconn, pszAction.c_str());
  1532. pszAction = devuribBase;
  1533. pszAction += "/Message/+";
  1534. SubscribeTopic(pconn, pszAction.c_str());
  1535. }
  1536. void LogicDevice::SubscribeActions()
  1537. {
  1538. ////mLog::FINFO("Begain");
  1539. if (nullptr == m_pMqttConntion)
  1540. return;
  1541. int num = m_Actions.size();
  1542. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
  1543. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
  1544. {
  1545. ResDataObject res,resAction;
  1546. GetDeviceResource(&res);
  1547. std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
  1548. resAction = res["Action"];
  1549. ////mLog::FINFO("Actions from deviceresource [{$}]", resAction.size());
  1550. for (int x = 0; x < resAction.size(); x++)
  1551. {
  1552. string action = (string)resAction.GetKey(x);
  1553. if (action.length() > 0)
  1554. {
  1555. ////mLog::FINFO("Action from DeviceResource [{$}]", action);
  1556. m_Actions.update(action.c_str(), "");
  1557. }
  1558. }
  1559. num = m_Actions.size();
  1560. }
  1561. std::string pszAction;
  1562. int ret = 0;
  1563. SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str());
  1564. ////mLog::FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1565. if (m_strEBusRoot != m_strCCOSDevicePath)
  1566. {
  1567. pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1568. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1569. ////mLog::FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1570. }
  1571. if (m_strAbstractPath.length() > 0)
  1572. {
  1573. pszAction = m_strAbstractPath + m_strDevicePath;
  1574. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1575. ////mLog::FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1576. }
  1577. //for (size_t i = 0; i < num; i++)
  1578. //{
  1579. // //订阅ebus的路径
  1580. // pszAction = m_strEBusRoot;
  1581. // pszAction += "/Action/";
  1582. // pszAction += (m_Actions.GetKey(i));
  1583. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1584. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1585. //
  1586. // //订阅CCOS设备路径
  1587. // if (m_strEBusRoot != m_strCCOSDevicePath)
  1588. // {
  1589. // pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1590. // pszAction += "/Action/";
  1591. // pszAction += (m_Actions.GetKey(i));
  1592. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1593. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1594. // }
  1595. // //抽象设备路径要一直存在
  1596. // //订阅抽象设备路径
  1597. // if ( m_strAbstractPath.length() > 0)
  1598. // {
  1599. // pszAction = m_strAbstractPath + m_strDevicePath;
  1600. // pszAction += "/Action/";
  1601. // pszAction += (m_Actions.GetKey(i));
  1602. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1603. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1604. // }
  1605. //}
  1606. //pszAction = m_strEBusRoot;
  1607. //pszAction += "/Action/UpdateDeviceResource";
  1608. ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1609. //pTopicList->push_back(pszAction);
  1610. //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1611. //////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1612. //*/
  1613. }
  1614. /*
  1615. void connlost(void* context, char* cause)
  1616. {
  1617. //连接中断
  1618. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1619. std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
  1620. printf("\nConnection lost\n");
  1621. printf(" cause: %s\n", cause);
  1622. std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
  1623. }
  1624. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  1625. {
  1626. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1627. std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
  1628. }
  1629. */
  1630. //发送成功
  1631. //void delivered(void* context, MQTTAsync_token dt)
  1632. //{
  1633. // printf("Message with token value %d delivery confirmed\n", dt);
  1634. // //deliveredtoken = dt;
  1635. //}
  1636. //void connlost(void* context, char* cause)
  1637. //{
  1638. // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
  1639. // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
  1640. //
  1641. // //连接中断
  1642. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1643. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1644. // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
  1645. // return;
  1646. //
  1647. // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1648. // int rc;
  1649. //
  1650. // printf("Reconnecting\n");
  1651. // conn_opts.keepAliveInterval = 20;
  1652. // conn_opts.cleansession = 1;
  1653. // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  1654. // {
  1655. // printf("Failed to start connect, return code %d\n", rc);
  1656. // //finished = 1;
  1657. // }
  1658. //}
  1659. //重连成功
  1660. //void onReconnected(void* context, char* cause)
  1661. //{
  1662. // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
  1663. // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
  1664. //
  1665. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1666. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1667. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1668. // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1669. // const char* client_id = std::get<CLINET_ID_ID>(*connection);
  1670. // int rc;
  1671. //
  1672. // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
  1673. // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
  1674. // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
  1675. // // 重新订阅,暂不重新订阅,看看
  1676. //
  1677. // ///*
  1678. // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1679. // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1680. // //opts.onSuccess = onSubscribe;
  1681. // //opts.onFailure = onSubscribeFailure;
  1682. // //opts.context = client;
  1683. // auto it = pTopicList->begin();
  1684. // while(it != pTopicList->end())
  1685. // {
  1686. // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
  1687. // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
  1688. // it++;
  1689. // }
  1690. // //*/
  1691. //
  1692. //}
  1693. //
  1694. ////MQTT 连接主动断开连接失败
  1695. //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1696. //{
  1697. // printf("Disconnect failed, rc %d\n", response->code);
  1698. // //disc_finished = 1;
  1699. //}
  1700. //
  1701. ////MQTT 连接主动断开连接成功
  1702. //void onDisconnect(void* context, MQTTAsync_successData* response)
  1703. //{
  1704. // //printf("Successful disconnection\n");
  1705. // //disc_finished = 1;
  1706. //}
  1707. //
  1708. //void onSubscribe(void* context, MQTTAsync_successData* response)
  1709. //{
  1710. // printf("Subscribe succeeded\n");
  1711. // //subscribed = 1;
  1712. //}
  1713. //
  1714. //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1715. //{
  1716. // printf("Subscribe failed, rc %d\n", response->code);
  1717. // //finished = 1;
  1718. //}
  1719. //
  1720. //
  1721. //void onConnectFailure(void* context, MQTTAsync_failureData* response)
  1722. //{
  1723. // printf("Connect failed, rc %d\n", response->code);
  1724. // //finished = 1;
  1725. //}
  1726. //void onConnect(void* context, MQTTAsync_successData* response)
  1727. //{
  1728. //
  1729. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1730. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1731. //
  1732. // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
  1733. //
  1734. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1735. // int rc;
  1736. //
  1737. // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
  1738. // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
  1739. // if (hConnected != NULL)
  1740. // SetEvent(hConnected);
  1741. //
  1742. // /*
  1743. // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1744. // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1745. // opts.onSuccess = onSubscribe;
  1746. // opts.onFailure = onSubscribeFailure;
  1747. // opts.context = client;
  1748. // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  1749. // {
  1750. // printf("Failed to start subscribe, return code %d\n", rc);
  1751. // finished = 1;
  1752. // }*/
  1753. //}
  1754. // Forward declaration (qos defaults to MQTT_QOS). Translated original note: sends an Action packet, with its parameters, to the given topic and names the reply topic; the response is handled asynchronously in the callback of the corresponding connection. NOTE(review): the definition is outside this chunk - confirm the note still matches this signature.
  1755. int PublishActionWithoutLock(string &message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos = MQTT_QOS);
  1756. void resubscribe_topic(void* client, void* reconnect_date)
  1757. {
  1758. mqtt_client* pClient = (mqtt_client*)client;
  1759. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1760. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1761. string client_id = std::get<CLINET_ID_ID>(*connection);
  1762. ////mLog::FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
  1763. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1764. pLock->Thread_Lock();
  1765. for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
  1766. ////mLog::FWARN("Resubscribe {$}", str);
  1767. mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
  1768. });
  1769. pLock->Thread_UnLock();
  1770. }
  1771. //MQTT消息抵达
  1772. //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1773. void msgarrivd(void* client, message_data_t* message)
  1774. {
  1775. // 日志前缀:时间+线程ID
  1776. const auto logPrefix = CurrentDateTime() + " msgarrivd:: TID [" + std::to_string(GetCurrentThreadId()) + "] ";
  1777. std::cout << logPrefix << std::endl;
  1778. // 基础参数校验
  1779. if (!client || !message || !message->topic_name) return;
  1780. // 转换并校验客户端对象
  1781. auto* pClient = reinterpret_cast<mqtt_client*>(client);
  1782. if (!pClient || !pClient->mqtt_conn_context) {
  1783. std::cout << logPrefix << "Invalid client or connection context" << std::endl;
  1784. return;
  1785. }
  1786. // 转换并校验连接对象
  1787. ccos_mqtt_connection* connection = nullptr;
  1788. try {
  1789. connection = reinterpret_cast<ccos_mqtt_connection*>(pClient->mqtt_conn_context);
  1790. if (!connection) {
  1791. std::cout << logPrefix << "Connection is null" << std::endl;
  1792. return;
  1793. }
  1794. }
  1795. catch (...) {
  1796. std::cout << logPrefix << "Failed to cast connection context" << std::endl;
  1797. return;
  1798. }
  1799. // 获取并校验锁对象
  1800. CcosLock* pLock = nullptr;
  1801. try {
  1802. pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1803. if (!pLock) {
  1804. std::cout << logPrefix << "Lock is null" << std::endl;
  1805. return;
  1806. }
  1807. }
  1808. catch (...) {
  1809. std::cout << logPrefix << "Failed to get lock from connection" << std::endl;
  1810. return;
  1811. }
  1812. // 加锁并二次校验
  1813. pLock->Thread_Lock();
  1814. auto* pLockTemp = std::get<CONN_SEND_LOCK_ID>(*connection);
  1815. if (!pLockTemp) {
  1816. pLock->Thread_UnLock();
  1817. return;
  1818. }
  1819. if (!pClient->mqtt_conn_context) {
  1820. pLock->Thread_UnLock();
  1821. std::cout << logPrefix << "Connection context became null during locking" << std::endl;
  1822. return;
  1823. }
  1824. // 获取客户端ID
  1825. std::string clientId;
  1826. try {
  1827. clientId = std::get<CLINET_ID_ID>(*connection);
  1828. }
  1829. catch (...) {
  1830. pLock->Thread_UnLock();
  1831. std::cout << logPrefix << "Failed to get client ID" << std::endl;
  1832. return;
  1833. }
  1834. // 获取MQTT客户端对象
  1835. void* mqttClient = nullptr;
  1836. try {
  1837. mqttClient = std::get<MQTT_CLT_ID>(*connection);
  1838. if (!mqttClient) {
  1839. pLock->Thread_UnLock();
  1840. std::cout << logPrefix << "MQTT client is null" << std::endl;
  1841. return;
  1842. }
  1843. }
  1844. catch (...) {
  1845. pLock->Thread_UnLock();
  1846. std::cout << logPrefix << "Failed to get MQTT client" << std::endl;
  1847. return;
  1848. }
  1849. // 处理主题名
  1850. std::string topic;
  1851. try {
  1852. const size_t topicLen = strlen(message->topic_name);
  1853. const size_t maxTopicLen = sizeof(message->topic_name) - 1;
  1854. topic = (topicLen > maxTopicLen) ?
  1855. std::string(message->topic_name, maxTopicLen) :
  1856. std::string(message->topic_name);
  1857. }
  1858. catch (...) {
  1859. pLock->Thread_UnLock();
  1860. std::cout << logPrefix << "Failed to process topic name" << std::endl;
  1861. return;
  1862. }
  1863. // 输出消息接收日志
  1864. std::cout << logPrefix << clientId << " Get Msg from " << topic << std::endl;
  1865. // 校验消息数据
  1866. if (!message->message || !message->message->payload) {
  1867. pLock->Thread_UnLock();
  1868. std::cout << logPrefix << "Invalid message data" << std::endl;
  1869. return;
  1870. }
  1871. // 校验消息大小
  1872. const auto payloadLen = message->message->payloadlen;
  1873. if ((topic.length() >= MQTT_TOPIC_LEN_MAX && payloadLen > 16380) || payloadLen > 512 * 1024) {
  1874. pLock->Thread_UnLock();
  1875. std::cout << logPrefix << "Message too large: topic_len=" << topic.length()
  1876. << ", payload_len=" << payloadLen << std::endl;
  1877. return;
  1878. }
  1879. // 大消息提示
  1880. if (payloadLen > 16380) {
  1881. std::cout << logPrefix << clientId << " Get big Msg from " << topic
  1882. << " msg Len " << payloadLen << std::endl;
  1883. }
  1884. // 解码消息载荷
  1885. ResDataObject req;
  1886. try {
  1887. std::string payload(static_cast<const char*>(message->message->payload), payloadLen);
  1888. req.decode(payload.c_str());
  1889. }
  1890. catch (...) {
  1891. pLock->Thread_UnLock();
  1892. return;
  1893. }
  1894. // 处理消息钩子
  1895. void* pHook = nullptr;
  1896. try {
  1897. pHook = std::get<MSG_HOOK_ID>(*connection);
  1898. }
  1899. catch (...) {
  1900. pLock->Thread_UnLock();
  1901. return;
  1902. }
  1903. if (pHook) {
  1904. try {
  1905. auto* filter = reinterpret_cast<ccos_mqtt_msg_filter*>(pHook);
  1906. if (!filter) {
  1907. pLock->Thread_UnLock();
  1908. return;
  1909. }
  1910. auto func = std::get<FILTER_FUNC_ID>(*filter);
  1911. std::cout << logPrefix << clientId << " Got Hook Process " << topic << std::endl;
  1912. auto* pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
  1913. if (!pRequests) {
  1914. pLock->Thread_UnLock();
  1915. return;
  1916. }
  1917. // 遍历请求处理钩子
  1918. for (int x = 0; x < pRequests->size(); x++) {
  1919. std::cout << logPrefix << clientId << " Hook Process Function " << topic << std::endl;
  1920. const std::string keyAction = pRequests->GetKey(x);
  1921. ResDataObject resObj = (*pRequests)[x];
  1922. auto hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1923. // 匹配钩子消息
  1924. if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES &&
  1925. keyAction == PacketAnalizer::GetPacketKey(&req)) {
  1926. std::cout << logPrefix << clientId << " Got Hook Function Hooked.. " << topic << std::endl;
  1927. // 打开事件(优先使用resObj,失败则用默认)
  1928. auto event = LinuxEvent::OpenEvent(std::string(resObj).c_str());
  1929. if (!event) event = hEvent;
  1930. // 设置响应对象
  1931. auto* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
  1932. if (pResponse) pResponse->add(keyAction.c_str(), req);
  1933. // 触发事件
  1934. if (event) {
  1935. try { event->SetEvent(); }
  1936. catch (...) {}
  1937. }
  1938. // 清理并退出
  1939. pRequests->eraseOneOf(keyAction.c_str(), 0);
  1940. pLock->Thread_UnLock();
  1941. return;
  1942. }
  1943. }
  1944. }
  1945. catch (...) {}
  1946. }
  1947. // 未命中钩子,继续处理
  1948. std::cout << logPrefix << clientId << " Go on Process " << topic << std::endl;
  1949. pLock->Thread_UnLock();
  1950. // 调用用户回调
  1951. ccos_mqtt_callback onmsg = nullptr;
  1952. try {
  1953. onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
  1954. }
  1955. catch (...) {
  1956. return;
  1957. }
  1958. if (onmsg) {
  1959. try { onmsg(&req, topic.c_str(), connection); }
  1960. catch (...) {}
  1961. }
  1962. else {
  1963. std::cout << logPrefix << "**** ----- **** USER_MSG_CAKBCK_ID is null "
  1964. << clientId << " When processing " << topic << std::endl;
  1965. }
  1966. }
  1967. void* MqttSendThreadFunc(void* pPara)
  1968. {
  1969. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
  1970. if (!connection) {
  1971. return NULL;
  1972. }
  1973. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1974. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*connection);
  1975. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1976. std::string client_id = std::get<CLINET_ID_ID>(*connection);
  1977. sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
  1978. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1979. // 线程启动日志
  1980. std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
  1981. while (std::get<THREAD_RUNNING_ID>(*connection)) {
  1982. struct timespec ts;
  1983. clock_gettime(CLOCK_REALTIME, &ts);
  1984. ts.tv_sec += 1;
  1985. // 等待信号量前的日志
  1986. //std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
  1987. if (sem_timedwait(hSemaphore, &ts) == 0) {
  1988. std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
  1989. pLock->Thread_Lock();
  1990. std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
  1991. if (pList->empty()) {
  1992. std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
  1993. pLock->Thread_UnLock();
  1994. continue;
  1995. }
  1996. ResDataObject* pMsg = pList->front();
  1997. pList->pop_front();
  1998. pLock->Thread_UnLock();
  1999. std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
  2000. std::string m_strSendMessage;
  2001. for (int x = 0; x < pMsg->size(); x++) {
  2002. const char* pTopic = pMsg->GetKey(x);
  2003. m_strSendMessage = (std::string)(*pMsg)[pTopic];
  2004. std::cout<< CurrentDateTime() << " [" << client_id << "] Prepare to publish messages to the topic: " << pTopic
  2005. << ", Message length: " << m_strSendMessage.length() << "Byte" << std::endl;
  2006. PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id, pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
  2007. }
  2008. std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
  2009. delete pMsg;
  2010. //std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
  2011. }
  2012. else if (errno == ETIMEDOUT) {
  2013. //std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
  2014. continue;
  2015. }
  2016. else {
  2017. // 其他错误情况
  2018. std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
  2019. continue;
  2020. }
  2021. }
  2022. return NULL;
  2023. }
  // Recovers a pointer to the enclosing `type` object from a pointer to its
  // `member` field. Uses the GNU statement-expression and __typeof__ extensions
  // (GCC/Clang); __typeof__ is used instead of typeof so the macro also works
  // in strict -std=c++NN mode.
  #define container_of(ptr, type, member) ({ \
      const __typeof__(((type*)0)->member)* cc_mptr_ = (ptr); \
      (type*)((char*)cc_mptr_ - offsetof(type, member)); })

  // Iterates over the entries of an intrusive circular list whose nodes are
  // embedded as `member`. `n` always holds the next entry, so the body may
  // unlink or free `pos`.
  // __typeof__(*(pos)) yields the plain entry type; decltype(*(pos)) would be a
  // reference type, which cannot be used to form the pointer in container_of.
  #define mqtt_list_for_each_entry_safe(pos, n, head, member) \
      for (pos = container_of((head)->next, __typeof__(*(pos)), member), \
           n = container_of((pos)->member.next, __typeof__(*(pos)), member); \
           &(pos)->member != (head); \
           pos = n, n = container_of((n)->member.next, __typeof__(*(n)), member))
  2034. // 重连回调函数 - 实现自动重订阅
  2035. static void reconnect_handler(void* client, void* reconnect_data) {
  2036. mqtt_client_t* c = (mqtt_client_t*)client;
  2037. message_handlers_t* pos, * n;
  2038. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)c->mqtt_conn_context;
  2039. // 遍历所有已订阅的主题并重新订阅
  2040. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  2041. pLock->Thread_Lock();
  2042. /*mqtt_list_for_each_entry_safe(pos, n, &c->mqtt_msg_handler_list, list) {
  2043. mqtt_subscribe(c, pos->topic_filter, pos->qos, pos->handler);
  2044. }*/
  2045. pLock->Thread_UnLock();
  2046. }
  2047. mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
  2048. if (!connection) {
  2049. std::cerr << "[ERROR] Invalid connection context (nullptr)" << std::endl;
  2050. return nullptr;
  2051. }
  2052. //mqtt_log_init();
  2053. mqtt_client_t* client = NULL;
  2054. client = mqtt_lease();
  2055. mqtt_log_init();
  2056. if (!client) { // 修复:检查客户端指针是否有效
  2057. return nullptr;
  2058. }
  2059. std::get<MQTT_CLT_ID>(*connection) = client;
  2060. client->mqtt_conn_context = connection;
  2061. std::string& pszClientID = std::get<CLINET_ID_ID>(*connection);
  2062. // 使用本地副本存储连接参数
  2063. std::string host = SERVER_ADDRESS;
  2064. std::string port = "1883";
  2065. std::string username = "zskkcc";
  2066. std::string password = "zskk1234";
  2067. std::cout << "[CONFIG] Setting connection parameters:"
  2068. << "\n\tClient ID: " << pszClientID
  2069. << "\n\tHost: " << host
  2070. << "\n\tPort: " << port
  2071. << std::endl;
  2072. // 设置MQTT连接参数
  2073. mqtt_set_host(client, const_cast<char*>(host.c_str()));
  2074. mqtt_set_port(client, const_cast<char*>(port.c_str()));
  2075. mqtt_set_user_name(client, const_cast<char*>(username.c_str()));
  2076. mqtt_set_password(client, const_cast<char*>(password.c_str()));
  2077. mqtt_set_client_id(client, const_cast<char*>(pszClientID.c_str()));
  2078. mqtt_set_clean_session(client, 0);
  2079. mqtt_set_write_buf_size(client, 8192);
  2080. mqtt_set_read_buf_size(client, 8192);
  2081. // 设置重连回调(替代mqtt_set_resubscribe_handler)
  2082. //mqtt_subscribe();
  2083. //mqtt_set_reconnect_handler(pMqttClient, reconnect_handler);
  2084. int rc = 0;
  2085. const uint64_t timeout_ms = 10000; // 10秒超时
  2086. uint64_t start_time = GetTickCount();
  2087. int attempt_count = 0;
  2088. bool connected = false;
  2089. std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl;
  2090. try {
  2091. while (true) {
  2092. attempt_count++;
  2093. uint64_t current_time = GetTickCount();
  2094. uint64_t elapsed = current_time - start_time;
  2095. // 检查超时
  2096. if (elapsed > timeout_ms) {
  2097. std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl;
  2098. return nullptr;
  2099. }
  2100. std::cout << CurrentDateTime() << " [ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
  2101. rc = mqtt_connect(client);
  2102. if (rc == MQTT_SUCCESS_ERROR) {
  2103. connected = true;
  2104. break;
  2105. }
  2106. std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl;
  2107. // 计算等待时间
  2108. uint64_t remaining = timeout_ms - elapsed;
  2109. uint64_t wait_time = (remaining > 2000) ? 2000 : remaining;
  2110. if (wait_time > 0) {
  2111. std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." << std::endl;
  2112. usleep(wait_time * 1000);
  2113. }
  2114. }
  2115. if (connected) {
  2116. uint64_t total_time = GetTickCount() - start_time;
  2117. std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl;
  2118. // 安全地设置连接上下文
  2119. try {
  2120. return client;
  2121. }
  2122. catch (const std::exception& e) {
  2123. std::cerr << "[ERROR] Failed to set connection context: " << e.what() << std::endl;
  2124. return nullptr;
  2125. }
  2126. }
  2127. }
  2128. catch (const std::exception& e) {
  2129. std::cerr << "[ERROR] Exception during connection: " << e.what() << std::endl;
  2130. return nullptr;
  2131. }
  2132. catch (...) {
  2133. std::cerr << "[ERROR] Unknown exception during connection" << std::endl;
  2134. return nullptr;
  2135. }
  2136. return nullptr;
  2137. }
  2138. ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
  2139. {
  2140. ////mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
  2141. std::cout << "TID " << GetCurrentThreadId()
  2142. << " : " << pszClientID
  2143. << " NewConnection " << pszServer
  2144. << ":" << pszServerPort
  2145. << " user: " << pszUserName
  2146. << " password " << pszPassword
  2147. << std::endl;
  2148. //连接MQTT broker
  2149. DWORD dwTick = GetTickCount();
  2150. std::string clientIdStr(pszClientID ? pszClientID : "");
  2151. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2152. try {
  2153. // 初始化元组字段
  2154. std::get<MQTT_TIPIC_LIST_ID>(*connection) = new mqtt_topic_list;
  2155. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2156. std::get<USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2157. CcosLock* pLock = new CcosLock();
  2158. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2159. pLock->Thread_Lock();
  2160. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2161. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2162. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2163. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2164. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2165. std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
  2166. mqtt_client* pMqttClient = InnerConnect(connection);
  2167. if (pMqttClient == nullptr)
  2168. {
  2169. pLock->Thread_UnLock();
  2170. throw std::runtime_error("InnerConnect failed");
  2171. }
  2172. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2173. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2174. // 创建POSIX信号量
  2175. sem_t* semaphore = new sem_t;
  2176. if (sem_init(semaphore, 0, 0) != 0) {
  2177. pLock->Thread_UnLock();
  2178. delete semaphore;
  2179. throw std::runtime_error("Failed to initialize semaphore");
  2180. }
  2181. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2182. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2183. // 创建发送线程
  2184. pthread_t threadId;
  2185. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2186. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2187. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2188. // 错误处理...
  2189. sem_destroy(semaphore);
  2190. delete semaphore;
  2191. pLock->Thread_UnLock();
  2192. throw std::runtime_error("Failed to initialize semaphore");
  2193. }
  2194. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2195. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2196. pLock->Thread_UnLock();
  2197. uint64_t dwWaitTick = GetTickCount() - dwTick;
  2198. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2199. << dwWaitTick << " ms" << std::endl;
  2200. return connection;
  2201. }
  2202. catch (const std::exception& e) {
  2203. std::cerr << "Exception in NewConnection: " << e.what() << std::endl;
  2204. // 清理资源
  2205. if (std::get<SEMAPHORE_HANDLE_ID>(*connection)) {
  2206. sem_destroy(std::get<SEMAPHORE_HANDLE_ID>(*connection));
  2207. delete std::get<SEMAPHORE_HANDLE_ID>(*connection);
  2208. }
  2209. delete std::get<MSG_LIST_ID>(*connection);
  2210. delete std::get<CONN_SEND_LOCK_ID>(*connection);
  2211. delete std::get<MQTT_TIPIC_LIST_ID>(*connection);
  2212. delete connection;
  2213. return nullptr;
  2214. }
  2215. }
  2216. //创建额外连接,需要提供回调函数
  2217. LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async)
  2218. {
  2219. DWORD dwTick = GetTickCount();
  2220. ////mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
  2221. //mqtt_client* pMqttClient = nullptr;
  2222. //pMqttClient = mqtt_lease();
  2223. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2224. //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  2225. //std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2226. mqtt_topic_list* pTopicList = new std::list<string>;
  2227. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  2228. CcosLock* pLock = new CcosLock();
  2229. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2230. pLock->Thread_Lock();
  2231. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2232. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2233. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2234. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2235. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2236. std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
  2237. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2238. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2239. mqtt_client* pMqttClient = InnerConnect(connection);
  2240. if (pMqttClient == nullptr)
  2241. {
  2242. pLock->Thread_UnLock();
  2243. delete std::get<MQTT_TIPIC_LIST_ID>(*connection); // 删除 pTopicList
  2244. delete std::get<CONN_SEND_LOCK_ID>(*connection); // 删除 pLock
  2245. delete connection;
  2246. delete pfilter;
  2247. return nullptr;
  2248. }
  2249. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2250. //pMqttClient->mqtt_conn_context = connection;
  2251. //设置回调函数
  2252. //设置MQTT连接参数
  2253. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2254. /*
  2255. //连接Broker
  2256. int rc = 0;
  2257. dwTick = GetTickCount();
  2258. while (true)
  2259. {
  2260. rc = mqtt_connect(pMqttClient);
  2261. if (rc != MQTT_SUCCESS_ERROR)
  2262. {
  2263. if (GetTickCount() - dwTick > dwOpenTimeout)
  2264. {
  2265. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2266. pLock->Thread_UnLock();
  2267. delete connection;
  2268. delete pfilter;
  2269. return nullptr;
  2270. }
  2271. else
  2272. {
  2273. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2274. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
  2275. Sleep(20);
  2276. }
  2277. }
  2278. else
  2279. {
  2280. break;
  2281. }
  2282. }*/
  2283. DWORD dwWaitTick = GetTickCount() - dwTick;
  2284. ////mLog::FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  2285. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2286. // 创建POSIX信号量
  2287. sem_t* semaphore = new sem_t;
  2288. sem_init(semaphore, 0, 0);
  2289. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2290. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2291. // 创建发送线程
  2292. std::cout << "Preapre create MqttSendThreadFunc" << std::endl;
  2293. pthread_t threadId;
  2294. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2295. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2296. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2297. // 错误处理...
  2298. sem_destroy(semaphore);
  2299. delete semaphore;
  2300. pLock->Thread_UnLock();
  2301. delete connection;
  2302. return nullptr;
  2303. }
  2304. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2305. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2306. pLock->Thread_UnLock();
  2307. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2308. << dwWaitTick << " ms" << std::endl;
  2309. return connection;
  2310. }
  2311. //重置连接
  2312. LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection)
  2313. {
  2314. if (hConnection == nullptr)
  2315. {
  2316. return;
  2317. }
  2318. ////mLog::FWARN(" Reset Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  2319. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
  2320. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2321. if (pconn != nullptr) {
  2322. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2323. pLock->Thread_Lock();
  2324. //mqtt_do_reconnect(pconn);
  2325. mqtt_disconnect(pconn);
  2326. /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic);
  2327. mqtt_connect(pconn);*/
  2328. mqtt_release(pconn);
  2329. pconn = InnerConnect(hConnection);
  2330. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2331. if (pconn != nullptr)
  2332. {
  2333. //std::get<MQTT_CLT_ID>(*hConnection) = pconn;
  2334. //resubscribe_topic(pconn, pconn->mqtt_conn_context);
  2335. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2336. }
  2337. pLock->Thread_UnLock();
  2338. }
  2339. }
  2340. //关闭并释放连接
  2341. LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
  2342. {
  2343. if (!hConnection) return;
  2344. auto clientId = std::get<CLINET_ID_ID>(*hConnection);
  2345. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." << endl;
  2346. std::get<THREAD_RUNNING_ID>(*hConnection) = false;
  2347. sem_t* semaphore = std::get<SEMAPHORE_HANDLE_ID>(*hConnection);
  2348. if (semaphore) {
  2349. sem_post(semaphore); // 发送信号量唤醒线程
  2350. }
  2351. pthread_t sendThread = std::get<CONNECTED_HANDLE_ID>(*hConnection);
  2352. if (sendThread != 0) {
  2353. // 等待线程退出,超时时间设为1秒
  2354. struct timespec timeout;
  2355. clock_gettime(CLOCK_REALTIME, &timeout);
  2356. timeout.tv_sec += 1;
  2357. int joinResult = pthread_timedjoin_np(sendThread, nullptr, &timeout);
  2358. if (joinResult != 0) {
  2359. std::cerr << CurrentDateTime() << " " << clientId
  2360. << " Warning: Send thread did not exit in time, force cancel" << std::endl;
  2361. pthread_cancel(sendThread); // 超时后强制终止(最后的备选方案)
  2362. pthread_join(sendThread, nullptr);
  2363. }
  2364. std::get<CONNECTED_HANDLE_ID>(*hConnection) = 0; // 重置线程ID
  2365. }
  2366. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2367. if (nullptr != pLock)
  2368. pLock->Thread_Lock();
  2369. else
  2370. return;
  2371. mqtt_client* pMqttClient = static_cast<mqtt_client*>(std::get<MQTT_CLT_ID>(*hConnection));
  2372. if (pMqttClient) {
  2373. mqtt_disconnect(pMqttClient); // 断开连接
  2374. mqtt_release(pMqttClient); // 释放客户端实例
  2375. pMqttClient = NULL;
  2376. std::get<MQTT_CLT_ID>(*hConnection) = nullptr;
  2377. }
  2378. ccos_mqtt_msg_filter* pFilter = static_cast<ccos_mqtt_msg_filter*>(std::get<MSG_HOOK_ID>(*hConnection));
  2379. if (pFilter) {
  2380. std::cout << "Free Connection filter [" << (UINT64)pFilter << "] .." << std::endl;
  2381. // 释放filter内部的资源对象
  2382. delete std::get<FILTER_RES_OBJ_ID>(*pFilter);
  2383. delete std::get<FILTER_RESPONS_OBJ_ID>(*pFilter);
  2384. delete pFilter;
  2385. std::get<MSG_HOOK_ID>(*hConnection) = nullptr;
  2386. }
  2387. mqtt_msg_list* pMsgList = std::get<MSG_LIST_ID>(*hConnection);
  2388. if (pMsgList) {
  2389. // 清理剩余未发送的消息
  2390. while (!pMsgList->empty()) {
  2391. ResDataObject* pMsg = pMsgList->front();
  2392. pMsgList->pop_front();
  2393. delete pMsg; // 释放消息对象
  2394. }
  2395. delete pMsgList;
  2396. std::get<MSG_LIST_ID>(*hConnection) = nullptr;
  2397. }
  2398. if (semaphore) {
  2399. sem_destroy(semaphore); // 销毁信号量
  2400. delete semaphore;
  2401. std::get<SEMAPHORE_HANDLE_ID>(*hConnection) = nullptr;
  2402. }
  2403. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2404. if (pTopicList) {
  2405. pTopicList->clear(); // 清空列表
  2406. delete pTopicList;
  2407. std::get<MQTT_TIPIC_LIST_ID>(*hConnection) = nullptr;
  2408. }
  2409. if (pLock) {
  2410. pLock->Thread_UnLock(); // 确保锁被释放
  2411. delete pLock;
  2412. std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
  2413. }
  2414. delete hConnection;
  2415. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." << endl;
  2416. }
  2417. //主动订阅主题
  2418. LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
  2419. {
  2420. std::cout << "SubscribeTopic called. Topic: " << (pszTopic ? pszTopic : "null")
  2421. << ", isShare: " << (isShare ? "true" : "false") << std::endl;
  2422. if (hConnection == nullptr)
  2423. {
  2424. std::cout << "SubscribeTopic error: hConnection is nullptr" << std::endl;
  2425. return 0;
  2426. }
  2427. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2428. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2429. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2430. pLock->Thread_Lock();
  2431. pTopicList->push_back(std::string(pszTopic));
  2432. //pMqttClient->subscribe(pszTopic, 1, opts);
  2433. int rc = MQTT_SUCCESS_ERROR;
  2434. DWORD dwTick = GetTickCount();
  2435. int nTryTimes = 0;
  2436. do
  2437. {
  2438. if (pTopicList->size() < 1)
  2439. {
  2440. const char* topicToSubscribe = pTopicList->back().c_str();
  2441. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2442. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2443. ////mLog::FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2444. std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe First " << pszTopic << endl;
  2445. }
  2446. else
  2447. {
  2448. const char* topicToSubscribe = pTopicList->back().c_str();
  2449. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2450. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2451. ////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2452. std::cout << CurrentDateTime() << " mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << " ret: "<<ret <<endl;
  2453. }
  2454. if (rc != MQTT_SUCCESS_ERROR)
  2455. {
  2456. ////mLog::FWARN("try do mqtt reconnect ..");
  2457. //mqtt_do_reconnect(pMqttClient);
  2458. mqtt_disconnect(pMqttClient);
  2459. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2460. mqtt_release(pMqttClient);
  2461. pMqttClient = InnerConnect(hConnection);
  2462. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2463. if (pMqttClient != nullptr)
  2464. {
  2465. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2466. //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2467. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2468. }
  2469. usleep(2000 * 1000);
  2470. }
  2471. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2472. pLock->Thread_UnLock();
  2473. return 0;
  2474. }
  2475. //主题订阅取消
  2476. LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  2477. {
  2478. if (hConnection == nullptr)
  2479. {
  2480. return 0;
  2481. }
  2482. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2483. //if (!pMqttClient->is_connected()) {
  2484. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2485. // return 0;
  2486. //}
  2487. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2488. //auto ret = pMqttClient->unsubscribe(pszTopic);
  2489. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2490. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2491. pLock->Thread_Lock();
  2492. //MQTTAsync_responseOptions resp;
  2493. pTopicList->remove(pszTopic);
  2494. int rc = MQTT_SUCCESS_ERROR;
  2495. DWORD dwTick = GetTickCount();
  2496. int nTryTimes = 0;
  2497. do
  2498. {
  2499. int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
  2500. ////mLog::FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  2501. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2502. pLock->Thread_UnLock();
  2503. return 2;
  2504. }
  2505. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  2506. LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout)
  2507. {
  2508. char pszClientID[256];
  2509. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2510. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2511. getpid(), GetCurrentThreadId());
  2512. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
  2513. });
  2514. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
  2515. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  2516. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2517. string pLoad = pCmd->encode();
  2518. int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName);
  2519. std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2520. ////mLog::FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc);
  2521. CloseConnection(connObj);
  2522. return rc;
  2523. }
  2524. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  2525. LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout)
  2526. {
  2527. if (hConnection == nullptr)
  2528. {
  2529. ////mLog::FDEBUG("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode());
  2530. std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
  2531. return 0;
  2532. }
  2533. std::cout << CurrentDateTime()<<" " << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  2534. string topic = pszTopic;
  2535. if (topic.length() <= 0)
  2536. {
  2537. ////mLog::FWARN("ignore empty topic packet {$}", pAction->encode());
  2538. return 2;
  2539. }
  2540. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2541. //if (!pMqttClient->is_connected()) {
  2542. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2543. // return 0;
  2544. //}
  2545. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2546. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2547. pLock->Thread_Lock();
  2548. ////mLog::FDEBUG("{$} Publish Action to {$} Action Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction->encode());
  2549. //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction);
  2550. //if(org_publisher.length() <= 0)
  2551. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
  2552. ResDataObject* pPacket = new ResDataObject();
  2553. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2554. pPacket->add(pszTopic, pAction->encode());
  2555. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2556. pList->push_back(pPacket);
  2557. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2558. std::cout << "try publish " << pAction->encode() << endl;
  2559. //pMqttClient->publish(pszTopic, pAction->encode());
  2560. /*
  2561. //pConn->publish(pszTopic, pCmd->encode());
  2562. std::cout << "try publish " << pAction->encode() << endl;
  2563. const char* pLoad = pAction->encode();
  2564. int len = strlen(pLoad);
  2565. //MQTTAsync_responseOptions resp;
  2566. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2567. mqtt_message_t msg;
  2568. memset(&msg, 0, sizeof(msg));
  2569. msg.payload = (void*)pLoad;
  2570. msg.qos = MQTT_QOS;
  2571. msg.payloadlen = len;
  2572. int rc = MQTT_SUCCESS_ERROR;
  2573. DWORD dwTick = GetTickCount();
  2574. int nTryTimes = 0;
  2575. do
  2576. {
  2577. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2578. nTryTimes++;
  2579. if (rc != MQTT_SUCCESS_ERROR)
  2580. {
  2581. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id);
  2582. //mqtt_do_reconnect(pMqttClient);
  2583. mqtt_disconnect(pMqttClient);
  2584. rc = mqtt_connect(pMqttClient);
  2585. if (rc == MQTT_SUCCESS_ERROR)
  2586. {
  2587. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2588. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2589. }
  2590. Sleep(2);
  2591. }
  2592. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout);
  2593. //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2594. ////mLog::FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2595. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  2596. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
  2597. if (rc < 0)
  2598. {
  2599. ////mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2600. //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
  2601. pLock->Thread_UnLock();
  2602. return rc;
  2603. }*/
  2604. //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
  2605. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2606. ////mLog::FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size());
  2607. pLock->Thread_UnLock();
  2608. return 2;
  2609. }
  2610. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  2611. int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos)
  2612. {
  2613. const int dwTimeout = 500;
  2614. if (pMqttClient == nullptr)
  2615. {
  2616. ////mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, message);
  2617. return 0;
  2618. }
  2619. ////mLog::FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message);
  2620. std::string messageCopy = message;
  2621. const char* pLoad = messageCopy.c_str();
  2622. int len = messageCopy.length();
  2623. mqtt_message_t msg;
  2624. memset(&msg, 0, sizeof(msg));
  2625. msg.payload = (void*)pLoad;
  2626. msg.payloadlen = len;
  2627. msg.qos = qos;
  2628. msg.retained = 1;
  2629. int rc = MQTT_SUCCESS_ERROR;
  2630. DWORD dwTick = GetTickCount();
  2631. int nTryTimes = 0;
  2632. do
  2633. {
  2634. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2635. nTryTimes++;
  2636. if (rc != MQTT_SUCCESS_ERROR)
  2637. {
  2638. ////mLog::FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id);
  2639. //mqtt_do_reconnect(pMqttClient);
  2640. ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context;
  2641. mqtt_disconnect(pMqttClient);
  2642. /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2643. mqtt_connect(pMqttClient);*/
  2644. mqtt_release(pMqttClient);
  2645. pMqttClient = nullptr;
  2646. usleep(10000 * 1000);
  2647. pMqttClient = InnerConnect(hConnection);
  2648. //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2649. if (pMqttClient != nullptr)
  2650. {
  2651. break;
  2652. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2653. //resubscribe_topic(pMqttClient, hConnection);
  2654. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2655. }
  2656. }
  2657. } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
  2658. //if(nTryTimes > 1)
  2659. ////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2660. cout << CurrentDateTime() << " CLT " << client_id.c_str() << " PublishAction to " << pszTopic << " send Times " << nTryTimes << " result " << rc << endl;
  2661. ////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2662. if (rc < 0)
  2663. {
  2664. ////mLog::FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2665. return rc;
  2666. }
  2667. return 2;
  2668. }
  2669. /*
  2670. /// <summary>
  2671. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  2672. /// 超时没收到应答返回失败,
  2673. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  2674. /// </summary>
  2675. /// <param name="pAction">要发送的命令Action名</param>
  2676. /// <param name="pContext">命令携带的参数</param>
  2677. /// <param name="pszTopic">要发送的目标topic</param>
  2678. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  2679. /// <param name="resObj">应答返回的参数结果</param>
  2680. /// <param name="dwWaitTime">等待超时时间</param>
  2681. /// <param name="hConnection">复用的MQTT链接句柄</param>
  2682. /// <param name="onmsg">复用的链接的消息处理函数</param>
  2683. /// <returns>成功返回2,其他返回错误码</returns>
  2684. LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  2685. ResDataObject& resObj, DWORD dwWaitTime)
  2686. {
  2687. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2688. if (pszRespTopic != nullptr)
  2689. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2690. if (hConnection == nullptr)
  2691. {
  2692. return 0;
  2693. }
  2694. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2695. //if (!pMqttClient->is_connected()) {
  2696. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2697. // return 0;
  2698. //}
  2699. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  2700. //SubscribeTopic()
  2701. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
  2702. string topic = topicName;//msg->get_topic().c_str();
  2703. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  2704. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
  2705. //应答到了,处理
  2706. ResDataObject req;
  2707. req.decode((const char*)message->payload);
  2708. PacketAnalizer::GetPacketContext(&req, resObj);
  2709. SetEvent(hEvent);
  2710. //处理结束 将函数指针换回去
  2711. void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2712. //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  2713. MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
  2714. }
  2715. else
  2716. {
  2717. //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2718. (msgarrvd)(context, topicName, topicLen, message);
  2719. }
  2720. };
  2721. //接管回调
  2722. // pMqttClient->set_message_callback(action_func);
  2723. MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
  2724. //pMqttClient->subscribe(pszRespTopic, 1);
  2725. MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2726. MQTTProperties prop = MQTTProperties_initializer;
  2727. std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
  2728. //pMqttClient->subscribe(pszTopic, 1, opts);
  2729. MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
  2730. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  2731. //pMqttClient->publish(pszTopic, req.encode());
  2732. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  2733. MQTTClient_message* m = NULL;
  2734. MQTTClient_deliveryToken dt;
  2735. MQTTProperty property;
  2736. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  2737. property.value.data.data = "test user property";
  2738. property.value.data.len = (int)strlen(property.value.data.data);
  2739. property.value.value.data = "test user property value";
  2740. property.value.value.len = (int)strlen(property.value.value.data);
  2741. MQTTProperties properties = MQTTProperties_initializer;
  2742. MQTTProperties_add(&properties, &property);
  2743. //pConn->publish(pszTopic, pCmd->encode());
  2744. const char* pLoad = req.encode();
  2745. MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
  2746. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2747. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  2748. if (ret == WAIT_OBJECT_0) {
  2749. //等到应答了
  2750. return 2;
  2751. }
  2752. return 0;
  2753. }
  2754. */
  2755. /// <summary>
  2756. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  2757. /// </summary>
  2758. /// <param name="hConnection"></param>
  2759. /// <param name="pAction"></param>
  2760. /// <param name="pContext"></param>
  2761. /// <param name="pszTopic"></param>
  2762. /// <param name="resObj"></param>
  2763. /// <param name="hEvent"></param>
  2764. /// <param name="dwWaitTime"></param>
  2765. /// <returns></returns>
  2766. LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
  2767. const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
  2768. {
  2769. usleep(1000 * 1000);
  2770. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2771. if (hConnection == nullptr)
  2772. {
  2773. return 0;
  2774. }
  2775. DWORD dwTick = GetTickCount();
  2776. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2777. std::shared_ptr<LinuxEvent> hEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  2778. ResDataObject resContext;
  2779. string strResObject;
  2780. char* pszPad = 0;
  2781. char* pszPad2 = 0;
  2782. auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool {
  2783. ////mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
  2784. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
  2785. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  2786. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  2787. {
  2788. if (GetTickCount() - dwTick < dwWaitTime)
  2789. {
  2790. ////mLog::FDEBUG("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2791. resObj = *rsp;
  2792. return true;
  2793. }
  2794. else
  2795. {
  2796. ////mLog::FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2797. return false;
  2798. }
  2799. std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
  2800. }
  2801. ////mLog::FDEBUG("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2802. std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
  2803. return false;
  2804. };
  2805. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2806. pLock->Thread_Lock();
  2807. ////mLog::FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic, pContext->encode());
  2808. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  2809. ResDataObject *resActions = std::get<FILTER_RES_OBJ_ID>(*pfilter);
  2810. resActions->add(pAction, pszTopic);
  2811. std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
  2812. std::get<FILTER_FUNC_ID>(*pfilter) = func;
  2813. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2814. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection).c_str());
  2815. const char* pLoad = req.encode();
  2816. ResDataObject* pPacket = new ResDataObject();
  2817. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2818. pPacket->add(pszTopic, pLoad);
  2819. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2820. pList->push_back(pPacket);
  2821. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2822. /*
  2823. mqtt_message_t msg;
  2824. memset(&msg, 0, sizeof(msg));
  2825. msg.payload = (void*)pLoad;
  2826. msg.qos = MQTT_QOS;
  2827. msg.payloadlen = strlen(pLoad);
  2828. //int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2829. int rc = MQTT_SUCCESS_ERROR;
  2830. dwTick = GetTickCount();
  2831. int nTryTimes = 0;
  2832. do
  2833. {
  2834. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2835. nTryTimes++;
  2836. if (rc != MQTT_SUCCESS_ERROR)
  2837. {
  2838. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. {$}", rc, client_id);
  2839. //mqtt_do_reconnect(pMqttClient);
  2840. mqtt_disconnect(pMqttClient);
  2841. rc = mqtt_connect(pMqttClient);
  2842. if (rc == MQTT_SUCCESS_ERROR)
  2843. {
  2844. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2845. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2846. }
  2847. Sleep(2);
  2848. }
  2849. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3);
  2850. if (nTryTimes >= 1)
  2851. ////mLog::FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, nTryTimes, rc);
  2852. */
  2853. pLock->Thread_UnLock();
  2854. dwTick = GetTickCount() - dwTick;
  2855. //////mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
  2856. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
  2857. dwTick = GetTickCount();
  2858. DWORD ret = hEvent->Wait(dwWaitTime);
  2859. if (ret) {
  2860. //等到应答了
  2861. dwTick = GetTickCount() - dwTick;
  2862. pLock->Thread_Lock();
  2863. ResDataObject* pResp = std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
  2864. for (int x = 0; x < pResp->size(); x++)
  2865. {
  2866. ResDataObject r = (*pResp)[x];
  2867. if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r))
  2868. {
  2869. resObj = r;
  2870. pResp->eraseOneOf(pAction, x);
  2871. break;
  2872. }
  2873. }
  2874. pLock->Thread_UnLock();
  2875. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
  2876. ////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
  2877. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2878. return 2;
  2879. }
  2880. //std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2881. //resObj.decode(strResObject.c_str());
  2882. ////mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
  2883. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
  2884. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2885. return 0;
  2886. }
  2887. /// <summary>
  2888. /// 新建MQTT连接发送Ation并等待应答
  2889. /// </summary>
  2890. /// <param name="pAction"></param>
  2891. /// <param name="pContext"></param>
  2892. /// <param name="pszTopic"></param>
  2893. /// <param name="pszRespTopic"></param>
  2894. /// <param name="resObj"></param>
  2895. /// <param name="dwWaitTime"></param>
  2896. /// <param name="pszSenderName"></param>
  2897. /// <returns></returns>
  2898. LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  2899. DWORD dwWaitTime, const char* pszSenderName)
  2900. {
  2901. ResDataObject req;
  2902. if (pszRespTopic != nullptr)
  2903. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2904. //临时创建连接并发送和接收应答
  2905. char pszClientID[256];
  2906. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2907. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2908. getpid(), GetCurrentThreadId());
  2909. sem_t sem;
  2910. if (sem_init(&sem, 0, 0) != 0) {
  2911. return 0; // 如果初始化失败,返回0
  2912. }
  2913. std::cout << " ActionAndResp->NewConnection " << endl;
  2914. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) {
  2915. //应答到了,处理
  2916. PacketAnalizer::GetPacketContext(req, resObj);
  2917. sem_post(&sem);
  2918. });
  2919. ////发布消息,并等待应答
  2920. PublishAction(&req, pszTopic, connObj);
  2921. // 等待应答或超时
  2922. struct timespec ts;
  2923. clock_gettime(CLOCK_REALTIME, &ts);
  2924. ts.tv_sec += dwWaitTime / 1000; // 转换为秒
  2925. ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒
  2926. // 等待信号量或者超时
  2927. int ret = 0;
  2928. while (ret == 0) {
  2929. ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量
  2930. if (ret == -1 && errno == ETIMEDOUT) {
  2931. // 超时
  2932. sem_destroy(&sem);
  2933. CloseConnection(connObj);
  2934. return 0; // 超时返回0
  2935. }
  2936. }
  2937. // 等到应答了
  2938. sem_destroy(&sem); // 销毁信号量
  2939. CloseConnection(connObj);
  2940. return 2; // 返回2表示应答成功
  2941. }