123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- #include "ModuleClient.h"
- #include "PacketAnalizer.h"
- #include "devGrpcClient.h"
- ModuleClient::ModuleClient(string szClientName, string szTransaction, string szType, bool bNeedNotify):
- LogicClient(szClientName, szTransaction, szType, bNeedNotify)
- {
- m_pGrpcClient = nullptr;
- }
- ModuleClient::~ModuleClient()
- {
- }
- RET_STATUS ModuleClient::ConnectGrpc(string strDevHost, string devPath)
- {
- if (strDevHost.length() <= 0)
- return RET_NOSUPPORT;
- int httpPort = atoi(strDevHost.c_str());
- if ( 0 != httpPort)
- {
- //纯数字,比如9051
- //grpc端口:-1
- httpPort--;
- strDevHost = "localhost:" + to_string(httpPort);
- cout << "Try connect " << strDevHost << endl;
- }
- if (m_pGrpcClient != nullptr)
- FreeClient(m_pGrpcClient);
- m_pGrpcClient = CreateClient(strDevHost.c_str());
- if (devPath.length() > 0)
- {
- //mLog::FINFO("Test Open Device {$}", devPath);
- int ret = m_pGrpcClient->OpenDevice(devPath, "");
- //mLog::FINFO("Open Dev Result {$} resource {$}", ret, ret == 2 ? m_pGrpcClient->GetOpendDeviceResource() : "Failed.");
- }
- //
- /*
- * channel is idle
- GRPC_CHANNEL_IDLE, 0
- channel is connecting
- GRPC_CHANNEL_CONNECTING, 1
- channel is ready for work
- GRPC_CHANNEL_READY, 2
- channel has seen a failure but expects to recover
- GRPC_CHANNEL_TRANSIENT_FAILURE, 3
- channel has seen a failure that it cannot recover from
- GRPC_CHANNEL_SHUTDOWN 4
- */
- int status = m_pGrpcClient->GetClientStatus();
- if (status <= 2)
- return RET_SUCCEED;
- return RET_FAILED;
- }
- RET_STATUS ModuleClient::ModuleReq(PACKET_CMD cmd, const char* pszResource, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
- {
- ResDataObject req;
- PacketAnalizer::MakeRequest(req, pszResource, cmd, pReqParam);
- string topic;
- if (pszDevPath == nullptr)
- {
- //默认设备路径
- topic = m_strDevicePath;
- }
- else
- {
- topic = pszDevPath;
- }
- topic += "/";
- topic += _Packet_Cmd_String[cmd] + "/";
- topic += pszResource;
- PublishAction(&req, topic.c_str(), m_pMqttConn);
- return RET_SUCCEED;
- }
- RET_STATUS ModuleClient::DevGet(const char* pszPropties, ResDataObject& pResponse, const char* pszDevPath)
- {
- ResDataObject param;
- if(m_pGrpcClient == nullptr)
- return ModuleReq(PACKET_CMD_GET, pszPropties, ¶m, pResponse, pszDevPath);
- string devRes, resMsg;
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Get(pszPropties, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return RET_FAILED;
- }
- RET_STATUS ModuleClient::DevSet(const char* pszPropties, ResDataObject* pNewValue, ResDataObject& pResponse, const char* pszDevPath)
- {
- if (m_pGrpcClient == nullptr)
- return ModuleReq(PACKET_CMD_SET, pszPropties, pNewValue, pResponse, pszDevPath);
- string reqParam, devRes, resMsg;
- if (pNewValue->size() > 0)
- reqParam = pNewValue->encode();
- else
- reqParam = (const char*)(*pNewValue);
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return ret;
- }
- RET_STATUS ModuleClient::DevAdd(const char* pszPropties, ResDataObject* pAddValue, ResDataObject& pResponse, const char* pszDevPath)
- {
- if (m_pGrpcClient == nullptr)
- return ModuleReq(PACKET_CMD_ADD, pszPropties, pAddValue , pResponse, pszDevPath);
- string reqParam, devRes, resMsg;
- if (pAddValue->size() > 0)
- reqParam = pAddValue->encode();
- else
- reqParam = (const char*)(*pAddValue);
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Add(pszPropties, reqParam, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return ret;
- }
- RET_STATUS ModuleClient::DevDel(const char* pszPropties, ResDataObject* pDelValue, ResDataObject& pResponse, const char* pszDevPath)
- {
- if (m_pGrpcClient == nullptr)
- return ModuleReq(PACKET_CMD_DEL, pszPropties,pDelValue , pResponse, pszDevPath);
- string reqParam, devRes, resMsg;
- if (pDelValue->size() > 0)
- reqParam = pDelValue->encode();
- else
- reqParam = (const char*)(*pDelValue);
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Del(pszPropties, reqParam, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return ret;
- }
- RET_STATUS ModuleClient::DevAction(const char* pszActionName, ResDataObject* pReqParam, ResDataObject& pResponse, const char* pszDevPath)
- {
- if (m_pGrpcClient == nullptr)
- return (RET_STATUS)Action( pszActionName, *pReqParam , pResponse,11000, pszDevPath);
- string reqParam, devRes, resMsg;
- if (pReqParam->size() > 0)
- reqParam = pReqParam->encode();
- else
- reqParam = (const char*)(*pReqParam);
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Action(pszActionName,reqParam, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return ret;
- }
- RET_STATUS ModuleClient::DevMessage(const char* pszTopics, ResDataObject* pMessage, ResDataObject& pResponse, const char* pszDevPath)
- {
- if (m_pGrpcClient == nullptr)
- return ModuleReq(PACKET_CMD_MSG, pszTopics, pMessage, pResponse, pszDevPath);
- string reqParam, devRes, resMsg;
- if (pMessage->size() > 0)
- reqParam = pMessage->encode();
- else
- reqParam = (const char*)(*pMessage);
- RET_STATUS ret = (RET_STATUS)m_pGrpcClient->Message(pszTopics, reqParam, devRes, resMsg, pszDevPath);
- if (ret == RET_SUCCEED)
- {
- pResponse.decode(devRes.c_str());
- return ret;
- }
- return ret;
- }
- int ModuleClient::BeginAyncWait() {
- try
- {
- m_nWaitResultCount = 0;
- if (m_pGrpcClient != nullptr)
- return m_pGrpcClient->BeginAyncWait();
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- return 0;
- }
- void ModuleClient::WaitAllComplete()
- {
- try
- {
- if (m_pGrpcClient != nullptr)
- m_pGrpcClient->WaitAllComplete();
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- }
- int ModuleClient::GetAsyncResult(int idx, string& devResource, string& devRes, string& calResMsg)
- {
- try
- {
- if (m_pGrpcClient != nullptr)
- return m_pGrpcClient->GetAsyncResult(idx, devResource, devRes, calResMsg);
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- return 0;
- }
- void ModuleClient::EndAync()
- {
- try
- {
- if (m_pGrpcClient != nullptr)
- m_pGrpcClient->EndAync();
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- }
- int ModuleClient::AsyncAction(string devResource, ResDataObject* pReqParam,
- string devUri, string calSection, string calClientID)
- {
- try
- {
- string reqParam;
- if (pReqParam->size() > 0)
- reqParam = pReqParam->encode();
- else
- reqParam = (const char*)(*pReqParam);
- if (m_pGrpcClient != nullptr)
- return m_nWaitResultCount= m_pGrpcClient->AsyncAction(devResource, reqParam, devUri, calSection, calClientID);
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- return 0;
- }
- int ModuleClient::AsyncMessage(string devResource, ResDataObject* pReqParam,
- string devUri, string calSection, string calClientID)
- {
- try
- {
- string reqParam;
- if (pReqParam->size() > 0)
- reqParam = pReqParam->encode();
- else
- reqParam = (const char*)(*pReqParam);
- if (m_pGrpcClient != nullptr)
- return m_nWaitResultCount = m_pGrpcClient->AsyncMessage(devResource, reqParam, devUri, calSection, calClientID);
- }
- catch (...)
- {
- //mLog::FWARN("Exception occurred ");
- }
- return 0;
- }
- int ModuleClient::ClearFilter()
- {
- if (!Lock())
- return 0;
- m_mpNotifyIdx.clear();
- UnLock();
- return 2;
- }
- int ModuleClient::InstallFilterNotify(string strKey, bool install)
- {
- string topic = m_strDevicePath + "/Notify/" + strKey;
- SubScribeTopic(topic.c_str(), install);
- //CCOS/DEVICE/Detector/ecomdemo/demo/1234 /Notify/DetectorStatus
- std::vector<string> nods;
- SplitCcosDevicePath(m_strDevicePath, nods);
- if (nods.size() > 5 && m_strWS.length() > 0)
- {
- //CCOS/Table/Detector /Notify/DetectorStatus
- topic = "CCOS/";
- topic += m_strWS + "/" + nods[2] + "/Notify/" + strKey;
- SubScribeTopic(topic.c_str(), install);
- }
- return 2;
- }
- PACKET_CMD ModuleClient::ReadFilterNotify(ResDataObject& CmdObject)
- {
- PACKET_CMD cmd = PACKET_CMD_NONE;
- //ResDataObject resPacket;
- cmd = ReadNotify(CmdObject);
- if (cmd != PACKET_CMD_NONE )
- {
- PACKET_TYPE type = PacketAnalizer::GetPacketType(&CmdObject);
- if (type == PACKET_TYPE_NOTIFY)
- {
- DWORD idx = PacketAnalizer::GetPacketIdx(&CmdObject);
- string key = PacketAnalizer::GetPacketKey(&CmdObject);
- if (Lock())
- {
- auto itf = m_mpNotifyIdx.find(key);
- if (itf != m_mpNotifyIdx.end())
- {
- if (itf->second != idx)
- {
- // idx不同
- m_mpNotifyIdx[key] = idx;
- UnLock();
- return cmd;
- }
- else
- {
- UnLock();
- //mLog::FDEBUG("Same Packet no process.. {$} with idx:[{$}]",key, idx);
- return PACKET_CMD_NONE;
- }
- }
- else
- {
- //首次抵达
- m_mpNotifyIdx[key] = idx;
- UnLock();
- return cmd;
- }
- }
- else
- {
- //mLog::FERROR("Lock timeout for packet {$}", CmdObject.encode());
- }
- }
- }
- return PACKET_CMD_NONE;
- }
- int ModuleClient::Notify(const char* pszTopic, const char* pszSender, const char* pszType, const char* pszCmd, const char* pMsg)
- {
- ResDataObject req,resMsg;
- resMsg.add("DevUri", pszSender);
- resMsg.add("Type", pszType);
- resMsg.add("Message", pMsg);
- PacketAnalizer::MakeNotify(req, PACKET_CMD_MSG, pszCmd, "");
- PacketAnalizer::UpdatePacketContext(req, resMsg);
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pszSender );
- return PublishAction(&req, pszTopic, m_pMqttConn);
-
- }
- void ModuleClient::SplitCcosDevicePath(string DevicePath, vector<string>& resTopicParams)
- {
- // 使用 stringstream 和 getline 按照 "/" 切割路径
- std::stringstream ss(DevicePath);
- std::string token;
- // 清空之前的数据
- resTopicParams.clear();
- // 按照"/"切割路径并存入resTopicParams
- while (std::getline(ss, token, '/')) {
- resTopicParams.push_back(token);
- }
- }
|