LogicDevice.cpp 101 KB


  1. // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include <cstring>
  4. #include <iostream>
  5. #include <chrono>
  6. #include "LogicDevice.h"
  7. #include <stddef.h> // 用于 offsetof
  8. #include "PacketAnalizer.h"
  9. #include "MessageInfo.h"
  10. #include "common_api.h"
  11. #include "LocalConfig.h"
  12. #include "Base64.h"
  13. #include "SystemLogger.hpp"
  14. #include "mqttclient.h"
  15. #include "LinuxEvent.h"
  16. //Log4CPP::Logger* ////mLog::gLogger = nullptr;
  17. void msgarrivd(void* client, message_data_t* message);
  18. std::string CurrentDateTime();
  19. //-------------Logic Device SysIF--------------------------
  20. LogicDeviceSysIF::LogicDeviceSysIF(void)
  21. {
  22. }
  23. LogicDeviceSysIF::~LogicDeviceSysIF(void)
  24. {
  25. }
  26. //init
  27. void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
  28. {
  29. m_pLogicDev = p;
  30. //p->SetSysLogicDevice(this);
  31. };
  32. LogicDevice* LogicDeviceSysIF::GetLogicDevice()
  33. {
  34. return m_pLogicDev;
  35. }
  36. //Command In and Out
  37. //notify from lower layer
  38. RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  39. {
  40. return RET_FAILED;
  41. };
  42. //notify to lower layer
  43. RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
  44. {
  45. if (m_pLogicDev)
  46. {
  47. return m_pLogicDev->CmdToLogicDev(pCmd);
  48. }
  49. return RET_NOSUPPORT;
  50. };
  51. const std::string SERVER_ADDRESS("127.0.0.1");
  52. /*
  53. string DEVICE_ID; //AS CLIENT_ID
  54. //const std::string CLIENT_ID("paho_cpp_async_subcribe");
  55. //const std::string TOPIC("hello");
  56. mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
  57. const int QOS = 1;
  58. const int N_RETRY_ATTEMPTS = 5;*/
  59. using namespace std;
  60. LogicDevice* g_pDPCDeviceObject = NULL;
  61. string LogHost = "";
  62. //string DEVICE_ID;
  63. //-------------Data Logic Device--------------------------
  64. LogicDevice::LogicDevice(void)
  65. {
  66. m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  67. uuid_t uuid;
  68. char uuid_str[37];
  69. uuid_generate_random(uuid);
  70. uuid_unparse(uuid, uuid_str);
  71. m_pDevInstance = new char[40];
  72. strncpy(m_pDevInstance, uuid_str, 40);
  73. m_pResErrorList = new ResDataObject();
  74. sem_init(&m_SemphRequest, 0, 0);
  75. sem_init(&m_SemphPublish, 0, 0);
  76. m_pDrvDPC = NULL;
  77. g_pDPCDeviceObject = this;
  78. m_pMqttConntion = nullptr;
  79. //m_strServer = "tcp://localhost:1883";
  80. m_strServer = SERVER_ADDRESS;// "192.168.2.225";
  81. m_strServerPort = "1883";
  82. m_bMqttUseSSL = false;
  83. m_strMqttUser = "";
  84. m_strMqttPassword = "";
  85. m_strEBusRoot = "";
  86. m_strCCOSDevicePath = "";
  87. m_pParent = nullptr;
  88. m_topicFilter = nullptr;
  89. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  90. m_pPacketSendingQue = new MsgQueue<ResDataObject>();
  91. int x = 0;
  92. for ( x = 0; x < sizeof(szPad); x++)
  93. szPad[x] = 'A' + x % 26;
  94. szPad[x-1] = 0;
  95. memset(szPad2, 0, sizeof(szPad2));
  96. m_dwLastPacket = GetTickCount();
  97. }
  98. LogicDevice::~LogicDevice(void)
  99. {
  100. delete []m_pDevInstance;
  101. delete m_pResErrorList;
  102. sem_destroy(&m_SemphRequest);
  103. sem_destroy(&m_SemphPublish);
  104. //m_EvtNotify = NULL;
  105. delete m_pPacketReceivedQue;
  106. delete m_pPacketSendingQue;
  107. }
  108. void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
  109. if (m_strClientID.length() > 0)
  110. {
  111. ////mLog::FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID);
  112. return;
  113. }
  114. ////mLog::FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot);
  115. if (pszEBusRoot[0] == '/') {
  116. m_strEBusRoot = pszEBusRoot + 1;
  117. }
  118. else {
  119. m_strEBusRoot = pszEBusRoot;
  120. }
  121. char szKeys[256];
  122. strcpy(szKeys, pszEBusRoot);
  123. char* pt = szKeys;
  124. while (*pt != 0)
  125. {
  126. if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
  127. *pt = '_';
  128. pt++;
  129. }
  130. m_strClientID = CCOS_CLIENT_ID_PREFIX;
  131. if (szKeys[0] == '_')
  132. m_strClientID += (szKeys + 1);
  133. else
  134. m_strClientID += szKeys;
  135. if (pszCCOSRoot != nullptr)
  136. {
  137. ////mLog::FINFO("Set CCOS path: {$}", pszCCOSRoot);
  138. m_strCCOSDevicePath = pszCCOSRoot;
  139. int nPos = m_strCCOSDevicePath.find('/');
  140. if (nPos != string::npos)
  141. {
  142. //CCOS/
  143. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  144. //CCOS/DEVICE/
  145. if (nPos != string::npos)
  146. {
  147. m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos);
  148. //CCOS/DEVICE/Generator
  149. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  150. if (nPos != string::npos)
  151. {
  152. m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos);
  153. }
  154. }
  155. }
  156. ////mLog::FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath);
  157. }
  158. ////mLog::FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath);
  159. SetName(m_strClientID.c_str());
  160. OnSetClientID();
  161. }
  162. void LogicDevice::OnSetClientID() {
  163. }
  164. void LogicDevice::SubscribeSelf() {
  165. if (m_strDevicePath.length() > 0)
  166. if (m_strDevicePath.c_str()[0] != '/')
  167. m_strDevicePath = "/" + m_strDevicePath;
  168. ////mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
  169. std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl;
  170. if (m_strEBusRoot.length() > 0) {
  171. SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str());
  172. }
  173. if (m_strCCOSDevicePath.length() > 0)
  174. {
  175. ////mLog::FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath);
  176. SubscribeTopic(m_pMqttConntion, (m_strCCOSDevicePath + m_strDevicePath).c_str());
  177. //订阅
  178. }
  179. ////mLog::FINFO("AbstractPath [{$}] ", m_strAbstractPath);
  180. if (m_strAbstractPath.length() > 0)
  181. {
  182. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath).c_str());
  183. if(m_strDevicePath.length() > 0)
  184. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath + m_strDevicePath).c_str());
  185. }
  186. ////mLog::FINFO("CCOSRoot [{$}] ", m_strCCOSRoot);
  187. if (m_strCCOSRoot.length() > 0)
  188. {
  189. SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot).c_str());
  190. //订阅
  191. }
  192. SubscribeActions();
  193. }
  194. void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  195. {
  196. if (bSubscribe)
  197. SubscribeTopic(m_pMqttConntion, pszTopic);
  198. else
  199. UnSubscribe(m_pMqttConntion, pszTopic);
  200. }
  201. void LogicDevice::NotifyDrvThread()
  202. {
  203. m_EvtNotify->SetEvent();
  204. }
  205. std::shared_ptr<LinuxEvent> LogicDevice::GetEvtHandle()
  206. {
  207. return m_EvtNotify;
  208. }
  209. //1. init part
  210. //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
  211. //{
  212. // m_pSysLogic = pLogic;
  213. //}
  214. //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
  215. //{
  216. // m_pLogger = pLogger;
  217. //}
  218. //
  219. //Logger *LogicDevice::GetLogHandle()
  220. //{
  221. // return m_pLogger;
  222. //}
  223. void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
  224. {
  225. m_pDrvDPC = pDPC;
  226. }
  227. DriverDPC *LogicDevice::GetDrvDPC()
  228. {
  229. return m_pDrvDPC;
  230. }
  231. RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
  232. {
  233. if (szEbusDevPath == nullptr)
  234. return RET_FAILED;
  235. for (auto dev : m_subDevices) {
  236. if (dev->GetRootPath() == szEbusDevPath)
  237. return RET_SUCCEED;
  238. }
  239. m_subDevices.push_back(pChild);
  240. return RET_SUCCEED;
  241. }
  242. RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
  243. {
  244. if (szCcosDevPath == nullptr)
  245. return RET_FAILED;
  246. for (auto dev : m_subCcosDevices) {
  247. if (dev->GetCcosRootPath() == szCcosDevPath)
  248. return RET_SUCCEED;
  249. }
  250. m_subCcosDevices.push_back(pChild);
  251. return RET_SUCCEED;
  252. }
  253. LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
  254. {
  255. for (auto dev : m_subDevices) {
  256. if (dev->GetRootPath() == szEbusDevPath)
  257. return dev;
  258. }
  259. return nullptr;
  260. }
  261. LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
  262. {
  263. for (auto dev : m_subCcosDevices) {
  264. if (dev->GetCcosRootPath() == szCcosDevPath)
  265. return dev;
  266. }
  267. return nullptr;
  268. }
  269. void SYSTEM_CALL LogicDevice::CompleteInit()
  270. {
  271. //if (//mLog::gLogger == nullptr)
  272. //{
  273. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  274. // LogHost = ((string)getLogRootpath()).c_str();
  275. // if (LogHost.length() <= 1)
  276. // {
  277. // char szName[256];
  278. // sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
  279. // LogHost = szName;
  280. // }
  281. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  282. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  283. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  284. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  285. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  286. // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
  287. //}
  288. //else
  289. //{
  290. // string strRoot = ((string)getLogRootpath()).c_str();
  291. // if (strRoot.length() > 1 && strRoot != LogHost)
  292. // {
  293. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  294. // LogHost = strRoot;
  295. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  296. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  297. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  298. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  299. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  300. // }
  301. //}
  302. //string version;
  303. //if (GetVersion(version, hMyModule))
  304. // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
  305. //else
  306. ////mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
  307. ////mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
  308. if (m_strClientID.length() <= 0)
  309. {
  310. ////mLog::FWARN("No Client name ......" );
  311. std::cout << "No Client name ......." << endl;
  312. return;
  313. }
  314. std::cout << "CompleteInit->NewConnection ......." << endl;
  315. m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
  316. [this](ResDataObject* req, const char* topic, void* conn) {
  317. //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
  318. //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
  319. // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
  320. // 消息处理如果耗时较长,则需要开线程来处理
  321. if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
  322. strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0)
  323. {
  324. //主题以ebus或者ccos开头,给我发的消息,进行处理
  325. ProcessSubscribeRequest(req);
  326. }
  327. else
  328. {
  329. //我主动订阅的外部模块的消息
  330. ProcessSubscribeMsg(req);
  331. }
  332. //CmdToLogicDev(req);
  333. });
  334. if (m_pMqttConntion != nullptr)
  335. {
  336. StartThread(); //启动Request线程
  337. SubscribeSelf();
  338. //DWORD dwThreadID = 0;
  339. //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID);
  340. //////mLog::FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID);
  341. }
  342. }
  343. RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
  344. PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
  345. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
  346. switch (type) {
  347. case PACKET_TYPE_REQ:
  348. PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction);
  349. ProcessRequest(req, cmd);//请求
  350. break;
  351. case PACKET_TYPE_RES:
  352. ProcessResponse(req, cmd);//应答
  353. break;
  354. case PACKET_TYPE_NOTIFY:
  355. ProcessNotify(req, cmd);
  356. //通知
  357. break;
  358. }
  359. return RET_FAILED;
  360. }
  361. RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
  362. {
  363. ProcessSubscribeRequest(pCmd);
  364. return RET_SUCCEED;
  365. }
  366. RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
  367. {
  368. //Open命令
  369. if (cmd == PACKET_CMD_OPEN) {
  370. /* {
  371. "IDX": "40",
  372. "TYPE" : "0",
  373. "CMD" : "0",
  374. "HANDLE" :
  375. {
  376. "ROUTE": "1",
  377. "FLAGS" : "63",
  378. "LANG" : "en-US",
  379. "HANDLEID" : "0",
  380. "OWNERID" :
  381. {
  382. "EBUSID": "ImageSave",
  383. "MACHINEID" : "DESKTOP-FVD53H8",
  384. "PROCID" : "35408",
  385. "ADDR" : "2631366957696"
  386. },
  387. "DEVID":
  388. {
  389. "EBUSID": "ccosChannel",
  390. "MACHINEID" : "",
  391. "PROCID" : "0",
  392. "ADDR" : "0"
  393. }
  394. },
  395. "KEY": "\/ccosChannel"
  396. ResDataObject resRes, resResponse;
  397. ResDataObject resTopic;
  398. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  399. if (cmd == PACKET_CMD_OPEN )
  400. {
  401. //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
  402. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  403. return RET_SUCCEED;
  404. }
  405. if (cmd == PACKET_CMD_CLOSE)
  406. {
  407. //CLOSE 客户端主动断开
  408. }
  409. } */
  410. PacketArrived(pCmd);
  411. return RET_SUCCEED;
  412. }
  413. else if (cmd == PACKET_CMD_CLOSE)
  414. {
  415. ResDataObject resp;
  416. InnerOpenClose(*pCmd, resp, false);
  417. }
  418. else {
  419. PacketArrived(pCmd);
  420. }
  421. return RET_SUCCEED;
  422. }
  423. RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
  424. {
  425. return RET_SUCCEED;
  426. }
  427. RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
  428. {
  429. PacketArrived(pCmd);
  430. return RET_SUCCEED;
  431. }
  432. void LogicDevice::PacketArrived(ResDataObject* pRequest)
  433. {
  434. //m_pPacketReceivedQue->Lock();
  435. ////mLog::FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  436. //if (Thread_Lock(5000) != WAIT_OBJECT_0)
  437. //{
  438. // ////mLog::FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  439. // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  440. // return ;
  441. //}
  442. m_pPacketReceivedQue->InQueue(*pRequest);
  443. ////mLog::FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  444. //SetEvent(m_EvtNotify);//notify to user
  445. long nLast = 0;
  446. int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程
  447. if (released <= 0)
  448. {
  449. ////mLog::FERROR("PacketArrived ReleaseSemaphore failed {$} ", GetLastError());
  450. }
  451. //Thread_UnLock();
  452. //m_pPacketReceivedQue->UnLock();
  453. }
  454. bool LogicDevice::OnStartThread()
  455. {
  456. return true;
  457. }
  458. bool LogicDevice::OnEndThread()
  459. {
  460. return true;
  461. }
  462. //单一发送线程,暂时未用
  463. DWORD LogicDevice::Thread_Publish_Thread(void* pPara)
  464. {
  465. INT ret = 0;
  466. int waitevent = 0;
  467. LogicDevice* handle = (LogicDevice*)pPara;
  468. //prev work usleep(30000);
  469. ////mLog::FINFO("Thread Start [{$}]", handle->m_strClientID);
  470. while (!handle->m_ExitFlag->Wait(1))
  471. {
  472. //do work
  473. ResDataObject resSend;
  474. DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100);
  475. if (wait != WAIT_OBJECT_0)
  476. continue;
  477. bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend);
  478. if (bHasPacket)
  479. {
  480. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend);
  481. PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend);
  482. ////mLog::FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd);
  483. if (PACKET_CMD_NONE == cmd)
  484. {
  485. ////mLog::FDEBUG(" [{$}] Publish what packet {$} ", handle->m_strClientID, resSend.encode());
  486. }
  487. if (type == PACKET_TYPE_NOTIFY)
  488. {
  489. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  490. PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle);
  491. ////mLog::FINFO("Notify Transaction: {$}", handle->m_strCurTransaction);
  492. PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction);
  493. PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion);
  494. ////mLog::FDEBUG("Notify: {$}", resSend.encode());
  495. //printf("--> %s \n", pCmd->encode());
  496. PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion);
  497. PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion);
  498. delete pHandle;
  499. }
  500. else
  501. {
  502. ResDataObject resTopic;
  503. PacketAnalizer::GetPacketTopic(&resSend, resTopic);
  504. //PacketAnalizer::GetPacketTopicGetPacketTopic
  505. //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic);
  506. ////mLog::FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const char*)resTopic);
  507. PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion);
  508. }
  509. }
  510. }
  511. return 0;
  512. }
  513. RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  514. {
  515. ResDataObject req,reqParam;
  516. reqParam = pszGroup;
  517. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam);
  518. return InnerOpenClose(req, resRespons, true);
  519. }
  520. RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  521. {
  522. ResDataObject req, reqParam;
  523. reqParam = pszGroup;
  524. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam);
  525. return InnerOpenClose(req, resRespons, false);
  526. }
  527. /// <summary>
  528. ///
  529. /// </summary>
  530. /// <param name="pszDevUri"></param>
  531. /// <param name="pszProperty"></param>
  532. /// <param name="resRespons"></param>
  533. /// <returns></returns>
  534. RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons)
  535. {
  536. ResDataObject req,reqParam;
  537. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam);
  538. return Request(&req, &resRespons);
  539. }
  540. /// <summary>
  541. ///
  542. /// </summary>
  543. /// <param name="pszDevUri"></param>
  544. /// <param name="pszProperty"></param>
  545. /// <param name="pszValueSet"></param>
  546. /// <param name="resRespons"></param>
  547. /// <returns></returns>
  548. RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons)
  549. {
  550. ResDataObject req, reqParam;
  551. if (pszValueSet != nullptr && pszValueSet[0] != 0) {
  552. if (pszValueSet[0] == '{') {
  553. reqParam.decode(pszValueSet);
  554. }
  555. else {
  556. reqParam = pszValueSet;
  557. }
  558. }
  559. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam);
  560. return Request(&req, &resRespons);
  561. }
  562. /// <summary>
  563. ///
  564. /// </summary>
  565. /// <param name="pszDevUri"></param>
  566. /// <param name="pszProperty"></param>
  567. /// <param name="pszValueUpdate"></param>
  568. /// <param name="resRespons"></param>
  569. /// <returns></returns>
  570. RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons)
  571. {
  572. ResDataObject req, reqParam;
  573. if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) {
  574. if (pszValueUpdate[0] == '{') {
  575. reqParam.decode(pszValueUpdate);
  576. }
  577. else {
  578. reqParam = pszValueUpdate;
  579. }
  580. }
  581. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam);
  582. return Request(&req, &resRespons);
  583. }
  584. /// <summary>
  585. ///
  586. /// </summary>
  587. /// <param name="pszDevUri"></param>
  588. /// <param name="pszProperty"></param>
  589. /// <param name="pszValueAdd"></param>
  590. /// <param name="resRespons"></param>
  591. /// <returns></returns>
  592. RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* pszValueAdd, ResDataObject& resRespons)
  593. {
  594. ResDataObject req, reqParam;
  595. if (pszValueAdd != nullptr && pszValueAdd[0] != 0) {
  596. if (pszValueAdd[0] == '{') {
  597. reqParam.decode(pszValueAdd);
  598. }
  599. else {
  600. reqParam = pszValueAdd;
  601. }
  602. }
  603. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam);
  604. return Request(&req, &resRespons);
  605. }
  606. /// <summary>
  607. ///
  608. /// </summary>
  609. /// <param name="pszDevUri"></param>
  610. /// <param name="pszProperty"></param>
  611. /// <param name="pszValueDel"></param>
  612. /// <param name="resRespons"></param>
  613. /// <returns></returns>
  614. RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons)
  615. {
  616. ResDataObject req, reqParam;
  617. if (pszValueDel != nullptr && pszValueDel[0] != 0) {
  618. if (pszValueDel[0] == '{') {
  619. reqParam.decode(pszValueDel);
  620. }
  621. else {
  622. reqParam = pszValueDel;
  623. }
  624. }
  625. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam);
  626. return Request(&req, &resRespons);
  627. }
  628. /// <summary>
  629. ///
  630. /// </summary>
  631. /// <param name="pszDevUri"></param>
  632. /// <param name="pszActionName"></param>
  633. /// <param name="pszParams"></param>
  634. /// <param name="resRespons"></param>
  635. /// <returns></returns>
  636. RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons)
  637. {
  638. ResDataObject req, reqParam;
  639. if (pszParams != nullptr && pszParams[0] != 0) {
  640. if (pszParams[0] == '{') {
  641. reqParam.decode(pszParams);
  642. }
  643. else {
  644. reqParam = pszParams;
  645. }
  646. }
  647. PacketAnalizer::MakeRequest(req,pszActionName , PACKET_CMD_EXE, &reqParam);
  648. ResDataObject resResult;
  649. RET_STATUS ret = Request(&req, &resResult);
  650. PacketAnalizer::GetPacketContext(&resResult, resRespons);
  651. return ret;
  652. }
  653. /// <summary>
  654. ///
  655. /// </summary>
  656. /// <param name="pszDevUri"></param>
  657. /// <param name="pszTopic"></param>
  658. /// <param name="pszMessageValue"></param>
  659. /// <param name="resRespons"></param>
  660. /// <returns></returns>
  661. RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons)
  662. {
  663. ResDataObject req, reqParam;
  664. if (pszMessageValue != nullptr && pszMessageValue[0] != 0) {
  665. if (pszMessageValue[0] == '{') {
  666. reqParam.decode(pszMessageValue);
  667. }
  668. else {
  669. reqParam = pszMessageValue;
  670. }
  671. }
  672. PacketAnalizer::MakeRequest(req,pszTopic , PACKET_CMD_MSG, &reqParam);
  673. return Request(&req, &resRespons);
  674. }
  675. RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose)
  676. {
  677. if (openOrClose)
  678. {
  679. ResDataObject resContext;
  680. GetDeviceResource(&resp);
  681. LogicDevice::GetDeviceResource(&resp);
  682. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  683. if (PacketAnalizer::GetPacketContext(&req, resContext))
  684. {
  685. //如果有上线参数
  686. if (resContext.GetFirstOf("Online") >= 0)
  687. {
  688. ////mLog::FINFO("Got Online Request {$}", resContext.encode());
  689. int idx = resContext.GetFirstOf("Online");
  690. //如果有上线参数
  691. if (idx >= 0)
  692. {
  693. do
  694. {
  695. string wsName = resContext[idx].encode();
  696. ////mLog::FINFO("SubscribeGroupActions [{$}] ", wsName);
  697. SubscribeGroupActions(wsName, true);
  698. idx = resContext.GetNextOf("Online", idx);
  699. } while (idx >= 0);
  700. }
  701. resp.update("Online", m_rsOnlineGroup);
  702. }
  703. }
  704. }
  705. else
  706. {
  707. ResDataObject context;
  708. if (PacketAnalizer::GetPacketContext(&req, context))
  709. {
  710. int idx = context.GetFirstOf("Offline");
  711. //如果有上线参数
  712. if (idx >= 0)
  713. {
  714. do
  715. {
  716. string wsName = context[idx].encode();
  717. SubscribeGroupActions(wsName, false);
  718. idx = context.GetNextOf("Offline", idx);
  719. } while (idx >= 0);
  720. }
  721. resp.update("Online", m_rsOnlineGroup);
  722. }
  723. }
  724. return RET_SUCCEED;
  725. }
  726. bool LogicDevice::Exec(void)
  727. {
  728. struct timespec ts;
  729. clock_gettime(CLOCK_REALTIME, &ts);
  730. ts.tv_sec += 3;
  731. // 等待退出信号(3秒超时)
  732. DWORD dwRet = 0;
  733. dwRet = m_ExitFlag->Wait(3);
  734. if (dwRet == WAIT_OBJECT_0)
  735. {
  736. return false;
  737. }
  738. // 等待请求信号(3秒超时)
  739. int ret = sem_timedwait(&m_SemphRequest, &ts);
  740. if (ret == 0) {
  741. ////mLog::FDEBUG("[{$}] Got Packet Event ", m_strClientID);
  742. ResDataObject req;
  743. bool got = true;
  744. do {
  745. got = m_pPacketReceivedQue->DeQueue(req);
  746. if (got) {
  747. m_dwLastPacket = GetTickCount();
  748. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
  749. std::string keystr = PacketAnalizer::GetPacketKey(&req);
  750. ////mLog::FINFO(" {$} Got Request key {$} transaction {$}",
  751. // m_strClientID, keystr, m_strCurTransaction);
  752. if (cmd == PACKET_CMD_OPEN) {
  753. ResDataObject resRes, resResponse;
  754. InnerOpenClose(req, resRes, true);
  755. PacketAnalizer::MakeOpenResponse(req, resResponse, resRes);
  756. resResponse.update("Online", m_rsOnlineGroup);
  757. PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction);
  758. ResDataObject resTopic;
  759. PacketAnalizer::GetContextTopic(&req, resTopic);
  760. PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str());
  761. if (keystr.substr(0, 4) == "CCOS") {
  762. PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str());
  763. }
  764. else {
  765. PacketAnalizer::UpdateDeviceNotifyResponse(resResponse,
  766. getLocalMachineId(),
  767. getLocalEbusId(),
  768. (uint64_t)getpid(),
  769. (uint64_t)m_pMqttConntion);
  770. }
  771. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  772. // 发送连接状态通知
  773. ResDataObject NotifyData;
  774. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE,
  775. "ConnectionStatus", "1");
  776. PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction);
  777. CmdFromLogicDev(&NotifyData);
  778. }
  779. else {
  780. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  781. ResDataObject resPacket;
  782. RET_STATUS retStatus = RET_FAILED;
  783. if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") {
  784. ResDataObject devRes;
  785. if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) {
  786. LogicDevice::GetDeviceResource(&devRes);
  787. PacketAnalizer::UpdatePacketContext(resPacket, devRes);
  788. }
  789. }
  790. else {
  791. retStatus = Request(&req, &resPacket);
  792. }
  793. PacketAnalizer::MakeRetCode(retStatus, &resPacket);
  794. PacketAnalizer::CloneTransaction(&req, &resPacket);
  795. ResDataObject resTopic;
  796. PacketAnalizer::GetContextTopic(&req, resTopic);
  797. PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str());
  798. PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
  799. }
  800. }
  801. } while (got);
  802. // 检查心跳(10分钟无数据发送心跳)
  803. if (GetTickCount() - m_dwLastPacket > 600000) {
  804. m_dwLastPacket = GetTickCount();
  805. ResDataObject heartBeat;
  806. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  807. "HeartBeat", m_strClientID.c_str());
  808. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  809. }
  810. return true;
  811. }
  812. // 检查心跳(即使没有请求)
  813. if (GetTickCount() - m_dwLastPacket > 600000) {
  814. m_dwLastPacket = GetTickCount();
  815. ResDataObject heartBeat;
  816. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  817. "HeartBeat", m_strClientID.c_str());
  818. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  819. }
  820. return true;
  821. }
  822. void SYSTEM_CALL LogicDevice::CompleteUnInit()
  823. {
  824. StopThread();
  825. }
  826. int LogicDevice::GetDevice_Thread_Priority()
  827. {
  828. return THREAD_PRIORITY_NONE;
  829. }
  830. RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  831. {
  832. //pDeviceResource->update("ClientType", DPC_UnitClient);
  833. GUID guid;
  834. string name;
  835. GetDeviceType(guid);
  836. guid_2_string(guid, name);
  837. pDeviceResource->update("DeviceType", name.c_str());
  838. //Get Unit Type (Unit GUID)
  839. if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
  840. {
  841. pDeviceResource->add("LogicDevInstance", m_pDevInstance);
  842. }
  843. ////
  844. size_t idx = (*pDeviceResource)["Attribute"].size();
  845. if (idx > 0)
  846. {
  847. int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
  848. if (erroridx < 0)
  849. {
  850. (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
  851. }
  852. else
  853. {
  854. (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
  855. }
  856. }
  857. else
  858. {
  859. ResDataObject Attribute;
  860. Attribute.add("ErrorList", *m_pResErrorList);
  861. pDeviceResource->add("Attribute", Attribute);
  862. }
  863. // (*pDeviceResource)["Action"].size();
  864. if (pDeviceResource->GetFirstOf("Action") < 0)
  865. {
  866. pDeviceResource->add("Action", m_Actions);
  867. }
  868. return RET_SUCCEED;
  869. }
  870. //notify from lower layer
  871. RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
  872. {
  873. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
  874. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  875. if (type == PACKET_TYPE_NOTIFY)
  876. {
  877. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  878. PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle);
  879. ////mLog::FDEBUG("Notify Transaction: {$}", m_strCurTransaction);
  880. PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction);
  881. ;
  882. //if (m_strEBusRoot.length() <= 0)
  883. //{
  884. // //////mLog::FWARN("EBusRoot is null");
  885. //}
  886. PacketAnalizer::UpdateDeviceNotifyResponse(
  887. *pCmd,
  888. getLocalMachineId(),
  889. getLocalEbusId(),
  890. static_cast<uint64_t>(getpid()), // Linux 上获取进程 ID
  891. reinterpret_cast<uint64_t>(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型
  892. );
  893. ////mLog::FDEBUG("[{$}] Notify: {$}", m_strClientID, pCmd->encode());
  894. //printf("--> %s \n", pCmd->encode());
  895. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str());
  896. PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  897. string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd);
  898. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str());
  899. PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion);
  900. if (m_strAbstractPath.length() > 0)
  901. {
  902. PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion);
  903. string realPath,devPath;
  904. devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length());
  905. for (int x = 0; x < m_rsOnlineGroup.size(); x++)
  906. {
  907. if ((int)m_rsOnlineGroup[x] == 1)
  908. {
  909. realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath;
  910. PublishAction(pCmd, realPath.c_str(), m_pMqttConntion);
  911. }
  912. }
  913. } //m_pPacketSendingQue->InQueue(*pCmd);
  914. delete pHandle;
  915. }
  916. return RET_SUCCEED;
  917. /*
  918. if (pCmd && m_pSysLogic)
  919. {
  920. return m_pSysLogic->CmdFromLogicDev(pCmd);
  921. }
  922. //put log here
  923. return RET_FAILED;*/
  924. }
  925. std::wstring mb2wc_a(const char* mbstr)
  926. {
  927. std::wstring strVal;
  928. // 获取输入字符串的长度
  929. size_t mbstr_len = strlen(mbstr);
  930. // 计算转换后宽字符字符串的长度
  931. size_t size = mbstr_len + 1; // 需要多一个字符来存储结尾的 '\0'
  932. wchar_t* wcstr = new wchar_t[size];
  933. if (wcstr)
  934. {
  935. memset(wcstr, 0, size * sizeof(wchar_t));
  936. // mbsrtowcs 返回转换后的字符数
  937. size_t ret = mbsrtowcs(wcstr, &mbstr, size, nullptr);
  938. if (ret != (size_t)-1) // mbsrtowcs 返回 (size_t)-1 表示错误
  939. {
  940. strVal = wcstr;
  941. }
  942. delete[] wcstr;
  943. }
  944. return strVal;
  945. }
  946. RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
  947. {
  948. string ResBase64, DesBase64;
  949. wstring wResUTF = ResInfo;
  950. wstring wDesUTF = Description;
  951. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  952. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  953. return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
  954. }
  955. RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  956. {
  957. string ResBase64,DesBase64;
  958. wstring wResUTF = mb2wc_a(ResInfo);
  959. wstring wDesUTF = mb2wc_a(Description);
  960. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  961. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  962. return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
  963. }
  964. RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  965. {
  966. //int ret = 1;
  967. if (Code == 0 || (string)ResInfo == "")
  968. {
  969. ////mLog::FERROR("Code or ResInfo is empty");
  970. return RET_FAILED;
  971. }
  972. MessageInfo info;
  973. info.CodeID = Code;
  974. info.Type = nMessageType;
  975. info.Level = Level;
  976. info.Resouceinfo = ResInfo;
  977. info.Description = Description;
  978. string strDevInstanceCode = DevInstance;
  979. strDevInstanceCode += Code;
  980. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  981. {
  982. string strInstancekey = m_pResErrorList->GetKey(i);
  983. if (strInstancekey == strDevInstanceCode)
  984. {
  985. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  986. //{
  987. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  988. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  989. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  990. // {
  991. //ret = 0;
  992. ////mLog::FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
  993. return RET_SUCCEED;
  994. // }
  995. //}
  996. //ret = 2;
  997. //break;
  998. }
  999. }
  1000. //if (ret==1)
  1001. {
  1002. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1003. //info.GetResDataObject(tempInfo);
  1004. //ErrorInfo.update(Code, tempInfo);
  1005. info.GetResDataObject(ErrorInfo);
  1006. struct timeval tv;
  1007. struct tm* tm_info;
  1008. char TimeTag[30];
  1009. // 获取当前时间
  1010. gettimeofday(&tv, NULL);
  1011. // 转换为本地时间
  1012. tm_info = localtime(&tv.tv_sec);
  1013. // 格式化时间为字符串
  1014. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1015. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1016. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1017. ErrorInfo.add("CreationTime", TimeTag);
  1018. ErrorInfo.add("AppId", pAppId);
  1019. ErrorInfo.add("InstanceId", DevInstance);
  1020. if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1021. {
  1022. m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
  1023. }
  1024. ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
  1025. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1026. ////mLog::FWARN( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
  1027. CmdFromLogicDev(&NotifyData);
  1028. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1029. }
  1030. //else if (ret == 2)
  1031. //{
  1032. // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
  1033. // info.GetResDataObject(tempInfo);
  1034. // ErrorInfo.update(Code, tempInfo);
  1035. // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1036. // {
  1037. // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
  1038. // }
  1039. // ResNotify.update(DevInstance, ErrorInfo);
  1040. // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1041. // RES_////mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
  1042. // CmdFromLogicDev(&NotifyData);
  1043. //}
  1044. //put log here
  1045. return RET_SUCCEED;
  1046. }
  1047. RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
  1048. {
  1049. AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
  1050. //put log here
  1051. return RET_FAILED;
  1052. }
  1053. RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
  1054. {
  1055. int index = -1;
  1056. bool m_bClearAll = false;
  1057. //trim empty code string
  1058. string CodeStr = Code;
  1059. if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
  1060. {
  1061. CodeStr = "";
  1062. Code = CodeStr.c_str();
  1063. m_bClearAll = true;
  1064. }
  1065. //if ((string)Code == "0" || (string)Code == "")
  1066. //{
  1067. // m_bClearAll = true;
  1068. //}
  1069. string strDevInstanceCode = DevInstance;
  1070. strDevInstanceCode += Code;
  1071. if (m_bClearAll)
  1072. {
  1073. m_pResErrorList->clear();
  1074. }
  1075. else
  1076. {
  1077. MessageInfo info;
  1078. info.CodeID = Code;
  1079. info.Type = nMessageType;
  1080. info.Level = Level;
  1081. info.Resouceinfo = ResInfo;
  1082. info.Description = Description;
  1083. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1084. {
  1085. string strInstancekey = m_pResErrorList->GetKey(i);
  1086. if (strInstancekey == strDevInstanceCode)
  1087. {
  1088. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1089. //{
  1090. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1091. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1092. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1093. // {
  1094. // index = (INT)j;
  1095. // break;
  1096. // }
  1097. //}
  1098. index = (int)i;
  1099. break;
  1100. }
  1101. }
  1102. }
  1103. MessageInfo info;
  1104. info.CodeID = Code;
  1105. info.Type = nMessageType;
  1106. info.Level = Level;
  1107. info.Resouceinfo = ResInfo;
  1108. info.Description = Description;
  1109. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1110. //info.GetResDataObject(tempInfo);
  1111. //ErrorInfo.add(Code, tempInfo);
  1112. info.GetResDataObject(ErrorInfo);
  1113. ErrorInfo.add("InstanceId", DevInstance);
  1114. bool result = false;
  1115. if (index>=0)
  1116. {
  1117. //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
  1118. result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
  1119. }
  1120. if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
  1121. {
  1122. ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
  1123. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
  1124. ////mLog::FWARN( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
  1125. CmdFromLogicDev(&NotifyData);
  1126. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1127. }
  1128. return RET_SUCCEED;
  1129. }
  1130. RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
  1131. {
  1132. DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
  1133. //put log here
  1134. return RET_FAILED;
  1135. }
  1136. RET_STATUS LogicDevice::EvtProcedure()
  1137. {
  1138. return RET_FAILED;
  1139. }
  1140. bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
  1141. {
  1142. ////mLog::FERROR("NOT FINISHED YET");
  1143. return false;
  1144. }
  1145. RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
  1146. {
  1147. std::string strResult = "";
  1148. //组Context包
  1149. //if (m_pLogger)
  1150. {
  1151. wstring wContect = mb2wc_a(pContext);
  1152. CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
  1153. }
  1154. //Thread_Lock();
  1155. //if (NULL != fmt)
  1156. //{
  1157. // va_list marker = NULL;
  1158. // va_start(marker, fmt);
  1159. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1160. // std::vector<char> vBuffer(nLength, '\0');
  1161. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1162. // if (nWritten > 0)
  1163. // {
  1164. // strResult = &vBuffer[0];
  1165. // }
  1166. // va_end(marker);
  1167. //}
  1168. //Thread_UnLock();
  1169. ResDataObject SysLogNode;
  1170. string guidstr;
  1171. GUID DeviceGuid;
  1172. //组Log包
  1173. if (GetDeviceType(DeviceGuid))
  1174. {
  1175. guid_2_string(DeviceGuid, guidstr);
  1176. SysLogNode.add("Module", guidstr.c_str());
  1177. SysLogNode.add("AppId", pAppId);
  1178. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1179. if (pCode)
  1180. {
  1181. SysLogNode.add("BusinessKey", pCode);
  1182. }
  1183. else
  1184. {
  1185. SysLogNode.add("BusinessKey", "");
  1186. }
  1187. SysLogNode.add("IP", (const char*)getLocalIpAddress());
  1188. struct timeval tv;
  1189. struct tm* tm_info;
  1190. char TimeTag[30];
  1191. // 获取当前时间
  1192. gettimeofday(&tv, NULL);
  1193. // 转换为本地时间
  1194. tm_info = localtime(&tv.tv_sec);
  1195. // 格式化时间为字符串
  1196. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1197. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1198. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1199. SysLogNode.add("CreationTime", TimeTag);
  1200. string strLevel = SysLogLevel2str(Level);
  1201. SysLogNode.add("Level", strLevel.c_str());
  1202. SysLogNode.add("HostName", (const char*)getLocalMachineId());
  1203. SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
  1204. SysLogNode.add("FreeText", strResult.c_str());
  1205. SysLogNode.add("Context", pCode);
  1206. ResDataObject NotifyData;
  1207. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1208. CmdFromLogicDev(&NotifyData);
  1209. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1210. }
  1211. else
  1212. {
  1213. ////mLog::FWARN("no Guid??");
  1214. return RET_FAILED;
  1215. }
  1216. //打印LOG
  1217. switch (Level)
  1218. {
  1219. case Syslog_Debug:
  1220. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1221. ////mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
  1222. break;
  1223. case Syslog_Information:
  1224. ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1225. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1226. break;
  1227. case Syslog_Warning:
  1228. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1229. ////mLog::Warn("SysLog {$} ", SysLogNode.encode());
  1230. break;
  1231. case Syslog_Error:
  1232. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1233. ////mLog::FERROR("SysLog {$} ", SysLogNode.encode());
  1234. break;
  1235. case Syslog_Fatal:
  1236. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1237. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1238. break;
  1239. default:
  1240. ////mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
  1241. break;
  1242. }
  1243. return RET_SUCCEED;
  1244. }
  1245. RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
  1246. {
  1247. std::string strResult = "";
  1248. //组Context包
  1249. //if (m_pLogger)
  1250. //{
  1251. // m_pLogger->Thread_Lock();
  1252. // if (NULL != fmt)
  1253. // {
  1254. // va_list marker = NULL;
  1255. // va_start(marker, fmt);
  1256. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1257. // std::vector<char> vBuffer(nLength, '\0');
  1258. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1259. // if (nWritten > 0)
  1260. // {
  1261. // strResult = &vBuffer[0];
  1262. // }
  1263. // va_end(marker);
  1264. // }
  1265. // m_pLogger->Thread_UnLock();
  1266. //}
  1267. //else
  1268. {
  1269. Thread_Lock();
  1270. if (fmt != NULL) {
  1271. va_list marker;
  1272. va_start(marker, fmt);
  1273. // 获取格式化字符串所需的大小(不包括终止符)
  1274. int nLength = vsnprintf(NULL, 0, fmt, marker);
  1275. if (nLength >= 0) {
  1276. std::vector<char> vBuffer(nLength + 1, '\0'); // +1 为终止符
  1277. vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker);
  1278. strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串
  1279. }
  1280. va_end(marker);
  1281. }
  1282. Thread_UnLock();
  1283. }
  1284. ResDataObject SysLogNode;
  1285. string guidstr;
  1286. GUID DeviceGuid;
  1287. //组Log包
  1288. if (GetDeviceType(DeviceGuid))
  1289. {
  1290. guid_2_string(DeviceGuid, guidstr);
  1291. SysLogNode.add("Module", guidstr.c_str());
  1292. SysLogNode.add("AppId", "");
  1293. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1294. if (pCode)
  1295. {
  1296. SysLogNode.add("BusinessKey", pCode);
  1297. }
  1298. else
  1299. {
  1300. SysLogNode.add("BusinessKey", "");
  1301. }
  1302. SysLogNode.add("IP", (const char *)getLocalIpAddress());
  1303. struct timeval tv;
  1304. struct tm* tm_info;
  1305. char TimeTag[30];
  1306. // 获取当前时间
  1307. gettimeofday(&tv, NULL);
  1308. // 转换为本地时间
  1309. tm_info = localtime(&tv.tv_sec);
  1310. // 格式化时间为字符串
  1311. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1312. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1313. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1314. SysLogNode.add("CreationTime", TimeTag);
  1315. SysLogNode.add("Level", Level);
  1316. SysLogNode.add("HostName", (const char *)getLocalMachineId());
  1317. SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
  1318. SysLogNode.add("FreeText", strResult.c_str());
  1319. ResDataObject NotifyData;
  1320. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1321. CmdFromLogicDev(&NotifyData);
  1322. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1323. if(m_pParent != nullptr)
  1324. NotifyParent(&NotifyData, "/Notify");
  1325. }
  1326. else
  1327. {
  1328. ////mLog::FERROR("no Guid??");
  1329. return RET_FAILED;
  1330. }
  1331. //打印LOG
  1332. switch (Level)
  1333. {
  1334. case Syslog_Debug:
  1335. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1336. ////mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
  1337. break;
  1338. case Syslog_Information:
  1339. //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1340. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1341. break;
  1342. case Syslog_Warning:
  1343. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1344. ////mLog::Warn("SysLog {$}", SysLogNode.encode());
  1345. break;
  1346. case Syslog_Error:
  1347. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1348. ////mLog::FERROR("SysLog {$}", SysLogNode.encode());
  1349. break;
  1350. case Syslog_Fatal:
  1351. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1352. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1353. break;
  1354. default:
  1355. ////mLog::FINFO( "SysLog {$}", SysLogNode.encode());
  1356. break;
  1357. }
  1358. return RET_SUCCEED;
  1359. }
  1360. /////////////////////////////////////////////////////////////////////////////
  1361. ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  1362. {
  1363. return nullptr;
  1364. }
  1365. std::string CurrentDateTime()
  1366. {
  1367. auto now = std::chrono::system_clock::now();
  1368. auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
  1369. auto now_time_t = std::chrono::system_clock::to_time_t(now);
  1370. std::tm now_tm{};
  1371. localtime_r(&now_time_t, &now_tm); // 线程安全版本
  1372. char buf[64];
  1373. std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &now_tm);
  1374. std::string now_time_str = buf;
  1375. // 格式化微秒部分并追加
  1376. snprintf(buf, sizeof(buf), ".%06lld", static_cast<long long>(now_us.count() % 1000000));
  1377. now_time_str += buf;
  1378. return now_time_str;
  1379. }
  1380. static string CurrentDateTime2()
  1381. {
  1382. std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  1383. char buf[100] = { 0 };
  1384. std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
  1385. return buf;
  1386. }
  1387. bool LogicDevice::GetActions(ResDataObject& resAction)
  1388. {
  1389. for (int x = 0; x < m_Actions.size(); x++)
  1390. {
  1391. resAction.update(m_Actions.GetKey(x), "");
  1392. }
  1393. return true;
  1394. }
  1395. void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
  1396. {
  1397. if (m_pParent != nullptr)
  1398. {
  1399. if (hConnection == nullptr)
  1400. {
  1401. hConnection = m_pParent->m_pMqttConntion;
  1402. }
  1403. PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
  1404. }
  1405. }
  1406. const mqtt_qos_t MQTT_QOS = QOS2;
  1407. /// <summary>
  1408. /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector
  1409. /// CCOS/Demo/Detector
  1410. /// </summary>
  1411. /// <param name="bOnline"></param>
  1412. void LogicDevice::SubscribeGroupActions(string wsName, int bOnline)
  1413. {
  1414. if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0)
  1415. {
  1416. int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()];
  1417. if (bOnline == lastOnline)
  1418. {
  1419. ////mLog::FWARN("Group Abstract is already ? Online {$}", bOnline);
  1420. return;
  1421. }
  1422. }
  1423. if ( m_strAbstractPath.length() > 0)
  1424. {
  1425. std::string pszAction, gpPath;
  1426. int ret = 0;
  1427. int num = m_Actions.size();
  1428. ////mLog::FINFO("Group Device onLine or offLine {$}", bOnline);
  1429. gpPath = m_strAbstractPath;
  1430. gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName);
  1431. pszAction = gpPath + m_strDevicePath;
  1432. pszAction += "/Action/+";
  1433. SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1434. if (bOnline)
  1435. {
  1436. ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1437. }
  1438. else
  1439. {
  1440. ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1441. }
  1442. //for (size_t i = 0; i < num; i++)
  1443. //{
  1444. // pszAction = gpPath + m_strDevicePath;
  1445. // pszAction += "/Action/";
  1446. // pszAction += (m_Actions.GetKey(i));
  1447. // SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1448. // if (bOnline)
  1449. // {
  1450. // ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1451. // }
  1452. // else
  1453. // {
  1454. // ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1455. // }
  1456. //}
  1457. m_rsOnlineGroup.update(wsName.c_str(), bOnline);
  1458. }
  1459. else
  1460. {
  1461. ////mLog::FWARN("try Online but AbstractPath is none {$}", m_strClientID);
  1462. }
  1463. }
  1464. void SubscribeModule(ccos_mqtt_connection* pconn, string devuribBase)
  1465. {
  1466. string pszAction = devuribBase ;
  1467. pszAction += "/Get/+";
  1468. SubscribeTopic(pconn, pszAction.c_str());
  1469. pszAction = devuribBase;
  1470. pszAction += "/Update/+";
  1471. SubscribeTopic(pconn, pszAction.c_str());
  1472. pszAction = devuribBase;
  1473. pszAction += "/Set/+";
  1474. SubscribeTopic(pconn, pszAction.c_str());
  1475. pszAction = devuribBase;
  1476. pszAction += "/Add/+";
  1477. SubscribeTopic(pconn, pszAction.c_str());
  1478. pszAction = devuribBase;
  1479. pszAction += "/Del/+";
  1480. SubscribeTopic(pconn, pszAction.c_str());
  1481. pszAction = devuribBase;
  1482. pszAction += "/Action/+";
  1483. SubscribeTopic(pconn, pszAction.c_str());
  1484. pszAction = devuribBase;
  1485. pszAction += "/Message/+";
  1486. SubscribeTopic(pconn, pszAction.c_str());
  1487. }
  1488. void LogicDevice::SubscribeActions()
  1489. {
  1490. ////mLog::FINFO("Begain");
  1491. if (nullptr == m_pMqttConntion)
  1492. return;
  1493. int num = m_Actions.size();
  1494. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
  1495. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
  1496. {
  1497. ResDataObject res,resAction;
  1498. GetDeviceResource(&res);
  1499. std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
  1500. resAction = res["Action"];
  1501. ////mLog::FINFO("Actions from deviceresource [{$}]", resAction.size());
  1502. for (int x = 0; x < resAction.size(); x++)
  1503. {
  1504. string action = (string)resAction.GetKey(x);
  1505. if (action.length() > 0)
  1506. {
  1507. ////mLog::FINFO("Action from DeviceResource [{$}]", action);
  1508. m_Actions.update(action.c_str(), "");
  1509. }
  1510. }
  1511. num = m_Actions.size();
  1512. }
  1513. std::string pszAction;
  1514. int ret = 0;
  1515. SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str());
  1516. ////mLog::FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1517. if (m_strEBusRoot != m_strCCOSDevicePath)
  1518. {
  1519. pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1520. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1521. ////mLog::FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1522. }
  1523. if (m_strAbstractPath.length() > 0)
  1524. {
  1525. pszAction = m_strAbstractPath + m_strDevicePath;
  1526. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1527. ////mLog::FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1528. }
  1529. //for (size_t i = 0; i < num; i++)
  1530. //{
  1531. // //订阅ebus的路径
  1532. // pszAction = m_strEBusRoot;
  1533. // pszAction += "/Action/";
  1534. // pszAction += (m_Actions.GetKey(i));
  1535. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1536. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1537. //
  1538. // //订阅CCOS设备路径
  1539. // if (m_strEBusRoot != m_strCCOSDevicePath)
  1540. // {
  1541. // pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1542. // pszAction += "/Action/";
  1543. // pszAction += (m_Actions.GetKey(i));
  1544. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1545. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1546. // }
  1547. // //抽象设备路径要一直存在
  1548. // //订阅抽象设备路径
  1549. // if ( m_strAbstractPath.length() > 0)
  1550. // {
  1551. // pszAction = m_strAbstractPath + m_strDevicePath;
  1552. // pszAction += "/Action/";
  1553. // pszAction += (m_Actions.GetKey(i));
  1554. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1555. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1556. // }
  1557. //}
  1558. //pszAction = m_strEBusRoot;
  1559. //pszAction += "/Action/UpdateDeviceResource";
  1560. ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1561. //pTopicList->push_back(pszAction);
  1562. //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1563. //////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1564. //*/
  1565. }
  1566. /*
  1567. void connlost(void* context, char* cause)
  1568. {
  1569. //连接中断
  1570. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1571. std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
  1572. printf("\nConnection lost\n");
  1573. printf(" cause: %s\n", cause);
  1574. std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
  1575. }
  1576. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  1577. {
  1578. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1579. std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
  1580. }
  1581. */
  1582. //发送成功
  1583. //void delivered(void* context, MQTTAsync_token dt)
  1584. //{
  1585. // printf("Message with token value %d delivery confirmed\n", dt);
  1586. // //deliveredtoken = dt;
  1587. //}
  1588. //void connlost(void* context, char* cause)
  1589. //{
  1590. // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
  1591. // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
  1592. //
  1593. // //连接中断
  1594. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1595. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1596. // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
  1597. // return;
  1598. //
  1599. // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1600. // int rc;
  1601. //
  1602. // printf("Reconnecting\n");
  1603. // conn_opts.keepAliveInterval = 20;
  1604. // conn_opts.cleansession = 1;
  1605. // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  1606. // {
  1607. // printf("Failed to start connect, return code %d\n", rc);
  1608. // //finished = 1;
  1609. // }
  1610. //}
  1611. //重连成功
  1612. //void onReconnected(void* context, char* cause)
  1613. //{
  1614. // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
  1615. // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
  1616. //
  1617. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1618. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1619. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1620. // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1621. // const char* client_id = std::get<CLINET_ID_ID>(*connection);
  1622. // int rc;
  1623. //
  1624. // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
  1625. // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
  1626. // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
  1627. // // 重新订阅,暂不重新订阅,看看
  1628. //
  1629. // ///*
  1630. // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1631. // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1632. // //opts.onSuccess = onSubscribe;
  1633. // //opts.onFailure = onSubscribeFailure;
  1634. // //opts.context = client;
  1635. // auto it = pTopicList->begin();
  1636. // while(it != pTopicList->end())
  1637. // {
  1638. // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
  1639. // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
  1640. // it++;
  1641. // }
  1642. // //*/
  1643. //
  1644. //}
  1645. //
  1646. ////MQTT 连接主动断开连接失败
  1647. //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1648. //{
  1649. // printf("Disconnect failed, rc %d\n", response->code);
  1650. // //disc_finished = 1;
  1651. //}
  1652. //
  1653. ////MQTT 连接主动断开连接成功
  1654. //void onDisconnect(void* context, MQTTAsync_successData* response)
  1655. //{
  1656. // //printf("Successful disconnection\n");
  1657. // //disc_finished = 1;
  1658. //}
  1659. //
  1660. //void onSubscribe(void* context, MQTTAsync_successData* response)
  1661. //{
  1662. // printf("Subscribe succeeded\n");
  1663. // //subscribed = 1;
  1664. //}
  1665. //
  1666. //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1667. //{
  1668. // printf("Subscribe failed, rc %d\n", response->code);
  1669. // //finished = 1;
  1670. //}
  1671. //
  1672. //
  1673. //void onConnectFailure(void* context, MQTTAsync_failureData* response)
  1674. //{
  1675. // printf("Connect failed, rc %d\n", response->code);
  1676. // //finished = 1;
  1677. //}
  1678. //void onConnect(void* context, MQTTAsync_successData* response)
  1679. //{
  1680. //
  1681. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1682. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1683. //
  1684. // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
  1685. //
  1686. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1687. // int rc;
  1688. //
  1689. // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
  1690. // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
  1691. // if (hConnected != NULL)
  1692. // SetEvent(hConnected);
  1693. //
  1694. // /*
  1695. // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1696. // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1697. // opts.onSuccess = onSubscribe;
  1698. // opts.onFailure = onSubscribeFailure;
  1699. // opts.context = client;
  1700. // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  1701. // {
  1702. // printf("Failed to start subscribe, return code %d\n", rc);
  1703. // finished = 1;
  1704. // }*/
  1705. //}
  1706. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  1707. int PublishActionWithoutLock(string &message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos = MQTT_QOS);
  1708. void resubscribe_topic(void* client, void* reconnect_date)
  1709. {
  1710. mqtt_client* pClient = (mqtt_client*)client;
  1711. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1712. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1713. string client_id = std::get<CLINET_ID_ID>(*connection);
  1714. ////mLog::FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
  1715. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1716. pLock->Thread_Lock();
  1717. for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
  1718. ////mLog::FWARN("Resubscribe {$}", str);
  1719. mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
  1720. });
  1721. pLock->Thread_UnLock();
  1722. }
  1723. //MQTT消息抵达
  1724. //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1725. void msgarrivd(void* client, message_data_t* message)
  1726. {
  1727. if (client == nullptr)
  1728. {
  1729. //printf("************************ somthing happend...");
  1730. return;
  1731. }
  1732. //消息抵达
  1733. ResDataObject req;
  1734. string topic = strlen( message->topic_name) > sizeof(message->topic_name)-1 ? string(message->topic_name, sizeof(message->topic_name) - 1) : message->topic_name; //topicName; //msg->get_topic().c_str();
  1735. mqtt_client* pClient = (mqtt_client*)client;
  1736. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1737. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1738. if (pLock == nullptr)
  1739. return;
  1740. pLock->Thread_Lock();
  1741. //再次取Lock,如果发现是空了,什么都不干
  1742. CcosLock* pLockTemp = std::get<CONN_SEND_LOCK_ID>(*connection);
  1743. if (pLockTemp == nullptr)
  1744. {
  1745. pLock->Thread_UnLock();
  1746. return;
  1747. }
  1748. string client_id = std::get<CLINET_ID_ID>(*connection);
  1749. void* pc = std::get<MQTT_CLT_ID>(*connection);
  1750. if (pc == nullptr)
  1751. {
  1752. //printf("mqtt is closed now..\n");
  1753. pLock->Thread_UnLock();
  1754. return;
  1755. }
  1756. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
  1757. //std::cout << " msg Body " << payload << endl;
  1758. if ((topic.length() >= MQTT_TOPIC_LEN_MAX && message->message->payloadlen > 16380) || message->message->payloadlen > 512*1024)
  1759. {
  1760. //std::string payloads((const char*)message->message->payload, 128);
  1761. ////mLog::FWARN("TID {$} : {$} Get Bad Msg Len[{$}] from {$}", GetCurrentThreadId(), client_id, message->message->payloadlen, topic.substr(0, 255));
  1762. pLock->Thread_UnLock();
  1763. return;
  1764. }
  1765. if((int)message->message->payloadlen > 16380)
  1766. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get big Msg from " << topic << " msg Len " << message->message->payloadlen << /*" msg Body " << payload <<*/ endl;
  1767. ////mLog::FWARN("TID {$} : {$} Get big Msg from {$} msg Len {$}", GetCurrentThreadId(), client_id, topic, message->message->payloadlen);
  1768. ////mLog::FDEBUG("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload.substr(0, 1024));
  1769. const char* pclient_id = client_id.c_str();
  1770. try
  1771. {
  1772. std::string strPayload((const char*)message->message->payload, message->message->payloadlen);
  1773. req.decode(strPayload.c_str());
  1774. }
  1775. catch (...)
  1776. {
  1777. pLock->Thread_UnLock();
  1778. ////mLog::FWARN("Decode Exception.. {$}", payload);
  1779. return;
  1780. }
  1781. //取消息钩子
  1782. void* pHook = std::get<MSG_HOOK_ID>(*connection);
  1783. if (pHook != nullptr) {
  1784. ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
  1785. ccos_mqtt_msg_filter_func func = std::get<FILTER_FUNC_ID>(*filter);
  1786. //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1787. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
  1788. ////mLog::FDEBUG("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
  1789. //消息钩子函数存在
  1790. ResDataObject *pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
  1791. for(int x=0;x<pRequests->size();x++) {
  1792. //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
  1793. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
  1794. ////mLog::FDEBUG("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
  1795. string keyAction = pRequests->GetKey(x);
  1796. ResDataObject resObj = (*pRequests)[x];
  1797. std::shared_ptr<LinuxEvent> hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1798. if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES && keyAction == PacketAnalizer::GetPacketKey(&req)) {
  1799. //勾住了,是该钩子的消息,则通知
  1800. //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1801. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
  1802. hEvent = LinuxEvent::OpenEvent(string(resObj).c_str());
  1803. ////mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} Notify Event [{$}] Handle [{$}]", GetCurrentThreadId(), client_id, topic, string(resObj), hEvent);
  1804. //PacketAnalizer::GetPacketContext(&req, *resObj);
  1805. ResDataObject* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
  1806. pResponse->add(keyAction.c_str(), req);
  1807. if (!hEvent) {
  1808. std::cerr << "Failed to open shared event" << std::endl;
  1809. }
  1810. else {
  1811. hEvent->SetEvent();
  1812. }
  1813. //CloseHandle(hEvent);
  1814. pRequests->eraseOneOf(keyAction.c_str(), 0);
  1815. pLock->Thread_UnLock();
  1816. //MQTTAsync_freeMessage(&message);
  1817. //MQTTAsync_free(topicName);
  1818. return ;
  1819. }
  1820. }
  1821. }
  1822. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
  1823. pLock->Thread_UnLock();
  1824. ccos_mqtt_callback onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
  1825. if (onmsg != nullptr)
  1826. (onmsg)(&req, topic.c_str(), connection);
  1827. else
  1828. {
  1829. ////mLog::FWARN("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get<CLINET_ID_ID>(*connection), topic);
  1830. cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get<CLINET_ID_ID>(*connection) << " When processing " << topic << endl;
  1831. }
  1832. //MQTTAsync_freeMessage(&message);
  1833. //MQTTAsync_free(topicName);
  1834. return ;
  1835. }
  1836. void* MqttSendThreadFunc(void* pPara)
  1837. {
  1838. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
  1839. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1840. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*connection);
  1841. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1842. std::string client_id = std::get<CLINET_ID_ID>(*connection);
  1843. sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
  1844. // 线程启动日志
  1845. std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
  1846. while (true) {
  1847. struct timespec ts;
  1848. clock_gettime(CLOCK_REALTIME, &ts);
  1849. ts.tv_sec += 1;
  1850. // 等待信号量前的日志
  1851. //std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
  1852. if (sem_timedwait(hSemaphore, &ts) == 0) {
  1853. std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
  1854. pLock->Thread_Lock();
  1855. std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
  1856. if (pList->empty()) {
  1857. std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
  1858. pLock->Thread_UnLock();
  1859. continue;
  1860. }
  1861. ResDataObject* pMsg = pList->front();
  1862. pList->pop_front();
  1863. std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
  1864. for (int x = 0; x < pMsg->size(); x++) {
  1865. const char* pTopic = pMsg->GetKey(x);
  1866. std::string message = (std::string)(*pMsg)[pTopic];
  1867. std::cout << "[" << client_id << "] Prepare to publish messages to the topic: " << pTopic
  1868. << ", Message length: " << message.length() << "Byte" << std::endl;
  1869. mqtt_message_t msg;
  1870. memset(&msg, 0, sizeof(msg));
  1871. msg.payload = (void*)message.c_str();
  1872. msg.payloadlen = message.length();
  1873. msg.qos = QOS1;
  1874. // 发布消息
  1875. int publishResult = mqtt_publish(pConn, pTopic, &msg);
  1876. if (publishResult == 0) {
  1877. std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
  1878. }
  1879. else {
  1880. std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic
  1881. << ", Error code: " << publishResult << std::endl;
  1882. }
  1883. }
  1884. delete pMsg;
  1885. std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
  1886. pLock->Thread_UnLock();
  1887. std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
  1888. }
  1889. else if (errno == ETIMEDOUT) {
  1890. //std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
  1891. }
  1892. else {
  1893. // 其他错误情况
  1894. std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
  1895. }
  1896. }
  1897. return NULL;
  1898. }
  1899. // 安全遍历MQTT消息处理程序的宏
  1900. #define mqtt_list_for_each_entry_safe(pos, n, head, member) \
  1901. for (pos = container_of((head)->next, typeof(*pos), member), \
  1902. n = container_of(pos->member.next, typeof(*pos), member); \
  1903. &pos->member != (head); \
  1904. pos = n, n = container_of(n->member.next, typeof(*n), member))
  1905. // 通过成员指针获取包含结构的指针
  1906. #define container_of(ptr, type, member) ({ \
  1907. const typeof(((type*)0)->member)* __mptr = (ptr); \
  1908. (type*)((char*)__mptr - offsetof(type, member)); })
  1909. // 重连回调函数 - 实现自动重订阅
  1910. static void reconnect_handler(void* client, void* reconnect_data) {
  1911. mqtt_client_t* c = (mqtt_client_t*)client;
  1912. message_handlers_t* pos, * n;
  1913. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)c->mqtt_conn_context;
  1914. // 遍历所有已订阅的主题并重新订阅
  1915. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1916. pLock->Thread_Lock();
  1917. mqtt_list_for_each_entry_safe(pos, n, &c->mqtt_msg_handler_list, list) {
  1918. mqtt_subscribe(c, pos->topic_filter, pos->qos, pos->handler);
  1919. }
  1920. pLock->Thread_UnLock();
  1921. }
  1922. mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
  1923. std::cout << "[MQTT] Acquiring MQTT client instance..." << std::endl;
  1924. mqtt_client_t* pMqttClient = mqtt_lease();
  1925. if (!pMqttClient) {
  1926. std::cerr << "[ERROR] Failed to acquire MQTT client instance" << std::endl;
  1927. return nullptr;
  1928. }
  1929. std::cout << "[SUCCESS] MQTT client acquired: " << pMqttClient << std::endl;
  1930. // 确保 connection 指针有效(避免后续访问悬空指针)
  1931. if (!connection) {
  1932. std::cerr << "[ERROR] Invalid connection context (nullptr)" << std::endl;
  1933. mqtt_release(pMqttClient); // 释放客户端
  1934. return nullptr;
  1935. }
  1936. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  1937. pMqttClient->mqtt_conn_context = connection;
  1938. const char* pszClientID = std::get<CLINET_ID_ID>(*connection);
  1939. std::cout << "[CONFIG] Setting connection parameters:"
  1940. << "\n\tClient ID: " << (pszClientID ? pszClientID : "null")
  1941. << "\n\tHost: " << SERVER_ADDRESS
  1942. << "\n\tPort: 1883"
  1943. << std::endl;
  1944. // 设置MQTT连接参数
  1945. std::string host_str = SERVER_ADDRESS;
  1946. char* host_buf = new char[host_str.size() + 1];
  1947. strncpy(host_buf, host_str.c_str(), host_str.size() + 1);
  1948. mqtt_set_host(pMqttClient, host_buf);
  1949. char port_buf[6] = "1883";
  1950. uint8_t clean_session = 1;
  1951. uint16_t keep_alive = 50;
  1952. mqtt_set_port(pMqttClient, port_buf);
  1953. mqtt_set_user_name(pMqttClient, const_cast<char*>("zskkcc"));
  1954. mqtt_set_password(pMqttClient, const_cast<char*>("zskk1234"));
  1955. mqtt_set_client_id(pMqttClient, (char*)pszClientID);
  1956. mqtt_set_clean_session(pMqttClient, 0);
  1957. mqtt_set_keep_alive_interval(pMqttClient, keep_alive);
  1958. mqtt_set_cmd_timeout(pMqttClient, 5000);
  1959. mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1960. mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  1961. // 设置重连回调(替代mqtt_set_resubscribe_handler)
  1962. //mqtt_subscribe();
  1963. mqtt_set_reconnect_handler(pMqttClient, reconnect_handler);
  1964. int rc = 0;
  1965. uint64_t start_time = GetTickCount();
  1966. const uint64_t timeout_ms = 10000; // 10秒超时
  1967. int attempt_count = 0;
  1968. std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl;
  1969. try {
  1970. while (true) {
  1971. attempt_count++;
  1972. std::cout << "[ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
  1973. rc = mqtt_connect(pMqttClient);
  1974. if (rc != MQTT_SUCCESS_ERROR) {
  1975. uint64_t elapsed = GetTickCount() - start_time;
  1976. std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl;
  1977. if (elapsed > timeout_ms) {
  1978. // 超时处理
  1979. std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl;
  1980. delete[] host_buf;
  1981. mqtt_release(pMqttClient);
  1982. return nullptr;
  1983. }
  1984. // 计算剩余时间
  1985. uint64_t remaining = timeout_ms - elapsed;
  1986. uint64_t wait_time = (remaining > 2000) ? 2000 : remaining;
  1987. std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." << std::endl;
  1988. usleep(wait_time * 1000);
  1989. }
  1990. else {
  1991. uint64_t total_time = GetTickCount() - start_time;
  1992. std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl;
  1993. break;
  1994. }
  1995. }
  1996. }
  1997. catch (const std::exception& e) {
  1998. std::cerr << "[ERROR] " << e.what() << std::endl;
  1999. delete[] host_buf;
  2000. mqtt_release(pMqttClient);
  2001. return nullptr;
  2002. }
  2003. return pMqttClient;
  2004. }
  2005. ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
  2006. {
  2007. ////mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
  2008. std::cout << "TID " << GetCurrentThreadId()
  2009. << " : " << pszClientID
  2010. << " NewConnection " << pszServer
  2011. << ":" << pszServerPort
  2012. << " user: " << pszUserName
  2013. << " password " << pszPassword
  2014. << std::endl;
  2015. //连接MQTT broker
  2016. DWORD dwTick = GetTickCount();
  2017. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2018. // 初始化元组字段
  2019. std::get<MQTT_TIPIC_LIST_ID>(*connection) = new mqtt_topic_list;
  2020. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2021. std::get<USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2022. CcosLock* pLock = new CcosLock();
  2023. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2024. pLock->Thread_Lock();
  2025. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2026. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2027. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2028. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2029. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2030. std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
  2031. mqtt_client* pMqttClient = InnerConnect(connection);
  2032. if (pMqttClient == nullptr)
  2033. {
  2034. pLock->Thread_UnLock();
  2035. delete connection;
  2036. delete pfilter;
  2037. return nullptr;
  2038. }
  2039. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2040. //pMqttClient->mqtt_conn_context = connection;
  2041. //设置MQTT连接参数
  2042. //mqtt_set_port(pMqttClient, (char*)pszServerPort);
  2043. //mqtt_set_host(pMqttClient, (char*)pszServer);
  2044. //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
  2045. //mqtt_set_client_id(pMqttClient, (char*)pszClientID);
  2046. //mqtt_set_user_name(pMqttClient, (char*)pszUserName);
  2047. //mqtt_set_password(pMqttClient, (char*)pszPassword);
  2048. //mqtt_set_clean_session(pMqttClient, 1);
  2049. //mqtt_set_version(pMqttClient, 5);
  2050. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2051. //mqtt_set_write_buf_size(pMqttClient, 8192);
  2052. //mqtt_set_read_buf_size(pMqttClient, 8192);
  2053. //mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  2054. //mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  2055. //if (pszUserName != nullptr && strlen(pszUserName) > 0)
  2056. // conn_opts.username = pszUserName;
  2057. //if (pszPassword != nullptr && strlen(pszPassword) > 0)
  2058. // conn_opts.password = pszPassword;
  2059. //设置回调函数
  2060. /*
  2061. int rc = 0;
  2062. dwTick = GetTickCount();
  2063. while (true)
  2064. {
  2065. int rc = mqtt_connect(pMqttClient);
  2066. if (rc != MQTT_SUCCESS_ERROR)
  2067. {
  2068. //默认10s内尝试
  2069. if (GetTickCount() - dwTick > 10000)
  2070. {
  2071. ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2072. pLock->Thread_UnLock();
  2073. delete connection;
  2074. delete pfilter;
  2075. return nullptr;
  2076. }
  2077. else
  2078. {
  2079. ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2080. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl;
  2081. Sleep(2000);
  2082. }
  2083. }
  2084. else
  2085. {
  2086. break;
  2087. }
  2088. }
  2089. */
  2090. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2091. // 创建POSIX信号量
  2092. sem_t* semaphore = new sem_t;
  2093. sem_init(semaphore, 0, 0);
  2094. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2095. // 创建发送线程
  2096. pthread_t threadId;
  2097. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2098. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2099. // 错误处理...
  2100. sem_destroy(semaphore);
  2101. delete semaphore;
  2102. pLock->Thread_UnLock();
  2103. delete connection;
  2104. return nullptr;
  2105. }
  2106. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2107. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2108. pLock->Thread_UnLock();
  2109. uint64_t dwWaitTick = GetTickCount() - dwTick;
  2110. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2111. << dwWaitTick << " ms" << std::endl;
  2112. return connection;
  2113. }
  2114. //创建额外连接,需要提供回调函数
  2115. LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async)
  2116. {
  2117. DWORD dwTick = GetTickCount();
  2118. ////mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
  2119. //mqtt_client* pMqttClient = nullptr;
  2120. //pMqttClient = mqtt_lease();
  2121. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2122. //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  2123. //std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2124. mqtt_topic_list* pTopicList = new std::list<string>;
  2125. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  2126. CcosLock* pLock = new CcosLock();
  2127. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2128. pLock->Thread_Lock();
  2129. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2130. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2131. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2132. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2133. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2134. std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
  2135. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2136. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2137. mqtt_client* pMqttClient = InnerConnect(connection);
  2138. if (pMqttClient == nullptr)
  2139. {
  2140. pLock->Thread_UnLock();
  2141. delete connection;
  2142. delete pfilter;
  2143. return nullptr;
  2144. }
  2145. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2146. //pMqttClient->mqtt_conn_context = connection;
  2147. //设置回调函数
  2148. //设置MQTT连接参数
  2149. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2150. /*
  2151. //连接Broker
  2152. int rc = 0;
  2153. dwTick = GetTickCount();
  2154. while (true)
  2155. {
  2156. rc = mqtt_connect(pMqttClient);
  2157. if (rc != MQTT_SUCCESS_ERROR)
  2158. {
  2159. if (GetTickCount() - dwTick > dwOpenTimeout)
  2160. {
  2161. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2162. pLock->Thread_UnLock();
  2163. delete connection;
  2164. delete pfilter;
  2165. return nullptr;
  2166. }
  2167. else
  2168. {
  2169. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2170. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
  2171. Sleep(20);
  2172. }
  2173. }
  2174. else
  2175. {
  2176. break;
  2177. }
  2178. }*/
  2179. DWORD dwWaitTick = GetTickCount() - dwTick;
  2180. ////mLog::FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  2181. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2182. // 创建POSIX信号量
  2183. sem_t* semaphore = new sem_t;
  2184. sem_init(semaphore, 0, 0);
  2185. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2186. // 创建发送线程
  2187. pthread_t threadId;
  2188. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2189. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2190. // 错误处理...
  2191. sem_destroy(semaphore);
  2192. delete semaphore;
  2193. pLock->Thread_UnLock();
  2194. delete connection;
  2195. return nullptr;
  2196. }
  2197. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2198. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2199. pLock->Thread_UnLock();
  2200. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2201. << dwWaitTick << " ms" << std::endl;
  2202. return connection;
  2203. }
  2204. //重置连接
  2205. LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection)
  2206. {
  2207. if (hConnection == nullptr)
  2208. {
  2209. return;
  2210. }
  2211. ////mLog::FWARN(" Reset Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  2212. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
  2213. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2214. if (pconn != nullptr) {
  2215. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2216. pLock->Thread_Lock();
  2217. //mqtt_do_reconnect(pconn);
  2218. mqtt_disconnect(pconn);
  2219. /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic);
  2220. mqtt_connect(pconn);*/
  2221. mqtt_release(pconn);
  2222. pconn = InnerConnect(hConnection);
  2223. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2224. if (pconn != nullptr)
  2225. {
  2226. //std::get<MQTT_CLT_ID>(*hConnection) = pconn;
  2227. //resubscribe_topic(pconn, pconn->mqtt_conn_context);
  2228. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2229. }
  2230. pLock->Thread_UnLock();
  2231. }
  2232. }
  2233. //关闭并释放连接
  2234. LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
  2235. {
  2236. if (!hConnection) return;
  2237. auto clientId = std::get<CLINET_ID_ID>(*hConnection);
  2238. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." << endl;
  2239. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2240. if (nullptr != pLock)
  2241. pLock->Thread_Lock();
  2242. else
  2243. return;
  2244. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2245. sem_t* semaphore = (sem_t*)std::get<SEMAPHORE_HANDLE_ID>(*hConnection);
  2246. std::get<MQTT_CLT_ID>(*hConnection) = nullptr;
  2247. std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
  2248. ccos_mqtt_msg_filter* pfilter = static_cast<ccos_mqtt_msg_filter*>(std::get<MSG_HOOK_ID>(*hConnection));
  2249. mqtt_msg_list* pList = static_cast<mqtt_msg_list*>(std::get<MSG_LIST_ID>(*hConnection));
  2250. pthread_t thread = std::get<CONNECTED_HANDLE_ID>(*hConnection);
  2251. ////mLog::FINFO(" Close Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  2252. if (pconn) {
  2253. // 优雅停止代替强制取消
  2254. mqtt_disconnect(pconn);
  2255. // 设置停止标志,等待线程退出
  2256. if (thread) {
  2257. pthread_cancel(thread);
  2258. //pthread_join(thread, nullptr); // 等待线程结束
  2259. }
  2260. }
  2261. if (pconn) {
  2262. mqtt_release(pconn);
  2263. }
  2264. if (pfilter) {
  2265. std::cout << "Free Connection filter [" << (UINT64)pfilter << "] .." << endl;
  2266. // 释放filter内部成员
  2267. delete std::get<FILTER_RES_OBJ_ID>(*pfilter);
  2268. delete std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
  2269. delete pfilter;
  2270. }
  2271. if (semaphore) {
  2272. sem_destroy(semaphore); // Destroy semaphore if applicable
  2273. delete semaphore;
  2274. }
  2275. if (pList) {
  2276. delete pList;
  2277. }
  2278. std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
  2279. if (nullptr != pLock)
  2280. pLock->Thread_UnLock();
  2281. delete hConnection;
  2282. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." << endl;
  2283. }
  2284. //主动订阅主题
  2285. LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
  2286. {
  2287. if (hConnection == nullptr)
  2288. {
  2289. return 0;
  2290. }
  2291. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2292. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2293. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2294. pLock->Thread_Lock();
  2295. pTopicList->push_back(pszTopic);
  2296. //pMqttClient->subscribe(pszTopic, 1, opts);
  2297. int rc = MQTT_SUCCESS_ERROR;
  2298. DWORD dwTick = GetTickCount();
  2299. int nTryTimes = 0;
  2300. do
  2301. {
  2302. if (pTopicList->size() < 1)
  2303. {
  2304. int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2305. ////mLog::FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2306. std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe First " << pszTopic << endl;
  2307. }
  2308. else
  2309. {
  2310. int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2311. ////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2312. std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << endl;
  2313. }
  2314. if (rc != MQTT_SUCCESS_ERROR)
  2315. {
  2316. ////mLog::FWARN("try do mqtt reconnect ..");
  2317. //mqtt_do_reconnect(pMqttClient);
  2318. mqtt_disconnect(pMqttClient);
  2319. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2320. mqtt_release(pMqttClient);
  2321. pMqttClient = InnerConnect(hConnection);
  2322. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2323. if (pMqttClient != nullptr)
  2324. {
  2325. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2326. //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2327. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2328. }
  2329. usleep(2000 * 1000);
  2330. }
  2331. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2332. pLock->Thread_UnLock();
  2333. return 0;
  2334. }
  2335. //主题订阅取消
  2336. LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  2337. {
  2338. if (hConnection == nullptr)
  2339. {
  2340. return 0;
  2341. }
  2342. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2343. //if (!pMqttClient->is_connected()) {
  2344. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2345. // return 0;
  2346. //}
  2347. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2348. //auto ret = pMqttClient->unsubscribe(pszTopic);
  2349. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2350. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2351. pLock->Thread_Lock();
  2352. //MQTTAsync_responseOptions resp;
  2353. pTopicList->remove(pszTopic);
  2354. int rc = MQTT_SUCCESS_ERROR;
  2355. DWORD dwTick = GetTickCount();
  2356. int nTryTimes = 0;
  2357. do
  2358. {
  2359. int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
  2360. ////mLog::FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  2361. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2362. pLock->Thread_UnLock();
  2363. return 2;
  2364. }
  2365. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  2366. LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout)
  2367. {
  2368. char pszClientID[256];
  2369. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2370. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2371. getpid(), GetCurrentThreadId());
  2372. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
  2373. });
  2374. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
  2375. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  2376. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2377. string pLoad = pCmd->encode();
  2378. int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName);
  2379. std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2380. ////mLog::FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc);
  2381. CloseConnection(connObj);
  2382. return rc;
  2383. }
  2384. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  2385. LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout)
  2386. {
  2387. if (hConnection == nullptr)
  2388. {
  2389. ////mLog::FDEBUG("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode());
  2390. std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
  2391. return 0;
  2392. }
  2393. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  2394. string topic = pszTopic;
  2395. if (topic.length() <= 0)
  2396. {
  2397. ////mLog::FWARN("ignore empty topic packet {$}", pAction->encode());
  2398. return 2;
  2399. }
  2400. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2401. //if (!pMqttClient->is_connected()) {
  2402. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2403. // return 0;
  2404. //}
  2405. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2406. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2407. pLock->Thread_Lock();
  2408. ////mLog::FDEBUG("{$} Publish Action to {$} Action Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction->encode());
  2409. //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction);
  2410. //if(org_publisher.length() <= 0)
  2411. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
  2412. ResDataObject* pPacket = new ResDataObject();
  2413. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2414. pPacket->add(pszTopic, pAction->encode());
  2415. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2416. pList->push_back(pPacket);
  2417. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2418. std::cout << "try publish " << pAction->encode() << endl;
  2419. //pMqttClient->publish(pszTopic, pAction->encode());
  2420. /*
  2421. //pConn->publish(pszTopic, pCmd->encode());
  2422. std::cout << "try publish " << pAction->encode() << endl;
  2423. const char* pLoad = pAction->encode();
  2424. int len = strlen(pLoad);
  2425. //MQTTAsync_responseOptions resp;
  2426. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2427. mqtt_message_t msg;
  2428. memset(&msg, 0, sizeof(msg));
  2429. msg.payload = (void*)pLoad;
  2430. msg.qos = MQTT_QOS;
  2431. msg.payloadlen = len;
  2432. int rc = MQTT_SUCCESS_ERROR;
  2433. DWORD dwTick = GetTickCount();
  2434. int nTryTimes = 0;
  2435. do
  2436. {
  2437. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2438. nTryTimes++;
  2439. if (rc != MQTT_SUCCESS_ERROR)
  2440. {
  2441. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id);
  2442. //mqtt_do_reconnect(pMqttClient);
  2443. mqtt_disconnect(pMqttClient);
  2444. rc = mqtt_connect(pMqttClient);
  2445. if (rc == MQTT_SUCCESS_ERROR)
  2446. {
  2447. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2448. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2449. }
  2450. Sleep(2);
  2451. }
  2452. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout);
  2453. //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2454. ////mLog::FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2455. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  2456. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
  2457. if (rc < 0)
  2458. {
  2459. ////mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2460. //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
  2461. pLock->Thread_UnLock();
  2462. return rc;
  2463. }*/
  2464. //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
  2465. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2466. ////mLog::FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size());
  2467. pLock->Thread_UnLock();
  2468. return 2;
  2469. }
  2470. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  2471. int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos)
  2472. {
  2473. const int dwTimeout = 500;
  2474. if (pMqttClient == nullptr)
  2475. {
  2476. ////mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, message);
  2477. return 0;
  2478. }
  2479. ////mLog::FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message);
  2480. const char* pLoad = message.c_str();
  2481. int len = message.length();
  2482. mqtt_message_t msg;
  2483. memset(&msg, 0, sizeof(msg));
  2484. msg.payload = (void*)pLoad;
  2485. msg.payloadlen = len;
  2486. msg.qos = qos;
  2487. int rc = MQTT_SUCCESS_ERROR;
  2488. DWORD dwTick = GetTickCount();
  2489. int nTryTimes = 0;
  2490. do
  2491. {
  2492. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2493. nTryTimes++;
  2494. if (rc != MQTT_SUCCESS_ERROR)
  2495. {
  2496. ////mLog::FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id);
  2497. usleep(10000 * 1000);
  2498. //mqtt_do_reconnect(pMqttClient);
  2499. ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context;
  2500. mqtt_disconnect(pMqttClient);
  2501. /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2502. mqtt_connect(pMqttClient);*/
  2503. mqtt_release(pMqttClient);
  2504. pMqttClient = InnerConnect(hConnection);
  2505. //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2506. if (pMqttClient != nullptr)
  2507. {
  2508. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2509. //resubscribe_topic(pMqttClient, hConnection);
  2510. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2511. }
  2512. }
  2513. } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
  2514. if(nTryTimes > 1)
  2515. ////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2516. ////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2517. if (rc < 0)
  2518. {
  2519. ////mLog::FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2520. return rc;
  2521. }
  2522. return 2;
  2523. }
  2524. /*
  2525. /// <summary>
  2526. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  2527. /// 超时没收到应答返回失败,
  2528. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  2529. /// </summary>
  2530. /// <param name="pAction">要发送的命令Action名</param>
  2531. /// <param name="pContext">命令携带的参数</param>
  2532. /// <param name="pszTopic">要发送的目标topic</param>
  2533. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  2534. /// <param name="resObj">应答返回的参数结果</param>
  2535. /// <param name="dwWaitTime">等待超时时间</param>
  2536. /// <param name="hConnection">复用的MQTT链接句柄</param>
  2537. /// <param name="onmsg">复用的链接的消息处理函数</param>
  2538. /// <returns>成功返回2,其他返回错误码</returns>
  2539. LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  2540. ResDataObject& resObj, DWORD dwWaitTime)
  2541. {
  2542. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2543. if (pszRespTopic != nullptr)
  2544. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2545. if (hConnection == nullptr)
  2546. {
  2547. return 0;
  2548. }
  2549. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2550. //if (!pMqttClient->is_connected()) {
  2551. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2552. // return 0;
  2553. //}
  2554. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  2555. //SubscribeTopic()
  2556. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
  2557. string topic = topicName;//msg->get_topic().c_str();
  2558. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  2559. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
  2560. //应答到了,处理
  2561. ResDataObject req;
  2562. req.decode((const char*)message->payload);
  2563. PacketAnalizer::GetPacketContext(&req, resObj);
  2564. SetEvent(hEvent);
  2565. //处理结束 将函数指针换回去
  2566. void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2567. //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  2568. MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
  2569. }
  2570. else
  2571. {
  2572. //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2573. (msgarrvd)(context, topicName, topicLen, message);
  2574. }
  2575. };
  2576. //接管回调
  2577. // pMqttClient->set_message_callback(action_func);
  2578. MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
  2579. //pMqttClient->subscribe(pszRespTopic, 1);
  2580. MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2581. MQTTProperties prop = MQTTProperties_initializer;
  2582. std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
  2583. //pMqttClient->subscribe(pszTopic, 1, opts);
  2584. MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
  2585. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  2586. //pMqttClient->publish(pszTopic, req.encode());
  2587. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  2588. MQTTClient_message* m = NULL;
  2589. MQTTClient_deliveryToken dt;
  2590. MQTTProperty property;
  2591. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  2592. property.value.data.data = "test user property";
  2593. property.value.data.len = (int)strlen(property.value.data.data);
  2594. property.value.value.data = "test user property value";
  2595. property.value.value.len = (int)strlen(property.value.value.data);
  2596. MQTTProperties properties = MQTTProperties_initializer;
  2597. MQTTProperties_add(&properties, &property);
  2598. //pConn->publish(pszTopic, pCmd->encode());
  2599. const char* pLoad = req.encode();
  2600. MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
  2601. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2602. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  2603. if (ret == WAIT_OBJECT_0) {
  2604. //等到应答了
  2605. return 2;
  2606. }
  2607. return 0;
  2608. }
  2609. */
  2610. /// <summary>
  2611. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  2612. /// </summary>
  2613. /// <param name="hConnection"></param>
  2614. /// <param name="pAction"></param>
  2615. /// <param name="pContext"></param>
  2616. /// <param name="pszTopic"></param>
  2617. /// <param name="resObj"></param>
  2618. /// <param name="hEvent"></param>
  2619. /// <param name="dwWaitTime"></param>
  2620. /// <returns></returns>
  2621. LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
  2622. const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
  2623. {
  2624. usleep(1000 * 1000);
  2625. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2626. if (hConnection == nullptr)
  2627. {
  2628. return 0;
  2629. }
  2630. DWORD dwTick = GetTickCount();
  2631. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2632. sem_t hEvent;
  2633. sem_init(&hEvent, 0, 0); // 初始化信号量为0
  2634. ResDataObject resContext;
  2635. string strResObject;
  2636. char* pszPad = 0;
  2637. char* pszPad2 = 0;
  2638. auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool {
  2639. ////mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
  2640. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
  2641. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  2642. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  2643. {
  2644. if (GetTickCount() - dwTick < dwWaitTime)
  2645. {
  2646. ////mLog::FDEBUG("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2647. resObj = *rsp;
  2648. return true;
  2649. }
  2650. else
  2651. {
  2652. ////mLog::FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2653. return false;
  2654. }
  2655. std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
  2656. }
  2657. ////mLog::FDEBUG("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2658. std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
  2659. return false;
  2660. };
  2661. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2662. pLock->Thread_Lock();
  2663. ////mLog::FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic, pContext->encode());
  2664. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  2665. ResDataObject *resActions = std::get<FILTER_RES_OBJ_ID>(*pfilter);
  2666. resActions->add(pAction, pszTopic);
  2667. //std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
  2668. std::get<FILTER_FUNC_ID>(*pfilter) = func;
  2669. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2670. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  2671. const char* pLoad = req.encode();
  2672. ResDataObject* pPacket = new ResDataObject();
  2673. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2674. pPacket->add(pszTopic, pLoad);
  2675. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2676. pList->push_back(pPacket);
  2677. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2678. /*
  2679. mqtt_message_t msg;
  2680. memset(&msg, 0, sizeof(msg));
  2681. msg.payload = (void*)pLoad;
  2682. msg.qos = MQTT_QOS;
  2683. msg.payloadlen = strlen(pLoad);
  2684. //int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2685. int rc = MQTT_SUCCESS_ERROR;
  2686. dwTick = GetTickCount();
  2687. int nTryTimes = 0;
  2688. do
  2689. {
  2690. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2691. nTryTimes++;
  2692. if (rc != MQTT_SUCCESS_ERROR)
  2693. {
  2694. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. {$}", rc, client_id);
  2695. //mqtt_do_reconnect(pMqttClient);
  2696. mqtt_disconnect(pMqttClient);
  2697. rc = mqtt_connect(pMqttClient);
  2698. if (rc == MQTT_SUCCESS_ERROR)
  2699. {
  2700. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2701. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2702. }
  2703. Sleep(2);
  2704. }
  2705. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3);
  2706. if (nTryTimes >= 1)
  2707. ////mLog::FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, nTryTimes, rc);
  2708. */
  2709. pLock->Thread_UnLock();
  2710. dwTick = GetTickCount() - dwTick;
  2711. //////mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
  2712. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
  2713. dwTick = GetTickCount();
  2714. DWORD ret = sem_wait(&hEvent);
  2715. if (ret == WAIT_OBJECT_0) {
  2716. //等到应答了
  2717. dwTick = GetTickCount() - dwTick;
  2718. pLock->Thread_Lock();
  2719. ResDataObject* pResp = std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
  2720. for (int x = 0; x < pResp->size(); x++)
  2721. {
  2722. ResDataObject r = (*pResp)[x];
  2723. if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r))
  2724. {
  2725. resObj = r;
  2726. pResp->eraseOneOf(pAction, x);
  2727. break;
  2728. }
  2729. }
  2730. pLock->Thread_UnLock();
  2731. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
  2732. ////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
  2733. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2734. sem_destroy(&hEvent);
  2735. return 2;
  2736. }
  2737. //std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2738. //resObj.decode(strResObject.c_str());
  2739. ////mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
  2740. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
  2741. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2742. sem_destroy(&hEvent);
  2743. return 0;
  2744. }
  2745. /// <summary>
  2746. /// 新建MQTT连接发送Ation并等待应答
  2747. /// </summary>
  2748. /// <param name="pAction"></param>
  2749. /// <param name="pContext"></param>
  2750. /// <param name="pszTopic"></param>
  2751. /// <param name="pszRespTopic"></param>
  2752. /// <param name="resObj"></param>
  2753. /// <param name="dwWaitTime"></param>
  2754. /// <param name="pszSenderName"></param>
  2755. /// <returns></returns>
  2756. LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  2757. DWORD dwWaitTime, const char* pszSenderName)
  2758. {
  2759. ResDataObject req;
  2760. if (pszRespTopic != nullptr)
  2761. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2762. //临时创建连接并发送和接收应答
  2763. char pszClientID[256];
  2764. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2765. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2766. getpid(), GetCurrentThreadId());
  2767. sem_t sem;
  2768. if (sem_init(&sem, 0, 0) != 0) {
  2769. return 0; // 如果初始化失败,返回0
  2770. }
  2771. std::cout << " ActionAndResp->NewConnection " << endl;
  2772. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) {
  2773. //应答到了,处理
  2774. PacketAnalizer::GetPacketContext(req, resObj);
  2775. sem_post(&sem);
  2776. });
  2777. ////发布消息,并等待应答
  2778. PublishAction(&req, pszTopic, connObj);
  2779. // 等待应答或超时
  2780. struct timespec ts;
  2781. clock_gettime(CLOCK_REALTIME, &ts);
  2782. ts.tv_sec += dwWaitTime / 1000; // 转换为秒
  2783. ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒
  2784. // 等待信号量或者超时
  2785. int ret = 0;
  2786. while (ret == 0) {
  2787. ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量
  2788. if (ret == -1 && errno == ETIMEDOUT) {
  2789. // 超时
  2790. sem_destroy(&sem);
  2791. CloseConnection(connObj);
  2792. return 0; // 超时返回0
  2793. }
  2794. }
  2795. // 等到应答了
  2796. sem_destroy(&sem); // 销毁信号量
  2797. CloseConnection(connObj);
  2798. return 2; // 返回2表示应答成功
  2799. }