#include "stdafx.h"
#include "PacketAnalizer.h"
#include "StateMachineDevicePool.h"
#include "ConditionEvent.h"

// NOTE(review): the text this file was restored from had lost its template
// argument lists (e.g. "std::shared_ptr hand", "map tempDevMap"). They have
// been re-derived from how the values are used in this file; confirm each one
// against the declarations in StateMachineDevicePool.h.

namespace
{
    // Returns true when a C string is either a null pointer or has zero length.
    bool DevicePoolIsEmptyCStr(const char* text)
    {
        return text == nullptr || text[0] == '\0';
    }
}

// Creates the two LogicClient connections used by the pool: one that receives
// notifications ("SMN") and one that executes actions ("SMA").
// Any exception is swallowed, so either pointer may still be null afterwards;
// the other members check for that before using them.
StateMachineDevicePool::StateMachineDevicePool()
{
    try
    {
        // Connection used to receive notifications.
        m_pClientNotify.reset(new LogicClient(ClientType_Notify, m_strCurTransaction, "SMN", false));
        // Connection used to execute actions.
        m_pClientAction.reset(new LogicClient(ClientType_Action, m_strCurTransaction, "SMA", false));
    }
    catch (...)
    {
        //mLog::FERROR(" Create StateMachineDevicePool crash"); // creation failed
    }
}

// Clears the notify-loop flag, closes both client connections and drops the
// cached device paths and subscribed topics.
StateMachineDevicePool::~StateMachineDevicePool()
{
    m_bNotifyLoop = false;
    CloseDevices();
    m_DevPathMap.clear();
    m_TopicList.clear();
}

// Stores the top-level exit event. ReadForDeviceEvents() waits on it and
// returns CCOSSMRET_EXIT when it fires.
// NOTE(review): element type restored as LinuxEvent because the handle is
// passed to LinuxEvent::WaitForMultipleEvents - confirm against the header.
void StateMachineDevicePool::SetTopExitHandle(std::shared_ptr<LinuxEvent> hand)
{
    m_hTopExitHandle = hand;
}

// Registers the devices of the pool and subscribes to the attributes they report.
//
// devices     - entries of "device type -> device path".
// attributes  - entries of "device type -> attribute name"; one device type
//               may appear several times.
// workStation - workstation name, remembered in m_strWS and handed to
//               LogicClient::Open() by OpenDevices().
//
// Returns false when any key/value is empty, when the connections cannot be
// opened (or the notify client was never created), or when an exception was
// caught; true otherwise. Attributes whose device type is not in `devices`
// are skipped without failing the call.
bool StateMachineDevicePool::SetDeviceAttributes(ResDataObject& devices, ResDataObject& attributes, string workStation)
{
    bool ret = true;
    // Fix: this used to be "m_strWS != workStation;", a comparison whose result
    // was discarded, so the workstation was never stored.
    m_strWS = workStation;
    try
    {
        // Parse the device list: device type -> device path.
        map<string, string> tempDevMap;
        for (int i = 0; i < devices.size(); i++)
        {
            const char* pDeviceType = devices.GetKey(i);
            const char* pDevicePath = static_cast<const char*>(devices[i]);
            if (DevicePoolIsEmptyCStr(pDeviceType))
            {
                //mLog::FERROR("The idx[{$}] DeviceType is empty", (int)i);
                ret = false;
                break;
            }
            if (DevicePoolIsEmptyCStr(pDevicePath))
            {
                //mLog::FERROR("The idx[{$}] DevicePath is empty", (int)i);
                ret = false;
                break;
            }
            //mLog::FDEBUG("add idx[{$}] DevicePath[{$}:{$}] Succced", i, pDeviceType, pDevicePath);
            tempDevMap[pDeviceType] = pDevicePath;
        }

        // Parse the attribute list: device type -> attribute name.
        multimap<string, string> tempAttrMap;
        if (ret)
        {
            for (int i = 0; i < attributes.size(); i++)
            {
                const char* pAttrType = attributes.GetKey(i);
                const char* pAttrKey = static_cast<const char*>(attributes[i]);
                if (DevicePoolIsEmptyCStr(pAttrType))
                {
                    //mLog::FERROR("The idx[{$}] AttributeType is empty", (int)i);
                    ret = false;
                    break;
                }
                if (DevicePoolIsEmptyCStr(pAttrKey))
                {
                    //mLog::FERROR("The idx[{$}] AttributeKey is empty", (int)i);
                    ret = false;
                    break;
                }
                //mLog::FDEBUG("add idx[{$}] AttributeKey[{$}:{$}] Succced", i, pAttrType, pAttrKey);
                tempAttrMap.insert(make_pair(pAttrType, pAttrKey));
            }
        }

        if (ret)
        {
            // Selectively subscribe to the attributes reported by the devices.
            // OpenDevices() skips clients that were never created, so the
            // notify client is checked explicitly before it is dereferenced.
            if (OpenDevices() && m_pClientNotify)
            {
                // Drop the topics of the previous configuration first
                // (presumably "false" means unsubscribe, per the log text below).
                for (const auto& topic : m_TopicList)
                {
                    m_pClientNotify->SubScribeTopic(topic.c_str(), false);
                    //mLog::FDEBUG("unSubScribe[{$}] ok", topic.c_str());
                }
                m_TopicList.clear();

                m_DevPathMap = tempDevMap;
                for (const auto& attr : tempAttrMap)
                {
                    const auto finder = m_DevPathMap.find(attr.first);
                    if (finder != m_DevPathMap.end())
                    {
                        const string topic = finder->second + ClientTopic_NotifyInterval + attr.second;
                        m_pClientNotify->SubScribeTopic(topic.c_str());
                        //mLog::FDEBUG("SubScribe[{$}] successed", topic.c_str());
                        m_TopicList.emplace_back(topic);
                    }
                    else
                    {
                        // The attribute names a device type that is not in the pool; skip it.
                        //mLog::FWARN("SubScribe[{$}] failed", (attr.first + ConfItem_delim + attr.second).c_str());
                    }
                }
            }
            else
            {
                //mLog::FERROR("OpenDevices failed");
                ret = false;
            }
        }
    }
    catch (...)
    {
        //mLog::FERROR("SetDeviceAttributes crash");
        // An exception leaves the pool in an unknown state: report failure
        // instead of whatever value ret happened to hold.
        ret = false;
    }
    return ret;
}

// Actively fetches the initial values of the subscribed attributes.
// Currently a no-op: the implementation is compiled out because
// GetDeviceResource is not thread-safe.
// NOTE(review): the template arguments of initValueMap were lost in the text
// under review; map<string, string> is an assumption - confirm against the header.
void StateMachineDevicePool::GetAttributesInitValue(map<string, string>& initValueMap)
{
#if 0 // GetDeviceResource is not thread-safe, disabled for now
    try
    {
        if (m_pClientNotify && !m_pClientNotify->IsClosed())
        {
            ResDataObject pDeviceResource;
            m_pClientNotify->GetDeviceResource(&pDeviceResource);
            for (auto item : m_nDevAttributeMap)
            {
            }
        }
    }
    catch (...)
    {
        //mLog::FERROR("GetAttributesInitValue crash");
    }
#endif
}

// Starts the notification loop: remembers the callback that is invoked for
// every attribute update and starts the worker thread.
// Returns the result of StartThread().
bool StateMachineDevicePool::BeginNotifyLoop(const JudgeNotifyCallback& funCallback)
{
    //mLog::FDEBUG("Enter BeginNotifyLoop");
    m_JudgeNotifyFun = funCallback;
    return StartThread();
}

// Stops the notification loop: stops the worker thread (with
// STATEMACHINE_DefState_TIMEOUT) and then clears the callback.
// Always returns true.
bool StateMachineDevicePool::EndNotifyLoop()
{
    //mLog::FDEBUG("Enter EndNotifyLoop");
    StopThread(STATEMACHINE_DefState_TIMEOUT);
    m_JudgeNotifyFun = nullptr;
    return true;
}

/* Action definition; corresponds to "Actions" in the configuration:
{
    "Action": {
        "Generator:SetFLFMode":   { "P0": "PF" },
        "Generator:SetValue_PPS": { "P0": "15" },
        "Detector:SetAcqMode":    { "P0": "PF" }
    }
}
*/
// Executes the actions in resActions one after another and collects the replies.
//
// resActions - entries keyed "<device type><ConfItem_delim><action name>"; the
//              value of each entry is the parameter object of that action.
// resResult  - receives one entry per executed action: the same key mapped to
//              the response object.
// Timeout    - overall budget, compared against GetTickCount() differences
//              (presumably milliseconds).
//
// Returns CCOSSMRET_TIMEOUT as soon as the budget is exceeded after an action,
// CCOSSMRET_FAILED when the action client is unavailable, when any action
// timed out or returned an unexpected code, or when an exception was caught,
// and CCOSSMRET_SUCCESS otherwise. Entries whose device type is not in the
// pool are skipped.
CCOSSTMRET StateMachineDevicePool::StateMachineAction(ResDataObject& resActions, ResDataObject& resResult, DWORD Timeout)
{
    CCOSSTMRET ret = CCOSSMRET_SUCCESS;
    // Requests are posted in order, each one waiting for its reply.
    int nDeviceCount = resActions.size();
    try
    {
        if (m_pClientAction && !m_pClientAction->IsClosed())
        {
            // Start of the overall time budget.
            const DWORD dwStart = GetTickCount();

            // Execute sequentially.
            for (int devIdx = 0; devIdx < nDeviceCount; devIdx++)
            {
                const string strPathAction = resActions.GetKey(devIdx);
                const auto delimPos = strPathAction.find(ConfItem_delim);
                const string deviceType = strPathAction.substr(0, delimPos);

                const auto iter = m_DevPathMap.find(deviceType);
                if (iter == m_DevPathMap.end())
                {
                    continue; // unknown device type: nothing to execute
                }

                // Everything after the delimiter is the action name. When no
                // delimiter is present delimPos + 1 wraps to 0, which yields
                // the whole key.
                const string actionKey = strPathAction.substr(delimPos + 1);
                ResDataObject actionParam = resActions[devIdx];
                const string& actionPath = iter->second;
                ResDataObject actionResp;

                //mLog::FDEBUG("try Action[{$}] param[{$}] to[{$}]", strPathAction.c_str(), actionParam.encode(), actionPath.c_str());
                const int actRes = m_pClientAction->Action(actionKey.c_str(), actionParam, actionResp, STATEMACHINE_Action_TIMEOUT, actionPath.c_str());
                switch (actRes)
                {
                case RET_TIMEOUT:
                    //mLog::FWARN("Action[{$}] timeout", strPathAction.c_str());
                    ret = CCOSSMRET_FAILED;
                    break;
                case RET_SUCCEED:
                case RET_ONGOING:
                case RET_FINISHED:
                    //mLog::FDEBUG("Got Resp of [{$}] ret[{$}] successd", strPathAction.c_str(), actRes);
                    break;
                default:
                    //mLog::FWARN("Got Resp of [{$}] ret[{$}] failed", strPathAction.c_str(), actRes);
                    ret = CCOSSMRET_FAILED;
                    break;
                }

                // Result entry: DeviceName.ActionName : response
                resResult.add(strPathAction.c_str(), actionResp);

                // The budget ran out before all actions were executed.
                if (GetTickCount() - dwStart > Timeout)
                {
                    //mLog::FWARN("current State action timeout");
                    return CCOSSMRET_TIMEOUT;
                }
            }
        }
        else
        {
            //mLog::FERROR("Can't find Action Clinet[{$}]", ClientType_Action);
            ret = CCOSSMRET_FAILED;
        }
    }
    catch (...)
    {
        //mLog::FERROR("StateMachineAction crash");
        // Do not report success for a sequence that was aborted by an exception.
        ret = CCOSSMRET_FAILED;
    }
    return ret;
}

// Opens both client connections on ClientPath_Root for workstation m_strWS.
// A client that was never created or that is already open is skipped.
// Returns false when an Open() call reports less than RET_SUCCEED or when an
// exception was caught; in that case both existing clients are closed again.
bool StateMachineDevicePool::OpenDevices()
{
    bool ret = true;
    try
    {
        // Connection used to execute actions.
        if (m_pClientAction && m_pClientAction->IsClosed())
        {
            const bool opened = m_pClientAction->Open(ClientPath_Root, ACTION | NOTIFY_ACTION, m_strWS.c_str()) >= RET_SUCCEED;
            if (opened)
            {
                //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Succced...", ClientType_Action, ClientPath_Root);
            }
            else
            {
                //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Action Failed...", ClientType_Action, ClientPath_Root);
                ret = false;
            }
        }

        // Connection used to receive notifications.
        if (ret && m_pClientNotify && m_pClientNotify->IsClosed())
        {
            const bool opened = m_pClientNotify->Open(ClientPath_Root, NOTIFY_MSG, m_strWS.c_str()) >= RET_SUCCEED;
            if (opened)
            {
                //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Succced...", ClientType_Notify, ClientPath_Root);
            }
            else
            {
                //mLog::FDEBUG("StateMachine Clinet[{$}] try open[{$}] Notify Failed...", ClientType_Notify, ClientPath_Root);
                ret = false;
            }
        }
    }
    catch (...)
    {
        //mLog::FERROR("OpenDevices crash");
        ret = false;
    }

    if (!ret)
    {
        // Roll back. Either pointer may be null if the constructor failed, so
        // both are checked before use.
        if (m_pClientAction)
        {
            m_pClientAction->Close();
        }
        if (m_pClientNotify)
        {
            m_pClientNotify->Close();
        }
    }
    return ret;
}

// Closes both client connections if they exist. Exceptions are swallowed so
// this is safe to call from the destructor. Always returns true.
bool StateMachineDevicePool::CloseDevices()
{
    try
    {
        if (m_pClientNotify)
        {
            m_pClientNotify->Close();
        }
        if (m_pClientAction)
        {
            m_pClientAction->Close();
        }
    }
    catch (...)
    {
        //mLog::FERROR("CloseDevices crash");
    }
    return true;
}

// Waits for notifications of the device group and forwards attribute updates
// to the callback installed by BeginNotifyLoop().
//
// The loop waits on three events: the top-level exit event, this thread's own
// exit event and the notify client's "data arrived" handle. For every
// PACKET_CMD_UPDATE packet the callback receives
// "<device><ConfItem_delim><packet key>" and the packet's ConfItem_CONTEXT
// value; all other packet commands are ignored.
//
// Returns CCOSSMRET_EXIT when one of the exit events fires, CCOSSMRET_FAILED
// when the exit handle or an open notify client is missing or an exception was
// caught, and CCOSSMRET_SUCCESS when m_bNotifyLoop was cleared.
CCOSSTMRET StateMachineDevicePool::ReadForDeviceEvents()
{
    try
    {
        if (!m_hTopExitHandle || !m_pClientNotify || m_pClientNotify->IsClosed())
        {
            //mLog::FERROR("Cannot find Notify Clinet[{$}]", ClientType_Notify);
            return CCOSSMRET_FAILED;
        }

        // Events to wait on; the index in this list selects the case below.
        // NOTE(review): element type restored from usage - confirm against
        // LinuxEvent::WaitForMultipleEvents.
        vector<std::shared_ptr<LinuxEvent>> waitList;
        waitList.push_back(m_hTopExitHandle);
        waitList.push_back(GetExitEvt());
        waitList.push_back(m_pClientNotify->GetNotifyHandle());

        while (m_bNotifyLoop.load())
        {
            const DWORD wait_ret = LinuxEvent::WaitForMultipleEvents(waitList, -1);
            switch (wait_ret)
            {
            case WAIT_TIMEOUT:
                break;

            case WAIT_OBJECT_0:     // top exit thread
            case WAIT_OBJECT_0 + 1: // current exit thread
                return CCOSSMRET_EXIT;

            case WAIT_OBJECT_0 + 2: // notify client has data
            {
                while (m_pClientNotify->IsDataArrived())
                {
                    ResDataObject NotifyData, resTopic, resDevicePath;
                    const PACKET_CMD notifycmd = m_pClientNotify->ReadNotify(NotifyData);
                    const string strKey = PacketAnalizer::GetPacketKey(&NotifyData);
                    const string strValue = static_cast<string>(NotifyData[ConfItem_CONTEXT]);

                    switch (notifycmd)
                    {
                    case PACKET_CMD_UPDATE: // attribute update
                    {
                        // Only report the event when somebody is listening.
                        if (!m_JudgeNotifyFun)
                        {
                            break;
                        }
                        if (PacketAnalizer::GetPacketTopic(&NotifyData, resTopic))
                        {
                            vector<string> resTopicParams;
                            SplitCcosDevicePath(resTopic, resTopicParams);
                            // e.g. CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8 - the
                            // third segment is used as the device name.
                            if (resTopicParams.size() >= 3)
                            {
                                const string NotfiyPath = resTopicParams[2] + ConfItem_delim + strKey;
                                //mLog::FDEBUG("Get Device Topic CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
                                m_JudgeNotifyFun(NotfiyPath, strValue);
                            }
                        }
                        else if (m_pClientNotify->GetFilePath(resDevicePath))
                        {
                            // No topic in the packet: fall back to the client's file path.
                            const string NotfiyPath = static_cast<string>(resDevicePath) + ConfItem_delim + strKey;
                            //mLog::FDEBUG("Get Device Path CMD_UPDATE[{$}]:[{$}]", NotfiyPath.c_str(), strValue.c_str());
                            m_JudgeNotifyFun(NotfiyPath, strValue);
                        }
                    }
                    break;

                    // Commands that are received but deliberately ignored.
                    case PACKET_CMD_OPEN:
                    case PACKET_CMD_CLOSE:
                    case PACKET_CMD_GET:
                    case PACKET_CMD_ADD: // device reports an error
                    case PACKET_CMD_DEL: // device clears an error
                    case PACKET_CMD_EXE:
                    case PACKET_CMD_DATA:
                    case PACKET_CMD_MSG:
                    case PACKET_CMD_ONLINE:
                    case PACKET_CMD_PART_UPDATE:
                        break;
                    }
                }
            }
            break;

            default:
                break;
            }
        }
    }
    catch (...)
    {
        //mLog::FERROR("ReadForDeviceEvents crash");
        // A loop aborted by an exception is a failure, not a normal exit.
        return CCOSSMRET_FAILED;
    }
    return CCOSSMRET_SUCCESS;
}

// Splits the encoded form of resTopic at every '/' and stores the segments in
// resTopicParams (any previous content is discarded).
// Example: "CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8" -> 6 segments.
void StateMachineDevicePool::SplitCcosDevicePath(ResDataObject resTopic, vector<string> &resTopicParams)
{
    // Path string, e.g. CCOS/DEVICE/Detector/RunZe/WDB/04DFE5E8
    std::string path = resTopic.encode();

    // Discard previous data.
    resTopicParams.clear();

    // Cut the path at '/' with stringstream + getline.
    std::stringstream ss(path);
    std::string token;
    while (std::getline(ss, token, '/'))
    {
        resTopicParams.push_back(token);
    }
}

// Thread body (inherited from Ccos_Thread): runs the notification loop once.
bool StateMachineDevicePool::Exec()
{
    if (m_bNotifyLoop.load())
    {
        const CCOSSTMRET ret = ReadForDeviceEvents();
        if (ret != CCOSSMRET_SUCCESS && ret != CCOSSMRET_EXIT)
        {
            //mLog::FERROR("ReadForDeviceEvents has failed");
        }
    }
    return false; // returning false makes Ccos_Thread exit, so Exec is not called again
}

// Thread start hook: enables the notification loop.
bool StateMachineDevicePool::OnStartThread()
{
    m_bNotifyLoop = true;
    return true;
}

// Thread end hook: disables the notification loop.
bool StateMachineDevicePool::OnEndThread()
{
    m_bNotifyLoop = false;
    return true;
}