#include "StdAfx.h" #include "SysIF.h" #include "CDInterface.h" #include "CcosLock.h" #include "LocalConfig.h" #include "CcosFileHandle.h" #include "PacketAnalizer.h" #include "Logger.h" #include "mqtt/async_client.h" SysIF::SysIF(void) { m_Lock = (HANDLE)new CcosLock(); m_InternalLock = (HANDLE)new CcosLock(); } SysIF::~SysIF(void) { delete (CcosLock*)m_Lock; m_Lock = NULL; delete (CcosLock*)m_InternalLock; m_InternalLock = NULL; } bool SysIF::Lock(DWORD timeout) { if (((CcosLock*)m_Lock)->Thread_Lock(timeout) == WAIT_OBJECT_0) { return true; } return false; } void SysIF::UnLock() { ((CcosLock*)m_Lock)->Thread_UnLock(); } bool SysIF::InternalLock(DWORD timeout) { if (((CcosLock*)m_InternalLock)->Thread_Lock(timeout) == WAIT_OBJECT_0) { return true; } return false; } void SysIF::InternalUnLock() { ((CcosLock*)m_InternalLock)->Thread_UnLock(); } RET_STATUS HW_ACTION SysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd) { assert(0);//supposed not call in return RET_NOSUPPORT; } /* //-----------------------------Client SYSIF---------------------------------------- ClientSysIF::ClientSysIF(void) { m_pRequestList = new MsgVector(); } ClientSysIF::~ClientSysIF(void) { delete m_pRequestList; } void ClientSysIF::RegistClientToCDI() { CDInterface *pcdi = CDInterface::GetCDI(); if (pcdi) { LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this; //pcdi->RegistClient((UINT64)pSys); } } void ClientSysIF::UnRegistClientFromCDI() { CDInterface *pcdi = CDInterface::GetCDI(); if (pcdi) { LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this; //pcdi->UnRegistClient((UINT64)pSys); } } RET_STATUS HW_ACTION ClientSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd) { //kick notify&response PACKET_TYPE Type = PacketAnalizer::GetPacketType(pCmd); if (Type != PACKET_TYPE_REQ) { return RET_FAILED; } //regist packet as req DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd); InternalLock(); m_pRequestList->PushBack(Idx); //GPRINTA_DEBUG("FromClient Idx: %d", Idx); InternalUnLock(); //find CDI CDInterface *pcdi = CDInterface::GetCDI(); if (pcdi) { //if (pcdi->ReceivedFromClient(*pCmd)) { return RET_SUCCEED; } } # return RET_FAILED; } //notify to lower layer RET_STATUS ClientSysIF::CmdToLogicDev(ResDataObject *pCmd) { if (m_pLogicDev) { PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd); if (type1 == PACKET_TYPE_RES) { DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd); InternalLock(); if (m_pRequestList->FindAndClear(Idx) == false) { //some log here InternalUnLock(); return RET_FAILED; } InternalUnLock(); //GPRINTA_DEBUG("ToClient Idx: %d", Idx); } else if (type1 == PACKET_TYPE_REQ) { //some log here //GPRINTA_DEBUG("ToClient Idx is REQ"); return RET_FAILED; } //GPRINTA_DEBUG("ToClient Cmd OK"); return m_pLogicDev->CmdToLogicDev(pCmd); } return RET_NOSUPPORT; } bool ClientSysIF::IsAllRequestFinished() { InternalLock(); bool ret = (m_pRequestList->Size() == 0); InternalUnLock(); return ret; } void ClientSysIF::Clear() { InternalLock(); m_pRequestList->Clear(); InternalUnLock(); } */ //-----------------------------Server SYSIF---------------------------------------- ServerSysIF::ServerSysIF(void) { m_pDeviceReqThread = NULL; m_pDeviceResThread = NULL; } ServerSysIF::~ServerSysIF(void) { } RET_STATUS HW_ACTION ServerSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd) { //all notify PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd); if (type == PACKET_TYPE_NOTIFY) { LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this; //fill up the device info if (PacketAnalizer::GetPacketHandleExistance(pCmd) == false) { if (PacketAnalizer::UpdateDeviceNotifyResponse((*pCmd), getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)pSys) == false) { return RET_FAILED; } } //send Packet if (m_pDeviceResThread) { if (m_pDeviceResThread->PushResDataObject(*pCmd)) { return RET_SUCCEED; } } /* if (m_pDeviceThread) { if (m_pDeviceThread->PushResDataObject(*pCmd)) { return RET_SUCCEED; } }*/ } return RET_FAILED; } //notify to lower layer RET_STATUS ServerSysIF::CmdToLogicDev(ResDataObject *pCmd) { if (m_pLogicDev) { //do not use CmdToLogicDev,it's use less for the device //only act request packet PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd); if (type1 == PACKET_TYPE_REQ) { //do request //ResDataObject response; //RET_STATUS ret = m_pLogicDev->Request(pCmd, &response); //send Packet if (m_pDeviceReqThread->PushReqDataObject(*pCmd)) { return RET_SUCCEED; } /* if (m_pDeviceThread->PushReqDataObject(*pCmd)) { return RET_SUCCEED; }*/ } } return RET_FAILED; } /* void ServerSysIF::SetWorkThread(Dual_Driver_Thread *p) { m_pDeviceThread = p; }*/ void ServerSysIF::SetWorkThread(Work_Thread* pDeviceReqThread, Work_Thread* pDeviceResThread) { m_pDeviceReqThread = pDeviceReqThread; m_pDeviceResThread = pDeviceResThread; } /* string SERVER_ADDRESS; //创建额外连接,需要提供回调函数 SYSIF_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg) { //Logger* pDevLog = GetLogHandle(); std::cout << "=======Driver Module Init ============" << std::endl; //PRINTA_INFO(pDevLog, "=======Driver Module Init ============"); //连接MQTT broker // 开启接受通知 线程 mqtt::async_client *pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID); ccos_mqtt_connection* connection = new ccos_mqtt_connection; std::get<0>(*connection) = pMqttClient; ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter; std::get<0>(*pfilter) = nullptr; std::get<2>(*connection) = pfilter; mqtt::connect_options connOpts; connOpts.set_clean_session(false); connOpts.set_automatic_reconnect(true); // Install the callback(s) before connecting. //callback *pCallback = new callback(*pMqttClient, connOpts, onmsg); //pMqttClient->set_callback(*pCallback); auto func = [onmsg,&connection] (mqtt::const_message_ptr msg) -> void { ResDataObject req; req.decode(msg->get_payload_str().c_str()); string topic = msg->get_topic().c_str(); //取消息钩子 const void* pHook = std::get<2>(*connection); if (pHook != nullptr) { ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook); ccos_mqtt_msg_filter_func func = std::get<0>(*filter); //消息钩子函数存在 if (func != nullptr) { ResDataObject* resObj = std::get<2>(*filter); HANDLE hEvent = std::get<1>(*filter); if (func(&req)) { //勾住了,是该钩子的消息,则通知 PacketAnalizer::GetPacketContext(&req, *resObj); SetEvent(hEvent); return ; } } } onmsg(&req, topic.c_str()); }; std::get<1>(*connection) = &func; pMqttClient->set_message_callback( func ); //std::function pMqttClient->set_connected_handler([](const string& cause) { std::cout << "Connecting to the MQTT server Succeeded.." << endl; }); //std::function pMqttClient->set_disconnected_handler([](const mqtt::properties&, mqtt::ReasonCode) { std::cout << "MQTT Server Connection lost...." << endl; }); // Start the connection. // When completed, the callback will subscribe to topic. std::cout << "=======Connecting to the MQTT ============" << std::endl; try { std::cout << "Connecting to the MQTT server..." << std::flush; //PRINTA_INFO(pDevLog, "Connecting to the MQTT server..."); // pMqttClient->connect(connOpts); return connection; } catch (const mqtt::exception& exc) { //PRINTA_ERROR(pDevLog, " Unable to connect to MQTT server: %s ", SERVER_ADDRESS); std::cerr << "\nERROR: Unable to connect to MQTT server: '" << SERVER_ADDRESS << "'" << exc << std::endl; } delete pfilter; delete connection; delete pMqttClient; return nullptr; } //关闭并释放连接 SYSIF_API void CloseConnection(ccos_mqtt_connection* hConnection) { if (hConnection == nullptr) { return; } mqtt::async_client* pconn = (mqtt::async_client *)std::get<0>(*hConnection); if (pconn != nullptr) { pconn->disconnect(); delete pconn; } ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection); if(pfilter != nullptr) delete pfilter; delete hConnection; } //主动订阅主题 SYSIF_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare ) { if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } mqtt::subscribe_options opts; pMqttClient->subscribe(pszTopic, 1, opts); return 0; } //主题订阅取消 SYSIF_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic) { if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } mqtt::subscribe_options opts; auto ret = pMqttClient->unsubscribe(pszTopic); return 2; } //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收 SYSIF_API int PublishMsg(ResDataObject *pCmd, const char* pszTopic, const char* pszSenderName) { char pszClientID[256]; sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X",pszSenderName==nullptr?"ANONYMOUS":pszSenderName, GetCurrentProcessId(), GetCurrentThreadId()); ccos_mqtt_connection *connObj = NewConnection(pszClientID, []( ResDataObject*, const char*) { }); mqtt::async_client* pConn = (mqtt::async_client*)std::get<0>(*connObj); PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID); pConn->publish(pszTopic, pCmd->encode()); CloseConnection(connObj); return 0; } //往指定主题发送CCOS协议包整包,使用已创建的连接发送 SYSIF_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection ) { if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, pMqttClient->get_client_id().c_str()); pMqttClient->publish(pszTopic, pAction->encode()); return 2; } //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理 SYSIF_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection ) { if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } ResDataObject req; //TODO 这里应该添加 组包逻辑,待完善 //PacketAnalizer::MakeActionRequest(req, ); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str()); pMqttClient->publish(pszTopic, req.encode()); return 2; } /// /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp, /// 超时没收到应答返回失败, /// 复用链接时须小心,该函数会接管回调函数,结束后恢复 /// /// 要发送的命令Action名 /// 命令携带的参数 /// 要发送的目标topic /// 本次请求的应答接收Topic /// 应答返回的参数结果 /// 等待超时时间 /// 复用的MQTT链接句柄 /// 复用的链接的消息处理函数 /// 成功返回2,其他返回错误码 SYSIF_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection,const char* pAction,ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj, DWORD dwWaitTime) { if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //SubscribeTopic() auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](mqtt::const_message_ptr msg) { string topic = msg->get_topic().c_str(); if (strcmp(pszRespTopic, topic.c_str()) == 0) { //应答到了,处理 ResDataObject req; req.decode(msg->get_payload_str().c_str()); PacketAnalizer::GetPacketContext(&req, resObj); SetEvent(hEvent); //处理结束 将函数指针换回去 auto oldFuncPointer = std::get<1>(*hConnection); pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer); } else { mqtt::async_client::message_handler orgfunc = *(mqtt::async_client::message_handler*)std::get<1>(*hConnection); orgfunc(msg); } }; pMqttClient->set_message_callback(action_func); pMqttClient->subscribe(pszRespTopic, 1); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str()); pMqttClient->publish(pszTopic, req.encode()); DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 return 2; } return 0; } /// /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败, /// /// /// /// /// /// /// /// /// SYSIF_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection,const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime ) { if (hConnection == nullptr) { return 0; } mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); if (!pMqttClient->is_connected()) { //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return 0; } HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection); std::get<1>(*pfilter) = hEvent; std::get<2>(*pfilter) = &resObj; std::get<0>(*pfilter) = [pAction]( ResDataObject* rsp) -> bool { if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES && strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0) return true; return false; }; PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str()); pMqttClient->publish(pszTopic, req.encode()); DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 return 2; } return 0; } /// /// 新建MQTT连接发送Ation并等待应答 /// /// /// /// /// /// /// /// /// SYSIF_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj, DWORD dwWaitTime, const char* pszSenderName) { ResDataObject req; if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); //临时创建连接并发送和接收应答 char pszClientID[256]; sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId()); HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if(hEvent == NULL) return 0; ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* req, const char* topic) { //应答到了,处理 PacketAnalizer::GetPacketContext(req, resObj); SetEvent(hEvent); }); //发布消息,并等待应答 PublishAction(&req, pszTopic, connObj); DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 CloseConnection(connObj); return 2; } CloseConnection(connObj); return 0; } SYSIF_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection ) { mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection); return 0; } */ ///////////////////////////////////////////////////////////////////////////// // Callbacks for the success or failures of requested actions. // This could be used to initiate further action, but here we just log the // results to the console. class action_listener : public virtual mqtt::iaction_listener { std::string name_; void on_failure(const mqtt::token& tok) override { std::cout << name_ << " failure"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; std::cout << std::endl; } void on_success(const mqtt::token& tok) override { std::cout << name_ << " success"; if (tok.get_message_id() != 0) std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl; auto top = tok.get_topics(); if (top && !top->empty()) std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl; std::cout << std::endl; } public: action_listener(const std::string& name) : name_(name) {} }; ///////////////////////////////////////////////////////////////////////////// /** * Local callback & listener class for use with the client connection. * This is primarily intended to receive messages, but it will also monitor * the connection to the broker. If the connection is lost, it will attempt * to restore the connection and re-subscribe to the topic. */ /* class callback : public virtual mqtt::callback, public virtual mqtt::iaction_listener { // Counter for the number of connection retries int nretry_; // The MQTT client mqtt::async_client& cli_; // Options to use if we need to reconnect mqtt::connect_options& connOpts_; // An action listener to display the result of actions. action_listener subListener_; bool bConnected; std::function onmsg_; // This deomonstrates manually reconnecting to the broker by calling // connect() again. This is a possibility for an application that keeps // a copy of it's original connect_options, or if the app wants to // reconnect with different options. // Another way this can be done manually, if using the same options, is // to just call the async_client::reconnect() method. void reconnect() { std::this_thread::sleep_for(std::chrono::milliseconds(2500)); try { cli_.connect(connOpts_, nullptr, *this); } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; { //exit(1); } } catch (...) { std::cout << "Connect Some Error " << std::endl; } } // Re-connection failure void on_failure(const mqtt::token& tok) override { std::cout << "Connection attempt failed" << std::endl; if (++nretry_ > N_RETRY_ATTEMPTS) { //exit(1); } reconnect(); } // (Re)connection success // Either this or connected() can be used for callbacks. void on_success(const mqtt::token& tok) override {} // (Re)connection success void connected(const std::string& cause) override { bConnected = true; std::cout << "\nConnection success" << std::endl; std::cout << "\nSubscribing to topic '" << "'\n" << "\tfor client " << DEVICE_ID << " using QoS" << QOS << "\n" << "\nPress Q to quit\n" << std::endl; //cli_.subscribe(TOPIC, QOS, nullptr, subListener_); //g_pDPCDeviceObject->ReSubscribe(); } // Callback for when the connection is lost. // This will initiate the attempt to manually reconnect. void connection_lost(const std::string& cause) override { std::cout << "\nConnection lost" << std::endl; if (!cause.empty()) std::cout << "\tcause: " << cause << std::endl; std::cout << "Reconnecting..." << std::endl; nretry_ = 0; bConnected = false; reconnect(); } // Callback for when a message arrives. void message_arrived(mqtt::const_message_ptr msg) override { std::cout << "Message arrived" << std::endl; std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl; std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl; //订阅的主题到了 ResDataObject req; req.decode(msg->get_payload_str().c_str()); //onmsg_(req, msg->get_topic().c_str()); //g_pDPCDeviceObject->NOTIFY(msg->get_topic(), msg->to_string()); } void delivery_complete(mqtt::delivery_token_ptr token) override { } public: callback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::function onmsg) : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), onmsg_(onmsg) { bConnected = false; } };*/ /* callback* g_pCallback = NULL; string TOPIC_PREFIX; string ACTION_TOPIC_PREFIX;*/ /* void LogicDevice::NOTIFY(string topic, string context) { Logger* pDevLog = GetLogHandle(); PRINTA_INFO(pDevLog, "NotifyTopic Come %s :%s", topic, context); if (ACTION_TOPIC_PREFIX.length() > 0 && topic._Starts_with(ACTION_TOPIC_PREFIX)) { // 本设备的Action请求 std::cout << "Action from Topic " << topic << "request " << context << std::endl; // TODO ... 这里应该改成 一个工作线程来处理Action请求 DispatchAction(topic, context); } else { ResDataObject notify; notify.add("topic", topic.c_str()); notify.add("context", context.c_str()); string out; //m_pMidObject->Action("NotifyTopic", notify.encode(), out); } }*/ /* void LogicDevice::ReSubscribe() { Logger* pDevLog = GetLogHandle(); if (m_pMqttClient->is_connected()) { for each (string var in m_strSubTopics) { PRINTA_INFO(pDevLog, "ReSubscribe topic %s", var); m_pMqttClient->subscribe(var, QOS, nullptr, g_pCallback->subListener_); } } else { PRINTA_ERROR(pDevLog, " MQTT connection is not available "); std::cout << " LogicDevice::ReSubscribe() MQTT connection is not available " << std::endl; } }*/ /* void LogicDevice::SubscribeOne(string topic, bool unSubscribe) { Logger* pDevLog = GetLogHandle(); if (!m_pMqttClient->is_connected()) { PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic); return; } bool bFind = false; for each (string var in m_strSubTopics) { if (var == topic) { bFind = true; break; } } if (!bFind && !unSubscribe) { PRINTA_INFO(pDevLog, "Subscribe topic %s", topic); m_strSubTopics.push_back(topic); m_pMqttClient->subscribe(topic, QOS, nullptr, g_pCallback->subListener_); } if (bFind && unSubscribe) { PRINTA_INFO(pDevLog, "UnSubscribe topic %s", topic); m_strSubTopics.erase(std::remove(m_strSubTopics.begin(), m_strSubTopics.end(), topic), m_strSubTopics.end()); m_pMqttClient->unsubscribe(topic); } } */ /* void LogicDevice::SubscribeAction() { if (ACTION_TOPIC_PREFIX.length() <= 0) { ostringstream ostopic; ostopic << TOPIC_PREFIX << "Action/"; ACTION_TOPIC_PREFIX = ostopic.str(); } Logger* pDevLog = GetLogHandle(); if (m_pMqttClient->is_connected()) { // Action Topic // Detector/1234xx/Action string DevRes = m_pMidObject->GetResource(); //if (m_pMidObject->GetDeviceResource(DevRes)) { ResDataObject LowLayerRes; if (LowLayerRes.decode(DevRes.c_str())) { //attr int Idx = LowLayerRes.GetFirstOf("Action"); if (Idx >= 0) { ResDataObject actions = LowLayerRes[Idx]; for (size_t i = 0; i < actions.size(); i++) { ostringstream ostopic; ostopic << ACTION_TOPIC_PREFIX << (const char*)actions.GetKey(i); SubscribeOne(ostopic.str()); } } else { PRINTA_INFO(pDevLog, "There is No Action to Subscribe ???? ."); } } } //m_pMidObject->Action } }*/ /////////////////////////////////////////////////////////////////////////////