StateMachineDevicePool.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. #include "stdafx.h"
  2. #include "PacketAnalizer.h"
  3. #include "StateMachineDevicePool.h"
  4. #include "ConditionEvent.h"
  5. StateMachineDevicePool::StateMachineDevicePool()
  6. {
  7. try {
  8. //用于接收通知的连接
  9. m_pClientNotify.reset(new LogicClient(ClientType_Notify, m_strCurTransaction, "SMN", false));
  10. //用于执行action的连接
  11. m_pClientAction.reset(new LogicClient(ClientType_Action, m_strCurTransaction, "SMA", false));
  12. }
  13. catch (...)
  14. {
  15. //mLog::FERROR(" Create StateMachineDevicePool crash");//创建失败
  16. }
  17. }
  18. StateMachineDevicePool::~StateMachineDevicePool()
  19. {
  20. m_bNotifyLoop = false;
  21. CloseDevices();
  22. m_DevPathMap.clear();
  23. m_TopicList.clear();
  24. }
  25. //设置退出事件
  26. void StateMachineDevicePool::SetTopExitHandle(std::shared_ptr<LinuxEvent> hand)
  27. {
  28. m_hTopExitHandle = hand;
  29. }
  30. //往设备池子中添加设备
  31. bool StateMachineDevicePool::SetDeviceAttributes(ResDataObject& devices, ResDataObject& attributes, string workStation)
  32. {
  33. bool ret = true;
  34. m_strWS != workStation;
  35. try {//解析设备列表
  36. map<string, string> tempDevMap; //设备类型:设备路径
  37. if (ret)
  38. {
  39. for (int i = 0; i < devices.size(); i++)
  40. {
  41. const char* pDeviceType = devices.GetKey(i);
  42. const char* pDevicePath = (const char*)devices[i];
  43. if (strlen(pDeviceType) == 0)
  44. {
  45. //mLog::FERROR("The idx[{$}] DeviceType is empty", (int)i);
  46. ret = false;
  47. break;
  48. }
  49. else if (strlen(pDevicePath) == 0)
  50. {
  51. //mLog::FERROR("The idx[{$}] DevicePath is empty", (int)i);
  52. ret = false;
  53. break;
  54. }
  55. else
  56. {
  57. //mLog::FDEBUG("add idx[{$}] DevicePath[{$}:{$}] Succced", i, pDeviceType, pDevicePath);
  58. tempDevMap[pDeviceType] = pDevicePath;
  59. }
  60. }
  61. }
  62. //解析属性列表
  63. multimap<string, string> tempAttrMap; //设备类型:属性名
  64. if (ret)
  65. {
  66. for (int i = 0; i < attributes.size(); i++)
  67. {
  68. const char* pAttrType = attributes.GetKey(i);
  69. const char* pAttrKey = (const char*)attributes[i];
  70. if (strlen(pAttrType) == 0)
  71. {
  72. //mLog::FERROR("The idx[{$}] AttributeType is empty", (int)i);
  73. ret = false;
  74. break;
  75. }
  76. else if (strlen(pAttrKey) == 0)
  77. {
  78. //mLog::FERROR("The idx[{$}] AttributeKey is empty", (int)i);
  79. ret = false;
  80. break;
  81. }
  82. else
  83. {
  84. //mLog::FDEBUG("add idx[{$}] AttributeKey[{$}:{$}] Succced", i, pAttrType, pAttrKey);
  85. tempAttrMap.insert(make_pair(pAttrType, pAttrKey));
  86. }
  87. }
  88. }
  89. if (ret)
  90. {
  91. //有选择性地订阅设备上报的属性
  92. if (OpenDevices())
  93. {
  94. for (auto& topic : m_TopicList)
  95. {
  96. m_pClientNotify->SubScribeTopic(topic.c_str(), false);
  97. //mLog::FDEBUG("unSubScribe[{$}] ok", topic.c_str());
  98. }
  99. m_TopicList.clear();
  100. m_DevPathMap = tempDevMap;
  101. for (auto& iter : tempAttrMap)
  102. {
  103. auto finder = m_DevPathMap.find(iter.first);
  104. if (finder != m_DevPathMap.end())
  105. {
  106. string topic = finder->second + ClientTopic_NotifyInterval + iter.second;
  107. m_pClientNotify->SubScribeTopic(topic.c_str());
  108. //mLog::FDEBUG("SubScribe[{$}] successed", topic.c_str());
  109. m_TopicList.emplace_back(topic);
  110. }
  111. else
  112. {
  113. string keyAttr = iter.first + ConfItem_delim + iter.second;
  114. //mLog::FWARN("SubScribe[{$}] failed", keyAttr.c_str());
  115. }
  116. }
  117. }
  118. else
  119. {
  120. //mLog::FERROR("OpenDevices failed");
  121. ret = false;
  122. }
  123. }
  124. }
  125. catch (...)
  126. {
  127. //mLog::FERROR("SetDeviceAttributes crash");
  128. }
  129. return ret;
  130. }
  131. //主动获取订阅属性的初始值
  132. void StateMachineDevicePool::GetAttributesInitValue(map<string, string>& initValueMap)
  133. {
  134. #if 0 //GetDeviceResource是非线程安全,暂时不启用
  135. try {
  136. if (m_pClientNotify && !m_pClientNotify->IsClosed())
  137. {
  138. ResDataObject pDeviceResource;
  139. m_pClientNotify->GetDeviceResource(&pDeviceResource);
  140. for (auto item : m_nDevAttributeMap)
  141. {
  142. }
  143. }
  144. }
  145. catch (...)
  146. {
  147. //mLog::FERROR("GetAttributesInitValue crash");
  148. }
  149. #endif
  150. }
  151. //开始、结束消息循环处理
  152. bool StateMachineDevicePool::BeginNotifyLoop(const JudgeNotifyCallback& funCallback)
  153. {
  154. //mLog::FDEBUG("Enter BeginNotifyLoop");
  155. m_JudgeNotifyFun = funCallback;
  156. return StartThread();
  157. }
  158. bool StateMachineDevicePool::EndNotifyLoop()
  159. {
  160. //mLog::FDEBUG("Enter EndNotifyLoop");
  161. StopThread(STATEMACHINE_DefState_TIMEOUT);
  162. m_JudgeNotifyFun = NULL;
  163. return true;
  164. }
  165. /*动作定义; 对应配置项中的Actions
  166. {
  167. "Action": {
  168. "Generator:SetFLFMode": {
  169. "P0": "PF"
  170. },
  171. "Generator:SetValue_PPS": {
  172. "P0": "15"
  173. },
  174. "Detector:SetAcqMode": {
  175. "P0": "PF"
  176. }
  177. }
  178. }
  179. */
  180. CCOSSTMRET StateMachineDevicePool::StateMachineAction(ResDataObject& resActions, ResDataObject& resResult, DWORD Timeout)
  181. {
  182. CCOSSTMRET ret = CCOSSMRET_SUCCESS;
  183. //按顺序投递消息,并接收应答
  184. int nDeviceCount = resActions.size();
  185. try {
  186. if (m_pClientAction && !m_pClientAction->IsClosed())
  187. {
  188. //获取开始时间
  189. DWORD dwStart = GetTickCount();
  190. //顺序执行
  191. for (int devIdx = 0; devIdx < nDeviceCount; devIdx++)
  192. {
  193. string strPathAction = resActions.GetKey(devIdx);
  194. string deviceType = strPathAction.substr(0, strPathAction.find(ConfItem_delim));
  195. auto iter = m_DevPathMap.find(deviceType);
  196. if (iter != m_DevPathMap.end())
  197. {
  198. string actionKey = strPathAction.substr(strPathAction.find(ConfItem_delim) + 1, strPathAction.length());
  199. ResDataObject actionParam = resActions[devIdx];
  200. string actionPath = iter->second;
  201. ResDataObject actionResp;
  202. //mLog::FDEBUG("try Action[{$}] param[{$}] to[{$}]", strPathAction.c_str(), actionParam.encode(), actionPath.c_str());
  203. int actRes = m_pClientAction->Action(actionKey.c_str(), actionParam, actionResp, STATEMACHINE_Action_TIMEOUT, actionPath.c_str());
  204. switch (actRes)
  205. {
  206. case RET_TIMEOUT:
  207. {
  208. //mLog::FWARN("Action[{$}] timeout", strPathAction.c_str());
  209. ret = CCOSSMRET_FAILED;
  210. }break;
  211. case RET_SUCCEED:
  212. case RET_ONGOING:
  213. case RET_FINISHED:
  214. {
  215. //mLog::FDEBUG("Got Resp of [{$}] ret[{$}] successd", strPathAction.c_str(), actRes);
  216. }break;
  217. default:
  218. {
  219. //mLog::FWARN("Got Resp of [{$}] ret[{$}] failed", strPathAction.c_str(), actRes);
  220. ret = CCOSSMRET_FAILED;
  221. }break;
  222. }
  223. //返回值:DeviceName.ActionName : resResp
  224. resResult.add(strPathAction.c_str(), actionResp);
  225. //超时了,还没执行完
  226. if (GetTickCount() - dwStart > Timeout)
  227. {
  228. //mLog::FWARN("current State action timeout");
  229. return CCOSSMRET_TIMEOUT;
  230. }
  231. }
  232. }
  233. }
  234. else
  235. {
  236. //mLog::FERROR("Can't find Action Clinet[{$}]", ClientType_Action);
  237. ret = CCOSSMRET_FAILED;
  238. }
  239. }
  240. catch (...)
  241. {
  242. //mLog::FERROR("StateMachineAction crash");
  243. }
  244. return ret;
  245. }
  246. //打开所有设备
  247. bool StateMachineDevicePool::OpenDevices()
  248. {
  249. bool ret = true;
  250. try {
  251. if (ret && m_pClientAction) //用于执行action的连接
  252. {
  253. if (m_pClientAction->IsClosed())
  254. {
  255. if (m_pClientAction->Open(ClientPath_Root, ACTION | NOTIFY_ACTION, m_strWS.c_str()) >= RET_SUCCEED)
  256. {
  257. //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Succced...", ClientType_Action, ClientPath_Root);
  258. }
  259. else
  260. {
  261. //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Failed...", ClientType_Action, ClientPath_Root);
  262. ret = false;
  263. }
  264. }
  265. }
  266. if (ret && m_pClientNotify) //用于接收通知的连接
  267. {
  268. if (m_pClientNotify->IsClosed())
  269. {
  270. if (m_pClientNotify->Open(ClientPath_Root, NOTIFY_MSG, m_strWS.c_str()) >= RET_SUCCEED)
  271. {
  272. //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Succced...", ClientType_Notify, ClientPath_Root);
  273. }
  274. else
  275. {
  276. //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Failed...", ClientType_Notify, ClientPath_Root);
  277. ret = false;
  278. }
  279. }
  280. }
  281. }
  282. catch (...)
  283. {
  284. //mLog::FERROR("OpenDevices crash");
  285. }
  286. if (!ret)
  287. {
  288. m_pClientAction->Close();
  289. m_pClientNotify->Close();
  290. }
  291. return ret;
  292. }
  293. //关闭设备
  294. bool StateMachineDevicePool::CloseDevices()
  295. {
  296. try {
  297. if (m_pClientNotify)
  298. {
  299. m_pClientNotify->Close();
  300. }
  301. if (m_pClientAction)
  302. {
  303. m_pClientAction->Close();
  304. }
  305. }
  306. catch (...)
  307. {
  308. //mLog::FERROR("CloseDevices crash");
  309. }
  310. return true;
  311. }
  312. //轮询设备组的Notify
  313. CCOSSTMRET StateMachineDevicePool::ReadForDeviceEvents()
  314. {
  315. vector<shared_ptr<LinuxEvent>> waitList; //事件等待列表
  316. try {
  317. if (m_hTopExitHandle && m_pClientNotify && !m_pClientNotify->IsClosed())
  318. {
  319. waitList.push_back(m_hTopExitHandle);
  320. waitList.push_back(GetExitEvt());
  321. waitList.push_back(m_pClientNotify->GetNotifyHandle());
  322. while (m_bNotifyLoop.load())
  323. {
  324. DWORD wait_ret = LinuxEvent::WaitForMultipleEvents(waitList, -1);
  325. switch (wait_ret)
  326. {
  327. case WAIT_TIMEOUT:
  328. {}break;
  329. case WAIT_OBJECT_0://top exit thread
  330. case WAIT_OBJECT_0 + 1://current exit thread
  331. {
  332. return CCOSSMRET_EXIT;
  333. }break;
  334. case WAIT_OBJECT_0 + 2://m_GeneralClientNotify
  335. {
  336. while (m_pClientNotify->IsDataArrived())
  337. {
  338. ResDataObject NotifyData, resTopic, resDevicePath;
  339. PACKET_CMD notifycmd = m_pClientNotify->ReadNotify(NotifyData);
  340. string strKey = PacketAnalizer::GetPacketKey(&NotifyData);
  341. string strValue = (string)NotifyData[ConfItem_CONTEXT];
  342. switch (notifycmd)
  343. {
  344. case PACKET_CMD_OPEN:
  345. {}break;
  346. case PACKET_CMD_CLOSE:
  347. {}break;
  348. case PACKET_CMD_GET:
  349. {}break;
  350. case PACKET_CMD_UPDATE://属性更新
  351. {
  352. //将通知事件上报即可
  353. if (m_JudgeNotifyFun)
  354. {
  355. if (PacketAnalizer::GetPacketTopic(&NotifyData, resTopic))
  356. {
  357. vector<string> resTopicParams;
  358. SplitCcosDevicePath(resTopic, resTopicParams);
  359. if (resTopicParams.size() >= 3)//CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8
  360. {
  361. string NotfiyPath = resTopicParams[2] + ConfItem_delim + strKey;
  362. //mLog::FDEBUG("Get Device Topic CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
  363. m_JudgeNotifyFun(NotfiyPath, strValue);
  364. }
  365. }
  366. else
  367. {
  368. if (m_pClientNotify->GetFilePath(resDevicePath))
  369. {
  370. string NotfiyPath = (string)resDevicePath + ConfItem_delim + strKey;
  371. //mLog::FDEBUG("Get Device Path CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
  372. m_JudgeNotifyFun(NotfiyPath, strValue);
  373. }
  374. }
  375. }
  376. }break;
  377. case PACKET_CMD_ADD://设备上报错误
  378. {}break;
  379. case PACKET_CMD_DEL://设备消除错误
  380. {}break;
  381. case PACKET_CMD_EXE:
  382. {}break;
  383. case PACKET_CMD_DATA:
  384. {}break;
  385. case PACKET_CMD_MSG:
  386. {}break;
  387. case PACKET_CMD_ONLINE:
  388. {}break;
  389. case PACKET_CMD_PART_UPDATE:
  390. {}break;
  391. }
  392. }
  393. }break;
  394. default:
  395. break;
  396. }
  397. }
  398. }
  399. else
  400. {
  401. //mLog::FERROR("Cannot find Notify Clinet[{$}]", ClientType_Notify);
  402. return CCOSSMRET_FAILED;
  403. }
  404. }
  405. catch (...)
  406. {
  407. //mLog::FERROR("ReadForDeviceEvents crash");
  408. }
  409. return CCOSSMRET_SUCCESS;
  410. }
  411. void StateMachineDevicePool::SplitCcosDevicePath(ResDataObject resTopic, vector<string> &resTopicParams)
  412. {
  413. // 获取路径字符串
  414. std::string path = resTopic.encode(); // CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8
  415. // 使用 stringstream 和 getline 按照 "/" 切割路径
  416. std::stringstream ss(path);
  417. std::string token;
  418. // 清空之前的数据
  419. resTopicParams.clear();
  420. // 按照"/"切割路径并存入resTopicParams
  421. while (std::getline(ss, token, '/')) {
  422. resTopicParams.push_back(token);
  423. }
  424. }
  425. //集成自Ccos_Thread
  426. bool StateMachineDevicePool::Exec()
  427. {
  428. if (m_bNotifyLoop.load())
  429. {
  430. CCOSSTMRET ret = ReadForDeviceEvents();
  431. if (ret != CCOSSMRET_SUCCESS && ret != CCOSSMRET_EXIT)
  432. {
  433. //mLog::FERROR("ReadForDeviceEvents has failed");
  434. }
  435. }
  436. return false;//返回false,Ccos_Thread退出,Exec就不会再次执行了
  437. }
  438. bool StateMachineDevicePool::OnStartThread()
  439. {
  440. m_bNotifyLoop = true;
  441. return true;
  442. }
  443. bool StateMachineDevicePool::OnEndThread()
  444. {
  445. m_bNotifyLoop = false;
  446. return true;
  447. }