ModuleClient.cpp 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. #include "ModuleClient.h"
  2. #include "PacketAnalizer.h"
  3. #include "devGrpcClient.h"
  4. ModuleClient::ModuleClient(string szClientName, string szTransaction, string szType, bool bNeedNotify):
  5. LogicClient(szClientName, szTransaction, szType, bNeedNotify)
  6. {
  7. m_pGrpcClient = nullptr;
  8. }
  9. ModuleClient::~ModuleClient()
  10. {
  11. }
  12. RET_STATUS ModuleClient::ConnectGrpc(string strDevHost, string devPath)
  13. {
  14. if (strDevHost.length() <= 0)
  15. return RET_NOSUPPORT;
  16. int httpPort = atoi(strDevHost.c_str());
  17. if ( 0 != httpPort)
  18. {
  19. //纯数字,比如9051
  20. //grpc端口:-1
  21. httpPort--;
  22. strDevHost = "localhost:" + to_string(httpPort);
  23. cout << "Try connect " << strDevHost << endl;
  24. }
  25. if (m_pGrpcClient != nullptr)
  26. FreeClient(m_pGrpcClient);
  27. m_pGrpcClient = CreateClient(strDevHost.c_str());
  28. if (devPath.length() > 0)
  29. {
  30. //mLog::FINFO("Test Open Device {$}", devPath);
  31. int ret = m_pGrpcClient->OpenDevice(devPath, "");
  32. //mLog::FINFO("Open Dev Result {$} resource {$}", ret, ret == 2 ? m_pGrpcClient->GetOpendDeviceResource() : "Failed.");
  33. }
  34. //
  35. /*
  36. * channel is idle
  37. GRPC_CHANNEL_IDLE, 0
  38. channel is connecting
  39. GRPC_CHANNEL_CONNECTING, 1
  40. channel is ready for work
  41. GRPC_CHANNEL_READY, 2
  42. channel has seen a failure but expects to recover
  43. GRPC_CHANNEL_TRANSIENT_FAILURE, 3
  44. channel has seen a failure that it cannot recover from
  45. GRPC_CHANNEL_SHUTDOWN 4
  46. */
  47. int status = m_pGrpcClient->GetClientStatus();
  48. if (status <= 2)
  49. return RET_SUCCEED;
  50. return RET_FAILED;
  51. }
  52. RET_STATUS ModuleClient::ModuleReq(PACKET_CMD cmd, const char* pszResource, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
  53. {
  54. ResDataObject req;
  55. PacketAnalizer::MakeRequest(req, pszResource, cmd, pReqParam);
  56. string topic;
  57. if (pszDevPath == nullptr)
  58. {
  59. //默认设备路径
  60. topic = m_strDevicePath;
  61. }
  62. else
  63. {
  64. topic = pszDevPath;
  65. }
  66. topic += "/";
  67. topic += _Packet_Cmd_String[cmd] + "/";
  68. topic += pszResource;
  69. PublishAction(&req, topic.c_str(), m_pMqttConn);
  70. return RET_SUCCEED;
  71. }
  72. RET_STATUS ModuleClient::DevGet(const char* pszPropties, ResDataObject& pResponse, const char* pszDevPath)
  73. {
  74. ResDataObject param;
  75. if(m_pGrpcClient == nullptr)
  76. return ModuleReq(PACKET_CMD_GET, pszPropties, &param, pResponse, pszDevPath);
  77. string devRes, resMsg;
  78. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Get(pszPropties, devRes, resMsg, pszDevPath);
  79. if (ret == RET_SUCCEED)
  80. {
  81. pResponse.decode(devRes.c_str());
  82. return ret;
  83. }
  84. return RET_FAILED;
  85. }
  86. RET_STATUS ModuleClient::DevSet(const char* pszPropties, ResDataObject* pNewValue, ResDataObject& pResponse, const char* pszDevPath)
  87. {
  88. if (m_pGrpcClient == nullptr)
  89. return ModuleReq(PACKET_CMD_SET, pszPropties, pNewValue, pResponse, pszDevPath);
  90. string reqParam, devRes, resMsg;
  91. if (pNewValue->size() > 0)
  92. reqParam = pNewValue->encode();
  93. else
  94. reqParam = (const char*)(*pNewValue);
  95. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath);
  96. if (ret == RET_SUCCEED)
  97. {
  98. pResponse.decode(devRes.c_str());
  99. return ret;
  100. }
  101. return ret;
  102. }
  103. RET_STATUS ModuleClient::DevAdd(const char* pszPropties, ResDataObject* pAddValue, ResDataObject& pResponse, const char* pszDevPath)
  104. {
  105. if (m_pGrpcClient == nullptr)
  106. return ModuleReq(PACKET_CMD_ADD, pszPropties, pAddValue , pResponse, pszDevPath);
  107. string reqParam, devRes, resMsg;
  108. if (pAddValue->size() > 0)
  109. reqParam = pAddValue->encode();
  110. else
  111. reqParam = (const char*)(*pAddValue);
  112. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath);
  113. if (ret == RET_SUCCEED)
  114. {
  115. pResponse.decode(devRes.c_str());
  116. return ret;
  117. }
  118. return ret;
  119. }
  120. RET_STATUS ModuleClient::DevDel(const char* pszPropties, ResDataObject* pDelValue, ResDataObject& pResponse, const char* pszDevPath)
  121. {
  122. if (m_pGrpcClient == nullptr)
  123. return ModuleReq(PACKET_CMD_DEL, pszPropties,pDelValue , pResponse, pszDevPath);
  124. string reqParam, devRes, resMsg;
  125. if (pDelValue->size() > 0)
  126. reqParam = pDelValue->encode();
  127. else
  128. reqParam = (const char*)(*pDelValue);
  129. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Del(pszPropties, reqParam, devRes, resMsg, pszDevPath);
  130. if (ret == RET_SUCCEED)
  131. {
  132. pResponse.decode(devRes.c_str());
  133. return ret;
  134. }
  135. return ret;
  136. }
  137. RET_STATUS ModuleClient::DevAction(const char* pszActionName, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
  138. {
  139. if (m_pGrpcClient == nullptr)
  140. return (RET_STATUS)Action( pszActionName, *pReqParam , pResponse,11000, pszDevPath);
  141. string reqParam, devRes, resMsg;
  142. if (pReqParam->size() > 0)
  143. reqParam = pReqParam->encode();
  144. else
  145. reqParam = (const char*)(*pReqParam);
  146. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Action(pszActionName,reqParam, devRes, resMsg, pszDevPath);
  147. if (ret == RET_SUCCEED)
  148. {
  149. pResponse.decode(devRes.c_str());
  150. return ret;
  151. }
  152. return ret;
  153. }
  154. RET_STATUS ModuleClient::DevMessage(const char* pszTopics, ResDataObject* pMessage, ResDataObject& pResponse, const char* pszDevPath)
  155. {
  156. if (m_pGrpcClient == nullptr)
  157. return ModuleReq(PACKET_CMD_MSG, pszTopics, pMessage, pResponse, pszDevPath);
  158. string reqParam, devRes, resMsg;
  159. if (pMessage->size() > 0)
  160. reqParam = pMessage->encode();
  161. else
  162. reqParam = (const char*)(*pMessage);
  163. RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Message(pszTopics, reqParam, devRes, resMsg, pszDevPath);
  164. if (ret == RET_SUCCEED)
  165. {
  166. pResponse.decode(devRes.c_str());
  167. return ret;
  168. }
  169. return ret;
  170. }
  171. int ModuleClient::BeginAyncWait() {
  172. try
  173. {
  174. m_nWaitResultCount = 0;
  175. if (m_pGrpcClient != nullptr)
  176. return m_pGrpcClient->BeginAyncWait();
  177. }
  178. catch (...)
  179. {
  180. //mLog::FWARN("Exception occurred ");
  181. }
  182. return 0;
  183. }
  184. void ModuleClient::WaitAllComplete()
  185. {
  186. try
  187. {
  188. if (m_pGrpcClient != nullptr)
  189. m_pGrpcClient->WaitAllComplete();
  190. }
  191. catch (...)
  192. {
  193. //mLog::FWARN("Exception occurred ");
  194. }
  195. }
  196. int ModuleClient::GetAsyncResult(int idx, string& devResource, string& devRes, string& calResMsg)
  197. {
  198. try
  199. {
  200. if (m_pGrpcClient != nullptr)
  201. return m_pGrpcClient->GetAsyncResult(idx, devResource, devRes, calResMsg);
  202. }
  203. catch (...)
  204. {
  205. //mLog::FWARN("Exception occurred ");
  206. }
  207. return 0;
  208. }
  209. void ModuleClient::EndAync()
  210. {
  211. try
  212. {
  213. if (m_pGrpcClient != nullptr)
  214. m_pGrpcClient->EndAync();
  215. }
  216. catch (...)
  217. {
  218. //mLog::FWARN("Exception occurred ");
  219. }
  220. }
  221. int ModuleClient::AsyncAction(string devResource, ResDataObject* pReqParam,
  222. string devUri, string calSection, string calClientID)
  223. {
  224. try
  225. {
  226. string reqParam;
  227. if (pReqParam->size() > 0)
  228. reqParam = pReqParam->encode();
  229. else
  230. reqParam = (const char*)(*pReqParam);
  231. if (m_pGrpcClient != nullptr)
  232. return m_nWaitResultCount= m_pGrpcClient->AsyncAction(devResource, reqParam, devUri, calSection, calClientID);
  233. }
  234. catch (...)
  235. {
  236. //mLog::FWARN("Exception occurred ");
  237. }
  238. return 0;
  239. }
  240. int ModuleClient::AsyncMessage(string devResource, ResDataObject* pReqParam,
  241. string devUri, string calSection, string calClientID)
  242. {
  243. try
  244. {
  245. string reqParam;
  246. if (pReqParam->size() > 0)
  247. reqParam = pReqParam->encode();
  248. else
  249. reqParam = (const char*)(*pReqParam);
  250. if (m_pGrpcClient != nullptr)
  251. return m_nWaitResultCount = m_pGrpcClient->AsyncMessage(devResource, reqParam, devUri, calSection, calClientID);
  252. }
  253. catch (...)
  254. {
  255. //mLog::FWARN("Exception occurred ");
  256. }
  257. return 0;
  258. }
  259. int ModuleClient::ClearFilter()
  260. {
  261. if (!Lock())
  262. return 0;
  263. m_mpNotifyIdx.clear();
  264. UnLock();
  265. return 2;
  266. }
  267. int ModuleClient::InstallFilterNotify(string strKey, bool install)
  268. {
  269. string topic = m_strDevicePath + "/Notify/" + strKey;
  270. SubScribeTopic(topic.c_str(), install);
  271. //CCOS/DEVICE/Detector/ecomdemo/demo/1234 /Notify/DetectorStatus
  272. std::vector<string> nods;
  273. SplitCcosDevicePath(m_strDevicePath, nods);
  274. if (nods.size() > 5 && m_strWS.length() > 0)
  275. {
  276. //CCOS/Table/Detector /Notify/DetectorStatus
  277. topic = "CCOS/";
  278. topic += m_strWS + "/" + nods[2] + "/Notify/" + strKey;
  279. SubScribeTopic(topic.c_str(), install);
  280. }
  281. return 2;
  282. }
  283. PACKET_CMD ModuleClient::ReadFilterNotify(ResDataObject& CmdObject)
  284. {
  285. PACKET_CMD cmd = PACKET_CMD_NONE;
  286. //ResDataObject resPacket;
  287. cmd = ReadNotify(CmdObject);
  288. if (cmd != PACKET_CMD_NONE )
  289. {
  290. PACKET_TYPE type = PacketAnalizer::GetPacketType(&CmdObject);
  291. if (type == PACKET_TYPE_NOTIFY)
  292. {
  293. DWORD idx = PacketAnalizer::GetPacketIdx(&CmdObject);
  294. string key = PacketAnalizer::GetPacketKey(&CmdObject);
  295. if (Lock())
  296. {
  297. auto itf = m_mpNotifyIdx.find(key);
  298. if (itf != m_mpNotifyIdx.end())
  299. {
  300. if (itf->second != idx)
  301. {
  302. // idx不同
  303. m_mpNotifyIdx[key] = idx;
  304. UnLock();
  305. return cmd;
  306. }
  307. else
  308. {
  309. UnLock();
  310. //mLog::FDEBUG("Same Packet no process.. {$} with idx:[{$}]",key, idx);
  311. return PACKET_CMD_NONE;
  312. }
  313. }
  314. else
  315. {
  316. //首次抵达
  317. m_mpNotifyIdx[key] = idx;
  318. UnLock();
  319. return cmd;
  320. }
  321. }
  322. else
  323. {
  324. //mLog::FERROR("Lock timeout for packet {$}", CmdObject.encode());
  325. }
  326. }
  327. }
  328. return PACKET_CMD_NONE;
  329. }
  330. int ModuleClient::Notify(const char* pszTopic, const char* pszSender, const char* pszType, const char* pszCmd, const char* pMsg)
  331. {
  332. ResDataObject req,resMsg;
  333. resMsg.add("DevUri", pszSender);
  334. resMsg.add("Type", pszType);
  335. resMsg.add("Message", pMsg);
  336. PacketAnalizer::MakeNotify(req, PACKET_CMD_MSG, pszCmd, "");
  337. PacketAnalizer::UpdatePacketContext(req, resMsg);
  338. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pszSender );
  339. return PublishAction(&req, pszTopic, m_pMqttConn);
  340. }
  341. void ModuleClient::SplitCcosDevicePath(string DevicePath, vector<string>& resTopicParams)
  342. {
  343. // 使用 stringstream 和 getline 按照 "/" 切割路径
  344. std::stringstream ss(DevicePath);
  345. std::string token;
  346. // 清空之前的数据
  347. resTopicParams.clear();
  348. // 按照"/"切割路径并存入resTopicParams
  349. while (std::getline(ss, token, '/')) {
  350. resTopicParams.push_back(token);
  351. }
  352. }