12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946 |
- #include "StdAfx.h"
- #include "SysIF.h"
- #include "CDInterface.h"
- #include "CcosLock.h"
- #include "LocalConfig.h"
- #include "CcosFileHandle.h"
- #include "PacketAnalizer.h"
- #include "Logger.h"
- #include "mqtt/async_client.h"
- // Allocates one CcosLock for the public lock handle (m_Lock) and one for the
- // internal lock handle (m_InternalLock). Both are stored as HANDLE values and
- // cast back to CcosLock* wherever they are used.
- SysIF::SysIF(void)
- {
-     CcosLock* const outerLock = new CcosLock();
-     m_Lock = (HANDLE)outerLock;
-     CcosLock* const innerLock = new CcosLock();
-     m_InternalLock = (HANDLE)innerLock;
- }
- // Deletes the two CcosLock objects created by the constructor and resets
- // both handles to NULL.
- SysIF::~SysIF(void)
- {
-     CcosLock* const outerLock = (CcosLock*)m_Lock;
-     delete outerLock;
-     m_Lock = NULL;
-     CcosLock* const innerLock = (CcosLock*)m_InternalLock;
-     delete innerLock;
-     m_InternalLock = NULL;
- }
- // Tries to acquire the public lock, forwarding `timeout` to CcosLock::Thread_Lock.
- // Returns true only when Thread_Lock reports WAIT_OBJECT_0, false for any other result.
- bool SysIF::Lock(DWORD timeout)
- {
-     CcosLock* const guard = (CcosLock*)m_Lock;
-     return guard->Thread_Lock(timeout) == WAIT_OBJECT_0;
- }
- // Releases the public lock by calling Thread_UnLock on the CcosLock behind m_Lock.
- void SysIF::UnLock()
- {
-     CcosLock* const guard = (CcosLock*)m_Lock;
-     guard->Thread_UnLock();
- }
- // Tries to acquire the internal lock, forwarding `timeout` to CcosLock::Thread_Lock.
- // Returns true only when Thread_Lock reports WAIT_OBJECT_0, false for any other result.
- bool SysIF::InternalLock(DWORD timeout)
- {
-     CcosLock* const guard = (CcosLock*)m_InternalLock;
-     return guard->Thread_Lock(timeout) == WAIT_OBJECT_0;
- }
- // Releases the internal lock by calling Thread_UnLock on the CcosLock behind m_InternalLock.
- void SysIF::InternalUnLock()
- {
-     CcosLock* const guard = (CcosLock*)m_InternalLock;
-     guard->Thread_UnLock();
- }
- // SysIF itself does not handle commands coming from the logic device: this
- // implementation trips an assertion (when assertions are enabled) and
- // otherwise reports RET_NOSUPPORT. `pCmd` is ignored.
- RET_STATUS HW_ACTION SysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
- {
-     (void)pCmd; // intentionally unused
-     assert(0);  // this entry point is not expected to be called
-     return RET_NOSUPPORT;
- }
- /*
- //-----------------------------Client SYSIF----------------------------------------
- ClientSysIF::ClientSysIF(void)
- {
- m_pRequestList = new MsgVector<DWORD>();
- }
- ClientSysIF::~ClientSysIF(void)
- {
- delete m_pRequestList;
- }
- void ClientSysIF::RegistClientToCDI()
- {
- CDInterface *pcdi = CDInterface::GetCDI();
- if (pcdi)
- {
- LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this;
- //pcdi->RegistClient((UINT64)pSys);
- }
- }
- void ClientSysIF::UnRegistClientFromCDI()
- {
- CDInterface *pcdi = CDInterface::GetCDI();
- if (pcdi)
- {
- LogicDeviceSysIF *pSys = (LogicDeviceSysIF*)this;
- //pcdi->UnRegistClient((UINT64)pSys);
- }
- }
- RET_STATUS HW_ACTION ClientSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
- {
- //kick notify&response
- PACKET_TYPE Type = PacketAnalizer::GetPacketType(pCmd);
- if (Type != PACKET_TYPE_REQ)
- {
- return RET_FAILED;
- }
- //regist packet as req
- DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd);
- InternalLock();
- m_pRequestList->PushBack(Idx);
- //GPRINTA_DEBUG("FromClient Idx: %d", Idx);
- InternalUnLock();
- //find CDI
- CDInterface *pcdi = CDInterface::GetCDI();
- if (pcdi)
- {
- //if (pcdi->ReceivedFromClient(*pCmd))
- {
- return RET_SUCCEED;
- }
- }
- #
- return RET_FAILED;
- }
- //notify to lower layer
- RET_STATUS ClientSysIF::CmdToLogicDev(ResDataObject *pCmd)
- {
- if (m_pLogicDev)
- {
- PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd);
- if (type1 == PACKET_TYPE_RES)
- {
- DWORD Idx = PacketAnalizer::GetPacketIdx(pCmd);
- InternalLock();
- if (m_pRequestList->FindAndClear(Idx) == false)
- {
- //some log here
- InternalUnLock();
- return RET_FAILED;
- }
- InternalUnLock();
- //GPRINTA_DEBUG("ToClient Idx: %d", Idx);
- }
- else if (type1 == PACKET_TYPE_REQ)
- {
- //some log here
- //GPRINTA_DEBUG("ToClient Idx is REQ");
- return RET_FAILED;
- }
- //GPRINTA_DEBUG("ToClient Cmd OK");
- return m_pLogicDev->CmdToLogicDev(pCmd);
- }
- return RET_NOSUPPORT;
- }
- bool ClientSysIF::IsAllRequestFinished()
- {
-
- InternalLock();
- bool ret = (m_pRequestList->Size() == 0);
- InternalUnLock();
- return ret;
- }
- void ClientSysIF::Clear()
- {
- InternalLock();
- m_pRequestList->Clear();
- InternalUnLock();
- }
- */
- //-----------------------------Server SYSIF----------------------------------------
- // Starts with no worker threads attached; SetWorkThread() supplies them later.
- ServerSysIF::ServerSysIF(void)
- {
-     m_pDeviceResThread = nullptr;
-     m_pDeviceReqThread = nullptr;
- }
- ServerSysIF::~ServerSysIF(void) // Empty body: m_pDeviceReqThread and m_pDeviceResThread are not deleted here.
- {
- }
- // Forwards a notify packet coming from the logic device to the response worker thread.
- //   pCmd : packet to forward. When it carries no handle yet, it is updated in place
- //          with the local machine id, ebus id, current process id and this object's address.
- // Returns RET_SUCCEED once the packet has been pushed to m_pDeviceResThread.
- // Returns RET_FAILED when the packet is not a notify packet, when the device info
- // cannot be written, when no response thread is attached, or when the push is rejected.
- RET_STATUS HW_ACTION ServerSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
- {
-     // Only notify packets are accepted on this path.
-     if (PacketAnalizer::GetPacketType(pCmd) != PACKET_TYPE_NOTIFY)
-     {
-         return RET_FAILED;
-     }
-     // A packet without a handle gets the device info filled in before it is sent.
-     if (!PacketAnalizer::GetPacketHandleExistance(pCmd))
-     {
-         LogicDeviceSysIF *self = (LogicDeviceSysIF*)this;
-         const bool stamped = PacketAnalizer::UpdateDeviceNotifyResponse((*pCmd),
-             getLocalMachineId(),
-             getLocalEbusId(),
-             (UINT64)GetCurrentProcessId(),
-             (UINT64)self);
-         if (!stamped)
-         {
-             return RET_FAILED;
-         }
-     }
-     // Send the packet through the response worker thread, if one is attached.
-     if (!m_pDeviceResThread)
-     {
-         return RET_FAILED;
-     }
-     if (m_pDeviceResThread->PushResDataObject(*pCmd))
-     {
-         return RET_SUCCEED;
-     }
-     return RET_FAILED;
- }
- // Notify to lower layer: hands a request packet to the request worker thread.
- //   pCmd : packet to dispatch; only PACKET_TYPE_REQ packets are acted on.
- // Returns RET_SUCCEED when the packet was pushed to m_pDeviceReqThread.
- // Returns RET_FAILED when no logic device is attached, the packet is not a request,
- // no request thread is attached (the constructor leaves the pointer NULL until
- // SetWorkThread() is called), or the push is rejected.
- RET_STATUS ServerSysIF::CmdToLogicDev(ResDataObject *pCmd)
- {
-     if (m_pLogicDev)
-     {
-         // m_pLogicDev->CmdToLogicDev is deliberately not called here (per the original
-         // note it is of no use to the device); only request packets are acted on.
-         PACKET_TYPE type1 = PacketAnalizer::GetPacketType(pCmd);
-         if (type1 == PACKET_TYPE_REQ)
-         {
-             // Check the worker thread before using it, the same way CmdFromLogicDev
-             // checks m_pDeviceResThread, so a request that arrives before
-             // SetWorkThread() fails cleanly instead of dereferencing NULL.
-             if (m_pDeviceReqThread && m_pDeviceReqThread->PushReqDataObject(*pCmd))
-             {
-                 return RET_SUCCEED;
-             }
-         }
-     }
-     return RET_FAILED;
- }
- /*
- void ServerSysIF::SetWorkThread(Dual_Driver_Thread *p)
- {
- m_pDeviceThread = p;
- }*/
- // Attaches the worker threads used to queue packets.
- //   pDeviceReqThread : target of PushReqDataObject in CmdToLogicDev.
- //   pDeviceResThread : target of PushResDataObject in CmdFromLogicDev.
- // The pointers are stored as given; no NULL check is done here.
- void ServerSysIF::SetWorkThread(Work_Thread* pDeviceReqThread, Work_Thread* pDeviceResThread)
- {
-     m_pDeviceResThread = pDeviceResThread;
-     m_pDeviceReqThread = pDeviceReqThread;
- }
- /*
- string SERVER_ADDRESS;
- //创建额外连接,需要提供回调函数
- SYSIF_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
- {
- //Logger* pDevLog = GetLogHandle();
- std::cout << "=======Driver Module Init ============" << std::endl;
- //PRINTA_INFO(pDevLog, "=======Driver Module Init ============");
- //连接MQTT broker
- // 开启接受通知 线程
- mqtt::async_client *pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID);
- ccos_mqtt_connection* connection = new ccos_mqtt_connection;
- std::get<0>(*connection) = pMqttClient;
- ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
- std::get<0>(*pfilter) = nullptr;
- std::get<2>(*connection) = pfilter;
- mqtt::connect_options connOpts;
- connOpts.set_clean_session(false);
- connOpts.set_automatic_reconnect(true);
- // Install the callback(s) before connecting.
- //callback *pCallback = new callback(*pMqttClient, connOpts, onmsg);
- //pMqttClient->set_callback(*pCallback);
- auto func = [onmsg,&connection] (mqtt::const_message_ptr msg) -> void {
- ResDataObject req;
- req.decode(msg->get_payload_str().c_str());
- string topic = msg->get_topic().c_str();
- //取消息钩子
- const void* pHook = std::get<2>(*connection);
- if (pHook != nullptr) {
- ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
- ccos_mqtt_msg_filter_func func = std::get<0>(*filter);
- //消息钩子函数存在
- if (func != nullptr) {
- ResDataObject* resObj = std::get<2>(*filter);
- HANDLE hEvent = std::get<1>(*filter);
- if (func(&req)) {
- //勾住了,是该钩子的消息,则通知
- PacketAnalizer::GetPacketContext(&req, *resObj);
- SetEvent(hEvent);
- return ;
- }
- }
- }
- onmsg(&req, topic.c_str());
- };
- std::get<1>(*connection) = &func;
- pMqttClient->set_message_callback( func );
- //std::function<void(const string& cause)>
- pMqttClient->set_connected_handler([](const string& cause) {
- std::cout << "Connecting to the MQTT server Succeeded.." << endl;
- });
- //std::function<void(const properties&, ReasonCode)>
- pMqttClient->set_disconnected_handler([](const mqtt::properties&, mqtt::ReasonCode) {
- std::cout << "MQTT Server Connection lost...." << endl;
- });
- // Start the connection.
- // When completed, the callback will subscribe to topic.
- std::cout << "=======Connecting to the MQTT ============" << std::endl;
- try {
- std::cout << "Connecting to the MQTT server..." << std::flush;
- //PRINTA_INFO(pDevLog, "Connecting to the MQTT server...");
- //
- pMqttClient->connect(connOpts);
- return connection;
- }
- catch (const mqtt::exception& exc) {
- //PRINTA_ERROR(pDevLog, " Unable to connect to MQTT server: %s ", SERVER_ADDRESS);
- std::cerr << "\nERROR: Unable to connect to MQTT server: '"
- << SERVER_ADDRESS << "'" << exc << std::endl;
- }
- delete pfilter;
- delete connection;
- delete pMqttClient;
- return nullptr;
- }
- //关闭并释放连接
- SYSIF_API void CloseConnection(ccos_mqtt_connection* hConnection)
- {
- if (hConnection == nullptr)
- {
- return;
- }
- mqtt::async_client* pconn = (mqtt::async_client *)std::get<0>(*hConnection);
- if (pconn != nullptr) {
- pconn->disconnect();
- delete pconn;
- }
- ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection);
- if(pfilter != nullptr)
- delete pfilter;
-
- delete hConnection;
- }
- //主动订阅主题
- SYSIF_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare )
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- mqtt::subscribe_options opts;
-
- pMqttClient->subscribe(pszTopic, 1, opts);
-
- return 0;
- }
- //主题订阅取消
- SYSIF_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- mqtt::subscribe_options opts;
- auto ret = pMqttClient->unsubscribe(pszTopic);
- return 2;
- }
- //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
- SYSIF_API int PublishMsg(ResDataObject *pCmd, const char* pszTopic, const char* pszSenderName)
- {
- char pszClientID[256];
- sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X",pszSenderName==nullptr?"ANONYMOUS":pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
- ccos_mqtt_connection *connObj = NewConnection(pszClientID, []( ResDataObject*, const char*) {
- });
- mqtt::async_client* pConn = (mqtt::async_client*)std::get<0>(*connObj);
- PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
- pConn->publish(pszTopic, pCmd->encode());
- CloseConnection(connObj);
- return 0;
- }
- //往指定主题发送CCOS协议包整包,使用已创建的连接发送
- SYSIF_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection )
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, pMqttClient->get_client_id().c_str());
-
- pMqttClient->publish(pszTopic, pAction->encode());
- return 2;
- }
- //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
- SYSIF_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection )
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- ResDataObject req;
- //TODO 这里应该添加 组包逻辑,待完善
- //PacketAnalizer::MakeActionRequest(req, );
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
- pMqttClient->publish(pszTopic, req.encode());
- return 2;
- }
- /// <summary>
- /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
- /// 超时没收到应答返回失败,
- /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
- /// </summary>
- /// <param name="pAction">要发送的命令Action名</param>
- /// <param name="pContext">命令携带的参数</param>
- /// <param name="pszTopic">要发送的目标topic</param>
- /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
- /// <param name="resObj">应答返回的参数结果</param>
- /// <param name="dwWaitTime">等待超时时间</param>
- /// <param name="hConnection">复用的MQTT链接句柄</param>
- /// <param name="onmsg">复用的链接的消息处理函数</param>
- /// <returns>成功返回2,其他返回错误码</returns>
- SYSIF_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection,const char* pAction,ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
- ResDataObject& resObj, DWORD dwWaitTime)
- {
- if (pszRespTopic != nullptr)
- PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
- if (hConnection == nullptr)
- {
- return 0;
- }
-
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- //SubscribeTopic()
- auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](mqtt::const_message_ptr msg) {
- string topic = msg->get_topic().c_str();
- if (strcmp(pszRespTopic, topic.c_str()) == 0) {
- //应答到了,处理
- ResDataObject req;
- req.decode(msg->get_payload_str().c_str());
- PacketAnalizer::GetPacketContext(&req, resObj);
- SetEvent(hEvent);
- //处理结束 将函数指针换回去
- auto oldFuncPointer = std::get<1>(*hConnection);
- pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
- }
- else
- {
- mqtt::async_client::message_handler orgfunc = *(mqtt::async_client::message_handler*)std::get<1>(*hConnection);
- orgfunc(msg);
- }
- };
- pMqttClient->set_message_callback(action_func);
- pMqttClient->subscribe(pszRespTopic, 1);
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
- pMqttClient->publish(pszTopic, req.encode());
- DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
- if (ret == WAIT_OBJECT_0) {
- //等到应答了
- return 2;
- }
- return 0;
- }
- /// <summary>
- /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
- /// </summary>
- /// <param name="hConnection"></param>
- /// <param name="pAction"></param>
- /// <param name="pContext"></param>
- /// <param name="pszTopic"></param>
- /// <param name="resObj"></param>
- /// <param name="hEvent"></param>
- /// <param name="dwWaitTime"></param>
- /// <returns></returns>
- SYSIF_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection,const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime )
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- if (!pMqttClient->is_connected()) {
- //PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return 0;
- }
- HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<2>(*hConnection);
- std::get<1>(*pfilter) = hEvent;
- std::get<2>(*pfilter) = &resObj;
- std::get<0>(*pfilter) = [pAction]( ResDataObject* rsp) -> bool {
- if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
- strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
- return true;
- return false;
- };
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, pMqttClient->get_client_id().c_str());
- pMqttClient->publish(pszTopic, req.encode());
- DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
- if (ret == WAIT_OBJECT_0) {
- //等到应答了
- return 2;
- }
- return 0;
- }
- /// <summary>
- /// 新建MQTT连接发送Ation并等待应答
- /// </summary>
- /// <param name="pAction"></param>
- /// <param name="pContext"></param>
- /// <param name="pszTopic"></param>
- /// <param name="pszRespTopic"></param>
- /// <param name="resObj"></param>
- /// <param name="dwWaitTime"></param>
- /// <param name="pszSenderName"></param>
- /// <returns></returns>
- SYSIF_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
- DWORD dwWaitTime, const char* pszSenderName)
- {
- ResDataObject req;
- if (pszRespTopic != nullptr)
- PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
- //临时创建连接并发送和接收应答
- char pszClientID[256];
- sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
- HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- if(hEvent == NULL)
- return 0;
- ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* req, const char* topic) {
- //应答到了,处理
- PacketAnalizer::GetPacketContext(req, resObj);
- SetEvent(hEvent);
- });
- //发布消息,并等待应答
- PublishAction(&req, pszTopic, connObj);
- DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
- if (ret == WAIT_OBJECT_0) {
- //等到应答了
- CloseConnection(connObj);
- return 2;
- }
-
- CloseConnection(connObj);
- return 0;
- }
- SYSIF_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection )
- {
- mqtt::async_client* pMqttClient = (mqtt::async_client*)std::get<0>(*hConnection);
- return 0;
- }
- */
- /////////////////////////////////////////////////////////////////////////////
- // Callbacks for the success or failures of requested actions.
- // This could be used to initiate further action, but here we just log the
- // results to the console.
- // Action listener that reports the outcome of an MQTT action on std::cout.
- // Every report starts with the name supplied at construction.
- class action_listener : public virtual mqtt::iaction_listener
- {
- public:
-     action_listener(const std::string& name) : name_(name) {}
- private:
-     std::string name_;
-     // Prints " for token: [id]" and ends the line when the token has a non-zero message id.
-     static void print_message_id(const mqtt::token& tok) {
-         const auto id = tok.get_message_id();
-         if (id != 0)
-             std::cout << " for token: [" << id << "]" << std::endl;
-     }
-     // Called when the action failed: prints "<name> failure" plus the token id, if any.
-     void on_failure(const mqtt::token& tok) override {
-         std::cout << name_ << " failure";
-         print_message_id(tok);
-         std::cout << std::endl;
-     }
-     // Called when the action succeeded: prints "<name> success", the token id, if any,
-     // and the first topic attached to the token when there is one.
-     void on_success(const mqtt::token& tok) override {
-         std::cout << name_ << " success";
-         print_message_id(tok);
-         const auto topics = tok.get_topics();
-         if (topics && !topics->empty())
-             std::cout << "\ttoken topic: '" << (*topics)[0] << "', ..." << std::endl;
-         std::cout << std::endl;
-     }
- };
- /////////////////////////////////////////////////////////////////////////////
- /**
- * Local callback & listener class for use with the client connection.
- * This is primarily intended to receive messages, but it will also monitor
- * the connection to the broker. If the connection is lost, it will attempt
- * to restore the connection and re-subscribe to the topic.
- */
- /*
- class callback : public virtual mqtt::callback,
- public virtual mqtt::iaction_listener
- {
- // Counter for the number of connection retries
- int nretry_;
- // The MQTT client
- mqtt::async_client& cli_;
- // Options to use if we need to reconnect
- mqtt::connect_options& connOpts_;
- // An action listener to display the result of actions.
- action_listener subListener_;
- bool bConnected;
- std::function<void(const ResDataObject*, char*)> onmsg_;
- // This demonstrates manually reconnecting to the broker by calling
- // connect() again. This is a possibility for an application that keeps
- // a copy of its original connect_options, or if the app wants to
- // reconnect with different options.
- // Another way this can be done manually, if using the same options, is
- // to just call the async_client::reconnect() method.
- void reconnect() {
- std::this_thread::sleep_for(std::chrono::milliseconds(2500));
- try {
- cli_.connect(connOpts_, nullptr, *this);
- }
- catch (const mqtt::exception& exc) {
- std::cerr << "Error: " << exc.what() << std::endl; {
- //exit(1);
- }
- }
- catch (...) {
- std::cout << "Connect Some Error " << std::endl;
- }
- }
- // Re-connection failure
- void on_failure(const mqtt::token& tok) override {
- std::cout << "Connection attempt failed" << std::endl;
- if (++nretry_ > N_RETRY_ATTEMPTS) {
- //exit(1);
- }
- reconnect();
- }
- // (Re)connection success
- // Either this or connected() can be used for callbacks.
- void on_success(const mqtt::token& tok) override {}
- // (Re)connection success
- void connected(const std::string& cause) override {
- bConnected = true;
- std::cout << "\nConnection success" << std::endl;
- std::cout << "\nSubscribing to topic '" << "'\n"
- << "\tfor client " << DEVICE_ID
- << " using QoS" << QOS << "\n"
- << "\nPress Q<Enter> to quit\n" << std::endl;
- //cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
- //g_pDPCDeviceObject->ReSubscribe();
- }
- // Callback for when the connection is lost.
- // This will initiate the attempt to manually reconnect.
- void connection_lost(const std::string& cause) override {
- std::cout << "\nConnection lost" << std::endl;
- if (!cause.empty())
- std::cout << "\tcause: " << cause << std::endl;
- std::cout << "Reconnecting..." << std::endl;
- nretry_ = 0;
- bConnected = false;
- reconnect();
- }
- // Callback for when a message arrives.
- void message_arrived(mqtt::const_message_ptr msg) override {
- std::cout << "Message arrived" << std::endl;
- std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
- std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
- //订阅的主题到了
- ResDataObject req;
- req.decode(msg->get_payload_str().c_str());
-
- //onmsg_(req, msg->get_topic().c_str());
- //g_pDPCDeviceObject->NOTIFY(msg->get_topic(), msg->to_string());
- }
- void delivery_complete(mqtt::delivery_token_ptr token) override {
- }
- public:
- callback(mqtt::async_client& cli, mqtt::connect_options& connOpts, std::function<void(const ResDataObject*, char*)> onmsg)
- : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription"), onmsg_(onmsg) {
- bConnected = false;
- }
- };*/
- /*
- callback* g_pCallback = NULL;
- string TOPIC_PREFIX;
- string ACTION_TOPIC_PREFIX;*/
- /*
- void LogicDevice::NOTIFY(string topic, string context) {
- Logger* pDevLog = GetLogHandle();
- PRINTA_INFO(pDevLog, "NotifyTopic Come %s :%s", topic, context);
-
- if (ACTION_TOPIC_PREFIX.length() > 0 && topic._Starts_with(ACTION_TOPIC_PREFIX)) {
- // 本设备的Action请求
- std::cout << "Action from Topic " << topic << "request " << context << std::endl;
- // TODO ... 这里应该改成 一个工作线程来处理Action请求
- DispatchAction(topic, context);
- }
- else {
- ResDataObject notify;
- notify.add("topic", topic.c_str());
- notify.add("context", context.c_str());
- string out;
- //m_pMidObject->Action("NotifyTopic", notify.encode(), out);
- }
- }*/
- /*
- void LogicDevice::ReSubscribe() {
-
- Logger* pDevLog = GetLogHandle();
- if (m_pMqttClient->is_connected()) {
- for each (string var in m_strSubTopics)
- {
- PRINTA_INFO(pDevLog, "ReSubscribe topic %s", var);
- m_pMqttClient->subscribe(var, QOS, nullptr, g_pCallback->subListener_);
- }
- }
- else {
- PRINTA_ERROR(pDevLog, " MQTT connection is not available ");
- std::cout << " LogicDevice::ReSubscribe() MQTT connection is not available " << std::endl;
- }
- }*/
- /*
- void LogicDevice::SubscribeOne(string topic, bool unSubscribe) {
- Logger* pDevLog = GetLogHandle();
- if (!m_pMqttClient->is_connected()) {
- PRINTA_ERROR(pDevLog, " MQTT connection lost at subscribe %s ", topic);
- return;
- }
- bool bFind = false;
- for each (string var in m_strSubTopics)
- {
- if (var == topic) {
- bFind = true;
- break;
- }
- }
- if (!bFind && !unSubscribe) {
- PRINTA_INFO(pDevLog, "Subscribe topic %s", topic);
- m_strSubTopics.push_back(topic);
- m_pMqttClient->subscribe(topic, QOS, nullptr, g_pCallback->subListener_);
- }
- if (bFind && unSubscribe) {
- PRINTA_INFO(pDevLog, "UnSubscribe topic %s", topic);
- m_strSubTopics.erase(std::remove(m_strSubTopics.begin(), m_strSubTopics.end(), topic), m_strSubTopics.end());
- m_pMqttClient->unsubscribe(topic);
- }
- }
- */
- /*
- void LogicDevice::SubscribeAction()
- {
- if (ACTION_TOPIC_PREFIX.length() <= 0) {
- ostringstream ostopic;
- ostopic << TOPIC_PREFIX << "Action/";
- ACTION_TOPIC_PREFIX = ostopic.str();
- }
- Logger* pDevLog = GetLogHandle();
- if (m_pMqttClient->is_connected()) {
- // Action Topic
- // Detector/1234xx/Action
-
- string DevRes = m_pMidObject->GetResource();
- //if (m_pMidObject->GetDeviceResource(DevRes))
- {
- ResDataObject LowLayerRes;
- if (LowLayerRes.decode(DevRes.c_str()))
- {
- //attr
- int Idx = LowLayerRes.GetFirstOf("Action");
- if (Idx >= 0)
- {
- ResDataObject actions = LowLayerRes[Idx];
- for (size_t i = 0; i < actions.size(); i++)
- {
- ostringstream ostopic;
- ostopic << ACTION_TOPIC_PREFIX << (const char*)actions.GetKey(i);
- SubscribeOne(ostopic.str());
- }
- }
- else
- {
- PRINTA_INFO(pDevLog, "There is No Action to Subscribe ???? .");
- }
- }
- }
- //m_pMidObject->Action
- }
- }*/
- /////////////////////////////////////////////////////////////////////////////
|