#include "ModuleClient.h"
#include "PacketAnalizer.h"
#include "devGrpcClient.h"

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

namespace
{
    /// Serializes a request parameter object into the string form handed to the
    /// gRPC client calls.
    ///
    /// Objects that report a non-zero size() are serialized with encode();
    /// otherwise the object's `const char*` conversion is used. A null object
    /// pointer, or a null converted value, yields an empty string instead of
    /// being dereferenced.
    string EncodeRequestParam(ResDataObject* pParam)
    {
        if (pParam == nullptr)
            return string();

        if (pParam->size() > 0)
            return pParam->encode();

        const char* pszValue = static_cast<const char*>(*pParam);
        return (pszValue != nullptr) ? string(pszValue) : string();
    }
}

/// Forwards all arguments to LogicClient and starts without a gRPC client, so
/// the Dev* calls use the MQTT path until ConnectGrpc() succeeds in creating one.
ModuleClient::ModuleClient(string szClientName, string szTransaction, string szType, bool bNeedNotify)
    : LogicClient(szClientName, szTransaction, szType, bNeedNotify)
{
    m_pGrpcClient = nullptr;
}

/// Releases the gRPC client created by ConnectGrpc(), if any.
ModuleClient::~ModuleClient()
{
    if (m_pGrpcClient != nullptr)
    {
        FreeClient(m_pGrpcClient);
        m_pGrpcClient = nullptr;
    }
}

/// Creates (or re-creates) the gRPC client for a device host.
///
/// @param strDevHost  Either a full host string passed to CreateClient() as is,
///                    or a purely numeric HTTP port such as "9051". For a numeric
///                    port the gRPC endpoint is "localhost:<port - 1>".
/// @param devPath     Optional device path; when non-empty, OpenDevice() is
///                    called on the new client (its result is not checked).
/// @return RET_NOSUPPORT if strDevHost is empty; RET_FAILED if no client could
///         be created or the channel reports a failure/shutdown state;
///         RET_SUCCEED otherwise.
RET_STATUS ModuleClient::ConnectGrpc(string strDevHost, string devPath)
{
    if (strDevHost.empty())
        return RET_NOSUPPORT;

    // Only treat the argument as a port when it consists of digits only.
    // atoi() alone would also accept the leading digits of "127.0.0.1:9050"
    // and silently redirect the connection to "localhost:126".
    const bool bDigitsOnly = strDevHost.find_first_not_of("0123456789") == string::npos;
    if (bDigitsOnly)
    {
        int httpPort = atoi(strDevHost.c_str());
        if (httpPort != 0)
        {
            // The gRPC port is the HTTP port minus one.
            httpPort--;
            strDevHost = "localhost:" + to_string(httpPort);
            cout << "Try connect " << strDevHost << endl;
        }
    }

    if (m_pGrpcClient != nullptr)
    {
        FreeClient(m_pGrpcClient);
        m_pGrpcClient = nullptr;
    }

    m_pGrpcClient = CreateClient(strDevHost.c_str());
    if (m_pGrpcClient == nullptr)
        return RET_FAILED;

    if (!devPath.empty())
    {
        //mLog::FINFO("Test Open Device {$}", devPath);
        m_pGrpcClient->OpenDevice(devPath, "");
    }

    // Channel states reported by GetClientStatus():
    //   0  GRPC_CHANNEL_IDLE               channel is idle
    //   1  GRPC_CHANNEL_CONNECTING         channel is connecting
    //   2  GRPC_CHANNEL_READY              channel is ready for work
    //   3  GRPC_CHANNEL_TRANSIENT_FAILURE  failure, expected to recover
    //   4  GRPC_CHANNEL_SHUTDOWN           failure, cannot recover
    // Idle and connecting are accepted as success along with ready.
    const int status = m_pGrpcClient->GetClientStatus();
    return (status <= 2) ? RET_SUCCEED : RET_FAILED;
}

/// Publishes a request packet over MQTT.
///
/// The topic is "<device path>/<command string>/<resource>", where the device
/// path is pszDevPath or, when that is null, m_strDevicePath.
///
/// @param cmd          Command used to build the packet and the topic.
/// @param pszResource  Resource name; appended to the topic.
/// @param pReqParam    Request parameters handed to PacketAnalizer::MakeRequest().
/// @param pResponse    Not written by this function.
/// @param pszDevPath   Optional device path override.
/// @return Always RET_SUCCEED; the result of PublishAction() is not inspected.
RET_STATUS ModuleClient::ModuleReq(PACKET_CMD cmd, const char* pszResource, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
{
    ResDataObject req;
    PacketAnalizer::MakeRequest(req, pszResource, cmd, pReqParam);

    // Fall back to the default device path when none is given.
    string topic = (pszDevPath == nullptr) ? m_strDevicePath : string(pszDevPath);
    topic += "/";
    topic += _Packet_Cmd_String[cmd] + "/";
    topic += pszResource;

    PublishAction(&req, topic.c_str(), m_pMqttConn);
    return RET_SUCCEED;
}

/// Reads a device property.
///
/// Without a gRPC client the request is published through ModuleReq().
/// With one, the client's Get() is called and, on RET_SUCCEED, the returned
/// text is decoded into pResponse.
///
/// @return RET_SUCCEED on success; RET_FAILED for any other gRPC result.
RET_STATUS ModuleClient::DevGet(const char* pszPropties, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
    {
        ResDataObject param;
        return ModuleReq(PACKET_CMD_GET, pszPropties, &param, pResponse, pszDevPath);
    }

    string devRes, resMsg;
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Get(pszPropties, devRes, resMsg, pszDevPath));
    if (ret != RET_SUCCEED)
        return RET_FAILED;

    pResponse.decode(devRes.c_str());
    return ret;
}

/// Writes a device property.
///
/// Without a gRPC client the request is published through ModuleReq() with
/// PACKET_CMD_SET. With one, the encoded value is sent and, on RET_SUCCEED, the
/// returned text is decoded into pResponse.
///
/// @return The status of the underlying call.
RET_STATUS ModuleClient::DevSet(const char* pszPropties, ResDataObject* pNewValue, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
        return ModuleReq(PACKET_CMD_SET, pszPropties, pNewValue, pResponse, pszDevPath);

    const string reqParam = EncodeRequestParam(pNewValue);
    string devRes, resMsg;
    // NOTE(review): this calls Add() on the gRPC client, exactly like DevAdd().
    // Confirm whether a dedicated set/update call was intended here.
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath));
    if (ret == RET_SUCCEED)
        pResponse.decode(devRes.c_str());
    return ret;
}

/// Adds a value to a device property.
///
/// Without a gRPC client the request is published through ModuleReq() with
/// PACKET_CMD_ADD. With one, the client's Add() is called and, on RET_SUCCEED,
/// the returned text is decoded into pResponse.
///
/// @return The status of the underlying call.
RET_STATUS ModuleClient::DevAdd(const char* pszPropties, ResDataObject* pAddValue, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
        return ModuleReq(PACKET_CMD_ADD, pszPropties, pAddValue, pResponse, pszDevPath);

    const string reqParam = EncodeRequestParam(pAddValue);
    string devRes, resMsg;
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath));
    if (ret == RET_SUCCEED)
        pResponse.decode(devRes.c_str());
    return ret;
}

/// Removes a value from a device property.
///
/// Without a gRPC client the request is published through ModuleReq() with
/// PACKET_CMD_DEL. With one, the client's Del() is called and, on RET_SUCCEED,
/// the returned text is decoded into pResponse.
///
/// @return The status of the underlying call.
RET_STATUS ModuleClient::DevDel(const char* pszPropties, ResDataObject* pDelValue, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
        return ModuleReq(PACKET_CMD_DEL, pszPropties, pDelValue, pResponse, pszDevPath);

    const string reqParam = EncodeRequestParam(pDelValue);
    string devRes, resMsg;
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Del(pszPropties, reqParam, devRes, resMsg, pszDevPath));
    if (ret == RET_SUCCEED)
        pResponse.decode(devRes.c_str());
    return ret;
}

/// Invokes a device action.
///
/// Without a gRPC client the inherited Action() is used with a timeout argument
/// of 11000. With one, the client's Action() is called and, on RET_SUCCEED, the
/// returned text is decoded into pResponse.
///
/// @param pReqParam  Action parameters; a null pointer is treated as an empty
///                   parameter object.
/// @return The status of the underlying call.
RET_STATUS ModuleClient::DevAction(const char* pszActionName, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
    {
        ResDataObject emptyParam;
        ResDataObject& reqObject = (pReqParam != nullptr) ? *pReqParam : emptyParam;
        return static_cast<RET_STATUS>(Action(pszActionName, reqObject, pResponse, 11000, pszDevPath));
    }

    const string reqParam = EncodeRequestParam(pReqParam);
    string devRes, resMsg;
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Action(pszActionName, reqParam, devRes, resMsg, pszDevPath));
    if (ret == RET_SUCCEED)
        pResponse.decode(devRes.c_str());
    return ret;
}

/// Sends a message to a device topic.
///
/// Without a gRPC client the request is published through ModuleReq() with
/// PACKET_CMD_MSG. With one, the client's Message() is called and, on
/// RET_SUCCEED, the returned text is decoded into pResponse.
///
/// @return The status of the underlying call.
RET_STATUS ModuleClient::DevMessage(const char* pszTopics, ResDataObject* pMessage, ResDataObject& pResponse, const char* pszDevPath)
{
    if (m_pGrpcClient == nullptr)
        return ModuleReq(PACKET_CMD_MSG, pszTopics, pMessage, pResponse, pszDevPath);

    const string reqParam = EncodeRequestParam(pMessage);
    string devRes, resMsg;
    const RET_STATUS ret = static_cast<RET_STATUS>(m_pGrpcClient->Message(pszTopics, reqParam, devRes, resMsg, pszDevPath));
    if (ret == RET_SUCCEED)
        pResponse.decode(devRes.c_str());
    return ret;
}

/// Resets the pending-result counter and forwards to the gRPC client's
/// BeginAyncWait().
///
/// @return The client's result, or 0 when there is no client or the call throws.
int ModuleClient::BeginAyncWait()
{
    try
    {
        m_nWaitResultCount = 0;
        if (m_pGrpcClient != nullptr)
            return m_pGrpcClient->BeginAyncWait();
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
    return 0;
}

/// Forwards to the gRPC client's WaitAllComplete(); does nothing without a
/// client. Exceptions are swallowed.
void ModuleClient::WaitAllComplete()
{
    try
    {
        if (m_pGrpcClient != nullptr)
            m_pGrpcClient->WaitAllComplete();
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
}

/// Fetches the result of the asynchronous call with index idx from the gRPC
/// client into the three output strings.
///
/// @return The client's result, or 0 when there is no client or the call throws.
int ModuleClient::GetAsyncResult(int idx, string& devResource, string& devRes, string& calResMsg)
{
    try
    {
        if (m_pGrpcClient != nullptr)
            return m_pGrpcClient->GetAsyncResult(idx, devResource, devRes, calResMsg);
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
    return 0;
}

/// Forwards to the gRPC client's EndAync(); does nothing without a client.
/// Exceptions are swallowed.
void ModuleClient::EndAync()
{
    try
    {
        if (m_pGrpcClient != nullptr)
            m_pGrpcClient->EndAync();
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
}

/// Queues an asynchronous action on the gRPC client.
///
/// The client's return value is stored in m_nWaitResultCount and returned.
///
/// @return The client's result, or 0 when there is no client or the call throws.
int ModuleClient::AsyncAction(string devResource, ResDataObject* pReqParam, string devUri, string calSection, string calClientID)
{
    try
    {
        const string reqParam = EncodeRequestParam(pReqParam);
        if (m_pGrpcClient != nullptr)
        {
            m_nWaitResultCount = m_pGrpcClient->AsyncAction(devResource, reqParam, devUri, calSection, calClientID);
            return m_nWaitResultCount;
        }
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
    return 0;
}

/// Queues an asynchronous message on the gRPC client.
///
/// The client's return value is stored in m_nWaitResultCount and returned.
///
/// @return The client's result, or 0 when there is no client or the call throws.
int ModuleClient::AsyncMessage(string devResource, ResDataObject* pReqParam, string devUri, string calSection, string calClientID)
{
    try
    {
        const string reqParam = EncodeRequestParam(pReqParam);
        if (m_pGrpcClient != nullptr)
        {
            m_nWaitResultCount = m_pGrpcClient->AsyncMessage(devResource, reqParam, devUri, calSection, calClientID);
            return m_nWaitResultCount;
        }
    }
    catch (...)
    {
        //mLog::FWARN("Exception occurred ");
    }
    return 0;
}

/// Forgets every notify index remembered by ReadFilterNotify().
///
/// @return 0 if the lock could not be taken, otherwise 2.
int ModuleClient::ClearFilter()
{
    if (!Lock())
        return 0;

    m_mpNotifyIdx.clear();
    UnLock();
    return 2;
}

/// Subscribes to (or unsubscribes from) the notify topic for strKey.
///
/// The topic "<device path>/Notify/<key>" is always handled. When the device
/// path has more than five '/'-separated segments and m_strWS is non-empty, the
/// topic "CCOS/<m_strWS>/<third segment>/Notify/<key>" is handled as well.
///
/// Example: device path CCOS/DEVICE/Detector/ecomdemo/demo/1234 with key
/// DetectorStatus and m_strWS "Table" also yields
/// CCOS/Table/Detector/Notify/DetectorStatus.
///
/// @param install  Passed through to SubScribeTopic().
/// @return Always 2.
int ModuleClient::InstallFilterNotify(string strKey, bool install)
{
    const string deviceTopic = m_strDevicePath + "/Notify/" + strKey;
    SubScribeTopic(deviceTopic.c_str(), install);

    std::vector<string> nods;
    SplitCcosDevicePath(m_strDevicePath, nods);
    if (nods.size() > 5 && m_strWS.length() > 0)
    {
        string wsTopic = "CCOS/";
        wsTopic += m_strWS + "/" + nods[2] + "/Notify/" + strKey;
        SubScribeTopic(wsTopic.c_str(), install);
    }
    return 2;
}

/// Reads the next notification and drops repeats of the last packet per key.
///
/// A packet is passed on only when its type is PACKET_TYPE_NOTIFY and its index
/// differs from the index last remembered for its key (or the key has not been
/// seen yet); the remembered index is then updated.
///
/// @param CmdObject  Receives the packet read by ReadNotify().
/// @return The command of an accepted packet; PACKET_CMD_NONE when nothing was
///         read, the packet is not a notify packet, the index is unchanged, or
///         the lock could not be taken.
PACKET_CMD ModuleClient::ReadFilterNotify(ResDataObject& CmdObject)
{
    const PACKET_CMD cmd = ReadNotify(CmdObject);
    if (cmd == PACKET_CMD_NONE)
        return PACKET_CMD_NONE;

    if (PacketAnalizer::GetPacketType(&CmdObject) != PACKET_TYPE_NOTIFY)
        return PACKET_CMD_NONE;

    const DWORD idx = PacketAnalizer::GetPacketIdx(&CmdObject);
    const string key = PacketAnalizer::GetPacketKey(&CmdObject);

    if (!Lock())
    {
        //mLog::FERROR("Lock timeout for packet {$}", CmdObject.encode());
        return PACKET_CMD_NONE;
    }

    // Accept on first arrival for this key or when the index has changed.
    auto itFound = m_mpNotifyIdx.find(key);
    const bool bFresh = (itFound == m_mpNotifyIdx.end()) || (itFound->second != idx);
    if (bFresh)
        m_mpNotifyIdx[key] = idx;
    UnLock();

    //mLog::FDEBUG("Same Packet no process.. {$} with idx:[{$}]",key, idx);
    return bFresh ? cmd : PACKET_CMD_NONE;
}

/// Builds a notify packet and publishes it on pszTopic.
///
/// The packet context carries "DevUri" (pszSender), "Type" (pszType) and
/// "Message" (pMsg); pszCmd is passed to PacketAnalizer::MakeNotify().
///
/// @return The result of PublishAction().
int ModuleClient::Notify(const char* pszTopic, const char* pszSender, const char* pszType, const char* pszCmd, const char* pMsg)
{
    ResDataObject resMsg;
    resMsg.add("DevUri", pszSender);
    resMsg.add("Type", pszType);
    resMsg.add("Message", pMsg);

    ResDataObject req;
    PacketAnalizer::MakeNotify(req, PACKET_CMD_MSG, pszCmd, "");
    PacketAnalizer::UpdatePacketContext(req, resMsg);
    PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pszSender);

    return PublishAction(&req, pszTopic, m_pMqttConn);
}

/// Splits a device path on '/' into its segments.
///
/// @param DevicePath       Path to split.
/// @param resTopicParams   Cleared first, then filled with the segments in order.
void ModuleClient::SplitCcosDevicePath(string DevicePath, vector<string>& resTopicParams)
{
    resTopicParams.clear();

    std::stringstream ss(DevicePath);
    std::string token;
    while (std::getline(ss, token, '/'))
    {
        resTopicParams.push_back(token);
    }
}