// StateMachineDevicePool.cpp
  1. #include "stdafx.h"
  2. #include "PacketAnalizer.h"
  3. #include "StateMachineDevicePool.h"
  4. #include "ConditionEvent.h"
  5. #include "LogLocalHelper.h"
  6. #include "Log4CPP.h"
  7. StateMachineDevicePool::StateMachineDevicePool()
  8. {
  9. try {
  10. //用于接收通知的连接
  11. m_pClientNotify.reset(new LogicClient(ClientType_Notify, m_strCurTransaction, "SMN", false));
  12. //用于执行action的连接
  13. m_pClientAction.reset(new LogicClient(ClientType_Action, m_strCurTransaction, "SMA", false));
  14. }
  15. catch (...)
  16. {
  17. FERROR(" Create StateMachineDevicePool crash");//创建失败
  18. }
  19. }
  20. StateMachineDevicePool::~StateMachineDevicePool()
  21. {
  22. m_bNotifyLoop = false;
  23. CloseDevices();
  24. m_DevPathMap.clear();
  25. m_TopicList.clear();
  26. }
  27. //设置退出事件
  28. void StateMachineDevicePool::SetTopExitHandle(std::shared_ptr<LinuxEvent> hand)
  29. {
  30. m_hTopExitHandle = hand;
  31. }
  32. //往设备池子中添加设备
  33. bool StateMachineDevicePool::SetDeviceAttributes(ResDataObject& devices, ResDataObject& attributes, string workStation)
  34. {
  35. bool ret = true;
  36. m_strWS != workStation;
  37. try {//解析设备列表
  38. map<string, string> tempDevMap; //设备类型:设备路径
  39. if (ret)
  40. {
  41. for (int i = 0; i < devices.size(); i++)
  42. {
  43. const char* pDeviceType = devices.GetKey(i);
  44. const char* pDevicePath = (const char*)devices[i];
  45. if (strlen(pDeviceType) == 0)
  46. {
  47. FERROR("The idx[{$}] DeviceType is empty", (int)i);
  48. ret = false;
  49. break;
  50. }
  51. else if (strlen(pDevicePath) == 0)
  52. {
  53. FERROR("The idx[{$}] DevicePath is empty", (int)i);
  54. ret = false;
  55. break;
  56. }
  57. else
  58. {
  59. FDEBUG("add idx[{$}] DevicePath[{$}:{$}] Succced", i, pDeviceType, pDevicePath);
  60. tempDevMap[pDeviceType] = pDevicePath;
  61. }
  62. }
  63. }
  64. //解析属性列表
  65. multimap<string, string> tempAttrMap; //设备类型:属性名
  66. if (ret)
  67. {
  68. for (int i = 0; i < attributes.size(); i++)
  69. {
  70. const char* pAttrType = attributes.GetKey(i);
  71. const char* pAttrKey = (const char*)attributes[i];
  72. if (strlen(pAttrType) == 0)
  73. {
  74. FERROR("The idx[{$}] AttributeType is empty", (int)i);
  75. ret = false;
  76. break;
  77. }
  78. else if (strlen(pAttrKey) == 0)
  79. {
  80. FERROR("The idx[{$}] AttributeKey is empty", (int)i);
  81. ret = false;
  82. break;
  83. }
  84. else
  85. {
  86. FDEBUG("add idx[{$}] AttributeKey[{$}:{$}] Succced", i, pAttrType, pAttrKey);
  87. tempAttrMap.insert(make_pair(pAttrType, pAttrKey));
  88. }
  89. }
  90. }
  91. if (ret)
  92. {
  93. //有选择性地订阅设备上报的属性
  94. if (OpenDevices())
  95. {
  96. for (auto& topic : m_TopicList)
  97. {
  98. m_pClientNotify->SubScribeTopic(topic.c_str(), false);
  99. FDEBUG("unSubScribe[{$}] ok", topic.c_str());
  100. }
  101. m_TopicList.clear();
  102. m_DevPathMap = tempDevMap;
  103. for (auto& iter : tempAttrMap)
  104. {
  105. auto finder = m_DevPathMap.find(iter.first);
  106. if (finder != m_DevPathMap.end())
  107. {
  108. string topic = finder->second + ClientTopic_NotifyInterval + iter.second;
  109. m_pClientNotify->SubScribeTopic(topic.c_str());
  110. FDEBUG("SubScribe[{$}] successed", topic.c_str());
  111. m_TopicList.emplace_back(topic);
  112. }
  113. else
  114. {
  115. string keyAttr = iter.first + ConfItem_delim + iter.second;
  116. FWARN("SubScribe[{$}] failed", keyAttr.c_str());
  117. }
  118. }
  119. }
  120. else
  121. {
  122. FERROR("OpenDevices failed");
  123. ret = false;
  124. }
  125. }
  126. }
  127. catch (...)
  128. {
  129. FERROR("SetDeviceAttributes crash");
  130. }
  131. return ret;
  132. }
  133. //主动获取订阅属性的初始值
  134. void StateMachineDevicePool::GetAttributesInitValue(map<string, string>& initValueMap)
  135. {
  136. #if 0 //GetDeviceResource是非线程安全,暂时不启用
  137. try {
  138. if (m_pClientNotify && !m_pClientNotify->IsClosed())
  139. {
  140. ResDataObject pDeviceResource;
  141. m_pClientNotify->GetDeviceResource(&pDeviceResource);
  142. for (auto item : m_nDevAttributeMap)
  143. {
  144. }
  145. }
  146. }
  147. catch (...)
  148. {
  149. FERROR("GetAttributesInitValue crash");
  150. }
  151. #endif
  152. }
  153. //开始、结束消息循环处理
  154. bool StateMachineDevicePool::BeginNotifyLoop(const JudgeNotifyCallback& funCallback)
  155. {
  156. FDEBUG("Enter BeginNotifyLoop");
  157. m_JudgeNotifyFun = funCallback;
  158. return StartThread();
  159. }
  160. bool StateMachineDevicePool::EndNotifyLoop()
  161. {
  162. FDEBUG("Enter EndNotifyLoop");
  163. StopThread(STATEMACHINE_DefState_TIMEOUT);
  164. m_JudgeNotifyFun = NULL;
  165. return true;
  166. }
  167. /*动作定义; 对应配置项中的Actions
  168. {
  169. "Action": {
  170. "Generator:SetFLFMode": {
  171. "P0": "PF"
  172. },
  173. "Generator:SetValue_PPS": {
  174. "P0": "15"
  175. },
  176. "Detector:SetAcqMode": {
  177. "P0": "PF"
  178. }
  179. }
  180. }
  181. */
  182. CCOSSTMRET StateMachineDevicePool::StateMachineAction(ResDataObject& resActions, ResDataObject& resResult, DWORD Timeout)
  183. {
  184. CCOSSTMRET ret = CCOSSMRET_SUCCESS;
  185. //按顺序投递消息,并接收应答
  186. int nDeviceCount = resActions.size();
  187. try {
  188. if (m_pClientAction && !m_pClientAction->IsClosed())
  189. {
  190. //获取开始时间
  191. DWORD dwStart = GetTickCount();
  192. //顺序执行
  193. for (int devIdx = 0; devIdx < nDeviceCount; devIdx++)
  194. {
  195. string strPathAction = resActions.GetKey(devIdx);
  196. string deviceType = strPathAction.substr(0, strPathAction.find(ConfItem_delim));
  197. auto iter = m_DevPathMap.find(deviceType);
  198. if (iter != m_DevPathMap.end())
  199. {
  200. string actionKey = strPathAction.substr(strPathAction.find(ConfItem_delim) + 1, strPathAction.length());
  201. ResDataObject actionParam = resActions[devIdx];
  202. string actionPath = iter->second;
  203. ResDataObject actionResp;
  204. FDEBUG("try Action[{$}] param[{$}] to[{$}]", strPathAction.c_str(), actionParam.encode(), actionPath.c_str());
  205. int actRes = m_pClientAction->Action(actionKey.c_str(), actionParam, actionResp, STATEMACHINE_Action_TIMEOUT, actionPath.c_str());
  206. switch (actRes)
  207. {
  208. case RET_TIMEOUT:
  209. {
  210. FWARN("Action[{$}] timeout", strPathAction.c_str());
  211. ret = CCOSSMRET_FAILED;
  212. }break;
  213. case RET_SUCCEED:
  214. case RET_ONGOING:
  215. case RET_FINISHED:
  216. {
  217. FDEBUG("Got Resp of [{$}] ret[{$}] successd", strPathAction.c_str(), actRes);
  218. }break;
  219. default:
  220. {
  221. FWARN("Got Resp of [{$}] ret[{$}] failed", strPathAction.c_str(), actRes);
  222. ret = CCOSSMRET_FAILED;
  223. }break;
  224. }
  225. //返回值:DeviceName.ActionName : resResp
  226. resResult.add(strPathAction.c_str(), actionResp);
  227. //超时了,还没执行完
  228. if (GetTickCount() - dwStart > Timeout)
  229. {
  230. FWARN("current State action timeout");
  231. return CCOSSMRET_TIMEOUT;
  232. }
  233. }
  234. }
  235. }
  236. else
  237. {
  238. FERROR("Can't find Action Clinet[{$}]", ClientType_Action);
  239. ret = CCOSSMRET_FAILED;
  240. }
  241. }
  242. catch (...)
  243. {
  244. FERROR("StateMachineAction crash");
  245. }
  246. return ret;
  247. }
  248. //打开所有设备
  249. bool StateMachineDevicePool::OpenDevices()
  250. {
  251. bool ret = true;
  252. try {
  253. if (ret && m_pClientAction) //用于执行action的连接
  254. {
  255. if (m_pClientAction->IsClosed())
  256. {
  257. if (m_pClientAction->Open(ClientPath_Root, ACTION | NOTIFY_ACTION, m_strWS.c_str()) >= RET_SUCCEED)
  258. {
  259. FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Succced...", ClientType_Action, ClientPath_Root);
  260. }
  261. else
  262. {
  263. FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Failed...", ClientType_Action, ClientPath_Root);
  264. ret = false;
  265. }
  266. }
  267. }
  268. if (ret && m_pClientNotify) //用于接收通知的连接
  269. {
  270. if (m_pClientNotify->IsClosed())
  271. {
  272. if (m_pClientNotify->Open(ClientPath_Root, NOTIFY_MSG, m_strWS.c_str()) >= RET_SUCCEED)
  273. {
  274. FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Succced...", ClientType_Notify, ClientPath_Root);
  275. }
  276. else
  277. {
  278. FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Failed...", ClientType_Notify, ClientPath_Root);
  279. ret = false;
  280. }
  281. }
  282. }
  283. }
  284. catch (...)
  285. {
  286. FERROR("OpenDevices crash");
  287. }
  288. if (!ret)
  289. {
  290. m_pClientAction->Close();
  291. m_pClientNotify->Close();
  292. }
  293. return ret;
  294. }
  295. //关闭设备
  296. bool StateMachineDevicePool::CloseDevices()
  297. {
  298. try {
  299. if (m_pClientNotify)
  300. {
  301. m_pClientNotify->Close();
  302. }
  303. if (m_pClientAction)
  304. {
  305. m_pClientAction->Close();
  306. }
  307. }
  308. catch (...)
  309. {
  310. FERROR("CloseDevices crash");
  311. }
  312. return true;
  313. }
  314. //轮询设备组的Notify
  315. CCOSSTMRET StateMachineDevicePool::ReadForDeviceEvents()
  316. {
  317. vector<shared_ptr<LinuxEvent>> waitList; //事件等待列表
  318. try {
  319. if (m_hTopExitHandle && m_pClientNotify && !m_pClientNotify->IsClosed())
  320. {
  321. waitList.push_back(m_hTopExitHandle);
  322. waitList.push_back(GetExitEvt());
  323. waitList.push_back(m_pClientNotify->GetNotifyHandle());
  324. while (m_bNotifyLoop.load())
  325. {
  326. DWORD wait_ret = LinuxEvent::WaitForMultipleEvents(waitList, -1);
  327. switch (wait_ret)
  328. {
  329. case WAIT_TIMEOUT:
  330. {}break;
  331. case WAIT_OBJECT_0://top exit thread
  332. case WAIT_OBJECT_0 + 1://current exit thread
  333. {
  334. return CCOSSMRET_EXIT;
  335. }break;
  336. case WAIT_OBJECT_0 + 2://m_GeneralClientNotify
  337. {
  338. while (m_pClientNotify->IsDataArrived())
  339. {
  340. ResDataObject NotifyData, resTopic, resDevicePath;
  341. PACKET_CMD notifycmd = m_pClientNotify->ReadNotify(NotifyData);
  342. string strKey = PacketAnalizer::GetPacketKey(&NotifyData);
  343. string strValue = (string)NotifyData[ConfItem_CONTEXT];
  344. switch (notifycmd)
  345. {
  346. case PACKET_CMD_OPEN:
  347. {}break;
  348. case PACKET_CMD_CLOSE:
  349. {}break;
  350. case PACKET_CMD_GET:
  351. {}break;
  352. case PACKET_CMD_UPDATE://属性更新
  353. {
  354. //将通知事件上报即可
  355. if (m_JudgeNotifyFun)
  356. {
  357. if (PacketAnalizer::GetPacketTopic(&NotifyData, resTopic))
  358. {
  359. vector<string> resTopicParams;
  360. SplitCcosDevicePath(resTopic, resTopicParams);
  361. if (resTopicParams.size() >= 3)//CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8
  362. {
  363. string NotfiyPath = resTopicParams[2] + ConfItem_delim + strKey;
  364. FDEBUG("Get Device Topic CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
  365. m_JudgeNotifyFun(NotfiyPath, strValue);
  366. }
  367. }
  368. else
  369. {
  370. if (m_pClientNotify->GetFilePath(resDevicePath))
  371. {
  372. string NotfiyPath = (string)resDevicePath + ConfItem_delim + strKey;
  373. FDEBUG("Get Device Path CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
  374. m_JudgeNotifyFun(NotfiyPath, strValue);
  375. }
  376. }
  377. }
  378. }break;
  379. case PACKET_CMD_ADD://设备上报错误
  380. {}break;
  381. case PACKET_CMD_DEL://设备消除错误
  382. {}break;
  383. case PACKET_CMD_EXE:
  384. {}break;
  385. case PACKET_CMD_DATA:
  386. {}break;
  387. case PACKET_CMD_MSG:
  388. {}break;
  389. case PACKET_CMD_ONLINE:
  390. {}break;
  391. case PACKET_CMD_PART_UPDATE:
  392. {}break;
  393. }
  394. }
  395. }break;
  396. default:
  397. break;
  398. }
  399. }
  400. }
  401. else
  402. {
  403. FERROR("Cannot find Notify Clinet[{$}]", ClientType_Notify);
  404. return CCOSSMRET_FAILED;
  405. }
  406. }
  407. catch (...)
  408. {
  409. FERROR("ReadForDeviceEvents crash");
  410. }
  411. return CCOSSMRET_SUCCESS;
  412. }
  413. void StateMachineDevicePool::SplitCcosDevicePath(ResDataObject resTopic, vector<string> &resTopicParams)
  414. {
  415. // 获取路径字符串
  416. std::string path = resTopic.encode(); // CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8
  417. // 使用 stringstream 和 getline 按照 "/" 切割路径
  418. std::stringstream ss(path);
  419. std::string token;
  420. // 清空之前的数据
  421. resTopicParams.clear();
  422. // 按照"/"切割路径并存入resTopicParams
  423. while (std::getline(ss, token, '/')) {
  424. resTopicParams.push_back(token);
  425. }
  426. }
  427. //集成自Ccos_Thread
  428. bool StateMachineDevicePool::Exec()
  429. {
  430. if (m_bNotifyLoop.load())
  431. {
  432. CCOSSTMRET ret = ReadForDeviceEvents();
  433. if (ret != CCOSSMRET_SUCCESS && ret != CCOSSMRET_EXIT)
  434. {
  435. FERROR("ReadForDeviceEvents has failed");
  436. }
  437. }
  438. return false;//返回false,Ccos_Thread退出,Exec就不会再次执行了
  439. }
  440. bool StateMachineDevicePool::OnStartThread()
  441. {
  442. m_bNotifyLoop = true;
  443. return true;
  444. }
  445. bool StateMachineDevicePool::OnEndThread()
  446. {
  447. m_bNotifyLoop = false;
  448. return true;
  449. }