// LogicDevice.cpp : 定义 DLL 应用程序的导出函数。 // #include #include #include #include "LogicDevice.h" #include // 用于 offsetof #include "PacketAnalizer.h" #include "MessageInfo.h" #include "common_api.h" #include "LocalConfig.h" #include "Base64.h" #include "SystemLogger.hpp" #include "mqttclient.h" #include "LinuxEvent.h" //Log4CPP::Logger* ////mLog::gLogger = nullptr; void msgarrivd(void* client, message_data_t* message); std::string CurrentDateTime(); //-------------Logic Device SysIF-------------------------- LogicDeviceSysIF::LogicDeviceSysIF(void) { } LogicDeviceSysIF::~LogicDeviceSysIF(void) { } //init void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p) { m_pLogicDev = p; //p->SetSysLogicDevice(this); }; LogicDevice* LogicDeviceSysIF::GetLogicDevice() { return m_pLogicDev; } //Command In and Out //notify from lower layer RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd) { return RET_FAILED; }; //notify to lower layer RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd) { if (m_pLogicDev) { return m_pLogicDev->CmdToLogicDev(pCmd); } return RET_NOSUPPORT; }; const std::string SERVER_ADDRESS("127.0.0.1"); /* string DEVICE_ID; //AS CLIENT_ID //const std::string CLIENT_ID("paho_cpp_async_subcribe"); //const std::string TOPIC("hello"); mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象 const int QOS = 1; const int N_RETRY_ATTEMPTS = 5;*/ using namespace std; LogicDevice* g_pDPCDeviceObject = NULL; string LogHost = ""; //string DEVICE_ID; //-------------Data Logic Device-------------------------- LogicDevice::LogicDevice(void) { m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false); uuid_t uuid; char uuid_str[37]; uuid_generate_random(uuid); uuid_unparse(uuid, uuid_str); m_pDevInstance = new char[40]; strncpy(m_pDevInstance, uuid_str, 40); m_pResErrorList = new ResDataObject(); sem_init(&m_SemphRequest, 0, 0); sem_init(&m_SemphPublish, 0, 0); m_pDrvDPC = NULL; g_pDPCDeviceObject = this; 
m_pMqttConntion = nullptr; //m_strServer = "tcp://localhost:1883"; m_strServer = SERVER_ADDRESS;// "192.168.2.225"; m_strServerPort = "1883"; m_bMqttUseSSL = false; m_strMqttUser = ""; m_strMqttPassword = ""; m_strEBusRoot = ""; m_strCCOSDevicePath = ""; m_pParent = nullptr; m_topicFilter = nullptr; m_pPacketReceivedQue = new MsgQueue(); m_pPacketSendingQue = new MsgQueue(); int x = 0; for ( x = 0; x < sizeof(szPad); x++) szPad[x] = 'A' + x % 26; szPad[x-1] = 0; memset(szPad2, 0, sizeof(szPad2)); m_dwLastPacket = GetTickCount(); } LogicDevice::~LogicDevice(void) { delete []m_pDevInstance; delete m_pResErrorList; sem_destroy(&m_SemphRequest); sem_destroy(&m_SemphPublish); //m_EvtNotify = NULL; delete m_pPacketReceivedQue; delete m_pPacketSendingQue; } void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) { if (m_strClientID.length() > 0) { ////mLog::FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID); return; } ////mLog::FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot); if (pszEBusRoot[0] == '/') { m_strEBusRoot = pszEBusRoot + 1; } else { m_strEBusRoot = pszEBusRoot; } char szKeys[256]; strcpy(szKeys, pszEBusRoot); char* pt = szKeys; while (*pt != 0) { if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-') *pt = '_'; pt++; } m_strClientID = CCOS_CLIENT_ID_PREFIX; if (szKeys[0] == '_') m_strClientID += (szKeys + 1); else m_strClientID += szKeys; if (pszCCOSRoot != nullptr) { ////mLog::FINFO("Set CCOS path: {$}", pszCCOSRoot); m_strCCOSDevicePath = pszCCOSRoot; int nPos = m_strCCOSDevicePath.find('/'); if (nPos != string::npos) { //CCOS/ nPos = m_strCCOSDevicePath.find('/', nPos + 1); //CCOS/DEVICE/ if (nPos != string::npos) { m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos); //CCOS/DEVICE/Generator nPos = m_strCCOSDevicePath.find('/', nPos + 1); if (nPos != string::npos) { m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos); } } } 
////mLog::FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath); } ////mLog::FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath); SetName(m_strClientID.c_str()); OnSetClientID(); } void LogicDevice::OnSetClientID() { } void LogicDevice::SubscribeSelf() { if (m_strDevicePath.length() > 0) if (m_strDevicePath.c_str()[0] != '/') m_strDevicePath = "/" + m_strDevicePath; ////mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion); std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl; if (m_strEBusRoot.length() > 0) { SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str()); } if (m_strCCOSDevicePath.length() > 0) { ////mLog::FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath); SubscribeTopic(m_pMqttConntion, (m_strCCOSDevicePath + m_strDevicePath).c_str()); //订阅 } ////mLog::FINFO("AbstractPath [{$}] ", m_strAbstractPath); if (m_strAbstractPath.length() > 0) { SubscribeTopic(m_pMqttConntion, (m_strAbstractPath).c_str()); if(m_strDevicePath.length() > 0) SubscribeTopic(m_pMqttConntion, (m_strAbstractPath + m_strDevicePath).c_str()); } ////mLog::FINFO("CCOSRoot [{$}] ", m_strCCOSRoot); if (m_strCCOSRoot.length() > 0) { SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot).c_str()); //订阅 } SubscribeActions(); } void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe) { if (bSubscribe) SubscribeTopic(m_pMqttConntion, pszTopic); else UnSubscribe(m_pMqttConntion, pszTopic); } void LogicDevice::NotifyDrvThread() { m_EvtNotify->SetEvent(); } std::shared_ptr LogicDevice::GetEvtHandle() { return m_EvtNotify; } //1. 
init part //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic) //{ // m_pSysLogic = pLogic; //} //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger) //{ // m_pLogger = pLogger; //} // //Logger *LogicDevice::GetLogHandle() //{ // return m_pLogger; //} void LogicDevice::SetDrvDPC(DriverDPC *pDPC) { m_pDrvDPC = pDPC; } DriverDPC *LogicDevice::GetDrvDPC() { return m_pDrvDPC; } RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath) { if (szEbusDevPath == nullptr) return RET_FAILED; for (auto dev : m_subDevices) { if (dev->GetRootPath() == szEbusDevPath) return RET_SUCCEED; } m_subDevices.push_back(pChild); return RET_SUCCEED; } RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath) { if (szCcosDevPath == nullptr) return RET_FAILED; for (auto dev : m_subCcosDevices) { if (dev->GetCcosRootPath() == szCcosDevPath) return RET_SUCCEED; } m_subCcosDevices.push_back(pChild); return RET_SUCCEED; } LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath) { for (auto dev : m_subDevices) { if (dev->GetRootPath() == szEbusDevPath) return dev; } return nullptr; } LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath) { for (auto dev : m_subCcosDevices) { if (dev->GetCcosRootPath() == szCcosDevPath) return dev; } return nullptr; } void SYSTEM_CALL LogicDevice::CompleteInit() { //if (//mLog::gLogger == nullptr) //{ // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)"; // LogHost = ((string)getLogRootpath()).c_str(); // if (LogHost.length() <= 1) // { // char szName[256]; // sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId()); // LogHost = szName; // } // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice"); // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str()); // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1); // auto rc = 
Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str()); // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice"); // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__); //} //else //{ // string strRoot = ((string)getLogRootpath()).c_str(); // if (strRoot.length() > 1 && strRoot != LogHost) // { // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)"; // LogHost = strRoot; // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice"); // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str()); // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1); // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str()); // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice"); // } //} //string version; //if (GetVersion(version, hMyModule)) // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str()); //else ////mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n"); ////mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID); if (m_strClientID.length() <= 0) { ////mLog::FWARN("No Client name ......" ); std::cout << "No Client name ......." << endl; return; } std::cout << "CompleteInit->NewConnection ......." 
<< endl; m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(), [this](ResDataObject* req, const char* topic, void* conn) { //这里只处理当前层次设备的请求,下一层的直接靠URI来路由 //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答 // 消息处理如果耗时较长,则需要开线程来处理 if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 || strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0) { //主题以ebus或者ccos开头,给我发的消息,进行处理 ProcessSubscribeRequest(req); } else { //我主动订阅的外部模块的消息 ProcessSubscribeMsg(req); } //CmdToLogicDev(req); }); if (m_pMqttConntion != nullptr) { StartThread(); //启动Request线程 SubscribeSelf(); //DWORD dwThreadID = 0; //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID); //////mLog::FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID); } } RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) { PACKET_TYPE type = PacketAnalizer::GetPacketType(req); PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req); switch (type) { case PACKET_TYPE_REQ: PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction); ProcessRequest(req, cmd);//请求 break; case PACKET_TYPE_RES: ProcessResponse(req, cmd);//应答 break; case PACKET_TYPE_NOTIFY: ProcessNotify(req, cmd); //通知 break; } return RET_FAILED; } RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd) { ProcessSubscribeRequest(pCmd); return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd) { //Open命令 if (cmd == PACKET_CMD_OPEN) { /* { "IDX": "40", "TYPE" : "0", "CMD" : "0", "HANDLE" : { "ROUTE": "1", "FLAGS" : "63", "LANG" : "en-US", "HANDLEID" : "0", "OWNERID" : { "EBUSID": "ImageSave", "MACHINEID" : "DESKTOP-FVD53H8", "PROCID" : "35408", "ADDR" : "2631366957696" }, "DEVID": { "EBUSID": "ccosChannel", "MACHINEID" : "", "PROCID" : "0", "ADDR" : "0" } }, "KEY": "\/ccosChannel" ResDataObject resRes, 
resResponse; ResDataObject resTopic; PacketAnalizer::GetContextTopic(pCmd, resTopic); if (cmd == PACKET_CMD_OPEN ) { //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl; PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion); return RET_SUCCEED; } if (cmd == PACKET_CMD_CLOSE) { //CLOSE 客户端主动断开 } } */ PacketArrived(pCmd); return RET_SUCCEED; } else if (cmd == PACKET_CMD_CLOSE) { ResDataObject resp; InnerOpenClose(*pCmd, resp, false); } else { PacketArrived(pCmd); } return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd) { return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd) { PacketArrived(pCmd); return RET_SUCCEED; } void LogicDevice::PacketArrived(ResDataObject* pRequest) { //m_pPacketReceivedQue->Lock(); ////mLog::FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest)); //if (Thread_Lock(5000) != WAIT_OBJECT_0) //{ // ////mLog::FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest)); // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath); // return ; //} m_pPacketReceivedQue->InQueue(*pRequest); ////mLog::FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest)); //SetEvent(m_EvtNotify);//notify to user long nLast = 0; int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程 if (released <= 0) { ////mLog::FERROR("PacketArrived ReleaseSemaphore failed {$} ", GetLastError()); } //Thread_UnLock(); //m_pPacketReceivedQue->UnLock(); } bool LogicDevice::OnStartThread() { return true; } bool LogicDevice::OnEndThread() { return true; } //单一发送线程,暂时未用 DWORD 
LogicDevice::Thread_Publish_Thread(void* pPara) { INT ret = 0; int waitevent = 0; LogicDevice* handle = (LogicDevice*)pPara; //prev work usleep(30000); ////mLog::FINFO("Thread Start [{$}]", handle->m_strClientID); while (!handle->m_ExitFlag->Wait(1)) { //do work ResDataObject resSend; DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100); if (wait != WAIT_OBJECT_0) continue; bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend); if (bHasPacket) { PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend); PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend); ////mLog::FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd); if (PACKET_CMD_NONE == cmd) { ////mLog::FDEBUG(" [{$}] Publish what packet {$} ", handle->m_strClientID, resSend.encode()); } if (type == PACKET_TYPE_NOTIFY) { CcosDevFileHandle* pHandle = new CcosDevFileHandle; PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle); ////mLog::FINFO("Notify Transaction: {$}", handle->m_strCurTransaction); PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction); PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion); ////mLog::FDEBUG("Notify: {$}", resSend.encode()); //printf("--> %s \n", pCmd->encode()); PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion); PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion); delete pHandle; } else { ResDataObject resTopic; PacketAnalizer::GetPacketTopic(&resSend, resTopic); //PacketAnalizer::GetPacketTopicGetPacketTopic //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic); ////mLog::FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const 
char*)resTopic); PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion); } } } return 0; } RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons) { ResDataObject req,reqParam; reqParam = pszGroup; PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam); return InnerOpenClose(req, resRespons, true); } RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons) { ResDataObject req, reqParam; reqParam = pszGroup; PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam); return InnerOpenClose(req, resRespons, false); } /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons) { ResDataObject req,reqParam; PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam); return Request(&req, &resRespons); } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszValueSet != nullptr && pszValueSet[0] != 0) { if (pszValueSet[0] == '{') { reqParam.decode(pszValueSet); } else { reqParam = pszValueSet; } } PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam); return Request(&req, &resRespons); } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) { if (pszValueUpdate[0] == '{') { reqParam.decode(pszValueUpdate); } else { reqParam = pszValueUpdate; } } PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam); return Request(&req, &resRespons); } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* 
pszValueAdd, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszValueAdd != nullptr && pszValueAdd[0] != 0) { if (pszValueAdd[0] == '{') { reqParam.decode(pszValueAdd); } else { reqParam = pszValueAdd; } } PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam); return Request(&req, &resRespons); } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszValueDel != nullptr && pszValueDel[0] != 0) { if (pszValueDel[0] == '{') { reqParam.decode(pszValueDel); } else { reqParam = pszValueDel; } } PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam); return Request(&req, &resRespons); } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszParams != nullptr && pszParams[0] != 0) { if (pszParams[0] == '{') { reqParam.decode(pszParams); } else { reqParam = pszParams; } } PacketAnalizer::MakeRequest(req,pszActionName , PACKET_CMD_EXE, &reqParam); ResDataObject resResult; RET_STATUS ret = Request(&req, &resResult); PacketAnalizer::GetPacketContext(&resResult, resRespons); return ret; } /// /// /// /// /// /// /// /// RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons) { ResDataObject req, reqParam; if (pszMessageValue != nullptr && pszMessageValue[0] != 0) { if (pszMessageValue[0] == '{') { reqParam.decode(pszMessageValue); } else { reqParam = pszMessageValue; } } PacketAnalizer::MakeRequest(req,pszTopic , PACKET_CMD_MSG, &reqParam); return Request(&req, &resRespons); } RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose) { if (openOrClose) { ResDataObject resContext; GetDeviceResource(&resp); 
LogicDevice::GetDeviceResource(&resp); PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction); if (PacketAnalizer::GetPacketContext(&req, resContext)) { //如果有上线参数 if (resContext.GetFirstOf("Online") >= 0) { ////mLog::FINFO("Got Online Request {$}", resContext.encode()); int idx = resContext.GetFirstOf("Online"); //如果有上线参数 if (idx >= 0) { do { string wsName = resContext[idx].encode(); ////mLog::FINFO("SubscribeGroupActions [{$}] ", wsName); SubscribeGroupActions(wsName, true); idx = resContext.GetNextOf("Online", idx); } while (idx >= 0); } resp.update("Online", m_rsOnlineGroup); } } } else { ResDataObject context; if (PacketAnalizer::GetPacketContext(&req, context)) { int idx = context.GetFirstOf("Offline"); //如果有上线参数 if (idx >= 0) { do { string wsName = context[idx].encode(); SubscribeGroupActions(wsName, false); idx = context.GetNextOf("Offline", idx); } while (idx >= 0); } resp.update("Online", m_rsOnlineGroup); } } return RET_SUCCEED; } bool LogicDevice::Exec(void) { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 3; // 等待退出信号(3秒超时) DWORD dwRet = 0; dwRet = m_ExitFlag->Wait(3); if (dwRet == WAIT_OBJECT_0) { return false; } // 等待请求信号(3秒超时) int ret = sem_timedwait(&m_SemphRequest, &ts); if (ret == 0) { ////mLog::FDEBUG("[{$}] Got Packet Event ", m_strClientID); ResDataObject req; bool got = true; do { got = m_pPacketReceivedQue->DeQueue(req); if (got) { m_dwLastPacket = GetTickCount(); PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req); std::string keystr = PacketAnalizer::GetPacketKey(&req); ////mLog::FINFO(" {$} Got Request key {$} transaction {$}", // m_strClientID, keystr, m_strCurTransaction); if (cmd == PACKET_CMD_OPEN) { ResDataObject resRes, resResponse; InnerOpenClose(req, resRes, true); PacketAnalizer::MakeOpenResponse(req, resResponse, resRes); resResponse.update("Online", m_rsOnlineGroup); PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction); ResDataObject resTopic; 
PacketAnalizer::GetContextTopic(&req, resTopic); PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str()); if (keystr.substr(0, 4) == "CCOS") { PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str()); } else { PacketAnalizer::UpdateDeviceNotifyResponse(resResponse, getLocalMachineId(), getLocalEbusId(), (uint64_t)getpid(), (uint64_t)m_pMqttConntion); } PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion); // 发送连接状态通知 ResDataObject NotifyData; PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE, "ConnectionStatus", "1"); PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction); CmdFromLogicDev(&NotifyData); } else { PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction); ResDataObject resPacket; RET_STATUS retStatus = RET_FAILED; if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") { ResDataObject devRes; if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) { LogicDevice::GetDeviceResource(&devRes); PacketAnalizer::UpdatePacketContext(resPacket, devRes); } } else { retStatus = Request(&req, &resPacket); } PacketAnalizer::MakeRetCode(retStatus, &resPacket); PacketAnalizer::CloneTransaction(&req, &resPacket); ResDataObject resTopic; PacketAnalizer::GetContextTopic(&req, resTopic); PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str()); PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion); } } } while (got); // 检查心跳(10分钟无数据发送心跳) if (GetTickCount() - m_dwLastPacket > 600000) { m_dwLastPacket = GetTickCount(); ResDataObject heartBeat; PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE, "HeartBeat", m_strClientID.c_str()); PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion); } return true; } // 检查心跳(即使没有请求) if (GetTickCount() - m_dwLastPacket > 600000) { m_dwLastPacket = GetTickCount(); ResDataObject heartBeat; PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE, "HeartBeat", m_strClientID.c_str()); 
PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion); } return true; } void SYSTEM_CALL LogicDevice::CompleteUnInit() { StopThread(); } int LogicDevice::GetDevice_Thread_Priority() { return THREAD_PRIORITY_NONE; } RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource) { //pDeviceResource->update("ClientType", DPC_UnitClient); GUID guid; string name; GetDeviceType(guid); guid_2_string(guid, name); pDeviceResource->update("DeviceType", name.c_str()); //Get Unit Type (Unit GUID) if (pDeviceResource->GetFirstOf("LogicDevInstance")<0) { pDeviceResource->add("LogicDevInstance", m_pDevInstance); } //// size_t idx = (*pDeviceResource)["Attribute"].size(); if (idx > 0) { int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList"); if (erroridx < 0) { (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList); } else { (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList; } } else { ResDataObject Attribute; Attribute.add("ErrorList", *m_pResErrorList); pDeviceResource->add("Attribute", Attribute); } // (*pDeviceResource)["Action"].size(); if (pDeviceResource->GetFirstOf("Action") < 0) { pDeviceResource->add("Action", m_Actions); } return RET_SUCCEED; } //notify from lower layer RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd) { PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd); PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd); if (type == PACKET_TYPE_NOTIFY) { CcosDevFileHandle* pHandle = new CcosDevFileHandle; PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle); ////mLog::FDEBUG("Notify Transaction: {$}", m_strCurTransaction); PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction); ; //if (m_strEBusRoot.length() <= 0) //{ // //////mLog::FWARN("EBusRoot is null"); //} PacketAnalizer::UpdateDeviceNotifyResponse( *pCmd, getLocalMachineId(), getLocalEbusId(), static_cast(getpid()), // Linux 上获取进程 ID reinterpret_cast(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型 ); 
////mLog::FDEBUG("[{$}] Notify: {$}", m_strClientID, pCmd->encode()); //printf("--> %s \n", pCmd->encode()); PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str()); PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd); PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str()); PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion); if (m_strAbstractPath.length() > 0) { PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion); string realPath,devPath; devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length()); for (int x = 0; x < m_rsOnlineGroup.size(); x++) { if ((int)m_rsOnlineGroup[x] == 1) { realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath; PublishAction(pCmd, realPath.c_str(), m_pMqttConntion); } } } //m_pPacketSendingQue->InQueue(*pCmd); delete pHandle; } return RET_SUCCEED; /* if (pCmd && m_pSysLogic) { return m_pSysLogic->CmdFromLogicDev(pCmd); } //put log here return RET_FAILED;*/ } std::wstring mb2wc_a(const char* mbstr) { std::wstring strVal; // 获取输入字符串的长度 size_t mbstr_len = strlen(mbstr); // 计算转换后宽字符字符串的长度 size_t size = mbstr_len + 1; // 需要多一个字符来存储结尾的 '\0' wchar_t* wcstr = new wchar_t[size]; if (wcstr) { memset(wcstr, 0, size * sizeof(wchar_t)); // mbsrtowcs 返回转换后的字符数 size_t ret = mbsrtowcs(wcstr, &mbstr, size, nullptr); if (ret != (size_t)-1) // mbsrtowcs 返回 (size_t)-1 表示错误 { strVal = wcstr; } delete[] wcstr; } return strVal; } RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType) { string ResBase64, DesBase64; wstring wResUTF = ResInfo; wstring wDesUTF = Description; CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * 
sizeof(wchar_t), ResBase64); CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64); return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType); } RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId) { string ResBase64,DesBase64; wstring wResUTF = mb2wc_a(ResInfo); wstring wDesUTF = mb2wc_a(Description); CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64); CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64); return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId); } RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId) { //int ret = 1; if (Code == 0 || (string)ResInfo == "") { ////mLog::FERROR("Code or ResInfo is empty"); return RET_FAILED; } MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; string strDevInstanceCode = DevInstance; strDevInstanceCode += Code; for (size_t i = 0; i < m_pResErrorList->size(); i++) { string strInstancekey = m_pResErrorList->GetKey(i); if (strInstancekey == strDevInstanceCode) { //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++) //{ // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j); // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"]; // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType) // { //ret = 0; ////mLog::FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType); return RET_SUCCEED; // } //} 
//ret = 2; //break; } } //if (ret==1) { ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/; //info.GetResDataObject(tempInfo); //ErrorInfo.update(Code, tempInfo); info.GetResDataObject(ErrorInfo); struct timeval tv; struct tm* tm_info; char TimeTag[30]; // 获取当前时间 gettimeofday(&tv, NULL); // 转换为本地时间 tm_info = localtime(&tv.tv_sec); // 格式化时间为字符串 snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld", tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday, tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000); ErrorInfo.add("CreationTime", TimeTag); ErrorInfo.add("AppId", pAppId); ErrorInfo.add("InstanceId", DevInstance); if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可 { m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo); } ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo); PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify); ////mLog::FWARN( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode()); CmdFromLogicDev(&NotifyData); //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } //else if (ret == 2) //{ // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo; // info.GetResDataObject(tempInfo); // ErrorInfo.update(Code, tempInfo); // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可 // { // (*m_pResErrorList)[DevInstance].update(Code, tempInfo); // } // ResNotify.update(DevInstance, ErrorInfo); // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify); // RES_////mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType); // CmdFromLogicDev(&NotifyData); //} //put log here return RET_SUCCEED; } RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId) { AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId); //put log here return RET_FAILED; } RET_STATUS 
LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType) { int index = -1; bool m_bClearAll = false; //trim empty code string string CodeStr = Code; if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "") { CodeStr = ""; Code = CodeStr.c_str(); m_bClearAll = true; } //if ((string)Code == "0" || (string)Code == "") //{ // m_bClearAll = true; //} string strDevInstanceCode = DevInstance; strDevInstanceCode += Code; if (m_bClearAll) { m_pResErrorList->clear(); } else { MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; for (size_t i = 0; i < m_pResErrorList->size(); i++) { string strInstancekey = m_pResErrorList->GetKey(i); if (strInstancekey == strDevInstanceCode) { //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++) //{ // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j); // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"]; // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType) // { // index = (INT)j; // break; // } //} index = (int)i; break; } } } MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/; //info.GetResDataObject(tempInfo); //ErrorInfo.add(Code, tempInfo); info.GetResDataObject(ErrorInfo); ErrorInfo.add("InstanceId", DevInstance); bool result = false; if (index>=0) { //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index); result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str()); } if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE) { ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo); PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify); ////mLog::FWARN( "pre Del error ErrorCode:{$} 
{$}", Code ,NotifyData.encode()); CmdFromLogicDev(&NotifyData); //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } return RET_SUCCEED; } RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType) { DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType); //put log here return RET_FAILED; } RET_STATUS LogicDevice::EvtProcedure() { return RET_FAILED; } bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId) { ////mLog::FERROR("NOT FINISHED YET"); return false; } RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId) { std::string strResult = ""; //组Context包 //if (m_pLogger) { wstring wContect = mb2wc_a(pContext); CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult); } //Thread_Lock(); //if (NULL != fmt) //{ // va_list marker = NULL; // va_start(marker, fmt); // size_t nLength = _vscprintf(fmt, marker) + 1; // std::vector vBuffer(nLength, '\0'); // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker); // if (nWritten > 0) // { // strResult = &vBuffer[0]; // } // va_end(marker); //} //Thread_UnLock(); ResDataObject SysLogNode; string guidstr; GUID DeviceGuid; //组Log包 if (GetDeviceType(DeviceGuid)) { guid_2_string(DeviceGuid, guidstr); SysLogNode.add("Module", guidstr.c_str()); SysLogNode.add("AppId", pAppId); SysLogNode.add("ThreadId", GetCurrentThreadId()); if (pCode) { SysLogNode.add("BusinessKey", pCode); } else { SysLogNode.add("BusinessKey", ""); } SysLogNode.add("IP", (const char*)getLocalIpAddress()); struct timeval tv; struct tm* tm_info; char TimeTag[30]; // 获取当前时间 gettimeofday(&tv, NULL); // 转换为本地时间 tm_info = localtime(&tv.tv_sec); // 格式化时间为字符串 snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld", tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday, 
tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000); SysLogNode.add("CreationTime", TimeTag); string strLevel = SysLogLevel2str(Level); SysLogNode.add("Level", strLevel.c_str()); SysLogNode.add("HostName", (const char*)getLocalMachineId()); SysLogNode.add("ProcessName", (const char*)GetModuleTitle()); SysLogNode.add("FreeText", strResult.c_str()); SysLogNode.add("Context", pCode); ResDataObject NotifyData; PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode); CmdFromLogicDev(&NotifyData); //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } else { ////mLog::FWARN("no Guid??"); return RET_FAILED; } //打印LOG switch (Level) { case Syslog_Debug: //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog"); ////mLog::FDEBUG("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Information: ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog"); ////mLog::FINFO("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Warning: //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog"); ////mLog::Warn("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Error: //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog"); ////mLog::FERROR("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Fatal: //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog"); ////mLog::FINFO("SysLog {$} ", SysLogNode.encode()); break; default: ////mLog::FINFO("SysLog {$} " ,SysLogNode.encode() ); break; } return RET_SUCCEED; } RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...) 
{ std::string strResult = ""; //组Context包 //if (m_pLogger) //{ // m_pLogger->Thread_Lock(); // if (NULL != fmt) // { // va_list marker = NULL; // va_start(marker, fmt); // size_t nLength = _vscprintf(fmt, marker) + 1; // std::vector vBuffer(nLength, '\0'); // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker); // if (nWritten > 0) // { // strResult = &vBuffer[0]; // } // va_end(marker); // } // m_pLogger->Thread_UnLock(); //} //else { Thread_Lock(); if (fmt != NULL) { va_list marker; va_start(marker, fmt); // 获取格式化字符串所需的大小(不包括终止符) int nLength = vsnprintf(NULL, 0, fmt, marker); if (nLength >= 0) { std::vector vBuffer(nLength + 1, '\0'); // +1 为终止符 vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker); strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串 } va_end(marker); } Thread_UnLock(); } ResDataObject SysLogNode; string guidstr; GUID DeviceGuid; //组Log包 if (GetDeviceType(DeviceGuid)) { guid_2_string(DeviceGuid, guidstr); SysLogNode.add("Module", guidstr.c_str()); SysLogNode.add("AppId", ""); SysLogNode.add("ThreadId", GetCurrentThreadId()); if (pCode) { SysLogNode.add("BusinessKey", pCode); } else { SysLogNode.add("BusinessKey", ""); } SysLogNode.add("IP", (const char *)getLocalIpAddress()); struct timeval tv; struct tm* tm_info; char TimeTag[30]; // 获取当前时间 gettimeofday(&tv, NULL); // 转换为本地时间 tm_info = localtime(&tv.tv_sec); // 格式化时间为字符串 snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld", tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday, tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000); SysLogNode.add("CreationTime", TimeTag); SysLogNode.add("Level", Level); SysLogNode.add("HostName", (const char *)getLocalMachineId()); SysLogNode.add("ProcessName", (const char *)GetModuleTitle()); SysLogNode.add("FreeText", strResult.c_str()); ResDataObject NotifyData; PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode); CmdFromLogicDev(&NotifyData); //PublishAction(&NotifyData, 
(m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); if(m_pParent != nullptr) NotifyParent(&NotifyData, "/Notify"); } else { ////mLog::FERROR("no Guid??"); return RET_FAILED; } //打印LOG switch (Level) { case Syslog_Debug: //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog"); ////mLog::FDEBUG("SysLog {$}", SysLogNode.encode()); break; case Syslog_Information: //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog"); ////mLog::FINFO("SysLog {$}", SysLogNode.encode()); break; case Syslog_Warning: //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog"); ////mLog::Warn("SysLog {$}", SysLogNode.encode()); break; case Syslog_Error: //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog"); ////mLog::FERROR("SysLog {$}", SysLogNode.encode()); break; case Syslog_Fatal: //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog"); ////mLog::FINFO("SysLog {$}", SysLogNode.encode()); break; default: ////mLog::FINFO( "SysLog {$}", SysLogNode.encode()); break; } return RET_SUCCEED; } ///////////////////////////////////////////////////////////////////////////// ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg) { return nullptr; } std::string CurrentDateTime() { auto now = std::chrono::system_clock::now(); auto now_us = std::chrono::duration_cast(now.time_since_epoch()); auto now_time_t = std::chrono::system_clock::to_time_t(now); std::tm now_tm{}; localtime_r(&now_time_t, &now_tm); // 线程安全版本 char buf[64]; std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &now_tm); std::string now_time_str = buf; // 格式化微秒部分并追加 snprintf(buf, sizeof(buf), ".%06lld", static_cast(now_us.count() % 1000000)); now_time_str += buf; return now_time_str; } static string CurrentDateTime2() { std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); char buf[100] = { 0 }; std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now)); return buf; } bool LogicDevice::GetActions(ResDataObject& resAction) { for (int x = 0; x < 
m_Actions.size(); x++) { resAction.update(m_Actions.GetKey(x), ""); } return true; } void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection) { if (m_pParent != nullptr) { if (hConnection == nullptr) { hConnection = m_pParent->m_pMqttConntion; } PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection); } } const mqtt_qos_t MQTT_QOS = QOS2; /// /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector /// CCOS/Demo/Detector /// /// void LogicDevice::SubscribeGroupActions(string wsName, int bOnline) { if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0) { int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()]; if (bOnline == lastOnline) { ////mLog::FWARN("Group Abstract is already ? Online {$}", bOnline); return; } } if ( m_strAbstractPath.length() > 0) { std::string pszAction, gpPath; int ret = 0; int num = m_Actions.size(); ////mLog::FINFO("Group Device onLine or offLine {$}", bOnline); gpPath = m_strAbstractPath; gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName); pszAction = gpPath + m_strDevicePath; pszAction += "/Action/+"; SubScribeTopic(pszAction.c_str(), bOnline == 1); if (bOnline) { ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction); } else { ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction); } //for (size_t i = 0; i < num; i++) //{ // pszAction = gpPath + m_strDevicePath; // pszAction += "/Action/"; // pszAction += (m_Actions.GetKey(i)); // SubScribeTopic(pszAction.c_str(), bOnline == 1); // if (bOnline) // { // ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction); // } // else // { // ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction); // } //} m_rsOnlineGroup.update(wsName.c_str(), bOnline); } else { ////mLog::FWARN("try Online but AbstractPath is none {$}", m_strClientID); } } void SubscribeModule(ccos_mqtt_connection* pconn, string 
devuribBase) { string pszAction = devuribBase ; pszAction += "/Get/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Update/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Set/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Add/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Del/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Action/+"; SubscribeTopic(pconn, pszAction.c_str()); pszAction = devuribBase; pszAction += "/Message/+"; SubscribeTopic(pconn, pszAction.c_str()); } void LogicDevice::SubscribeActions() { ////mLog::FINFO("Begain"); if (nullptr == m_pMqttConntion) return; int num = m_Actions.size(); mqtt_client* pMqttClient = (mqtt_client*)std::get(*m_pMqttConntion); mqtt_topic_list* pTopicList = std::get(*m_pMqttConntion); { ResDataObject res,resAction; GetDeviceResource(&res); std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl; resAction = res["Action"]; ////mLog::FINFO("Actions from deviceresource [{$}]", resAction.size()); for (int x = 0; x < resAction.size(); x++) { string action = (string)resAction.GetKey(x); if (action.length() > 0) { ////mLog::FINFO("Action from DeviceResource [{$}]", action); m_Actions.update(action.c_str(), ""); } } num = m_Actions.size(); } std::string pszAction; int ret = 0; SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str()); ////mLog::FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); if (m_strEBusRoot != m_strCCOSDevicePath) { pszAction = m_strCCOSDevicePath + m_strDevicePath; SubscribeModule(m_pMqttConntion, pszAction.c_str()); ////mLog::FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); } if (m_strAbstractPath.length() > 0) { pszAction = m_strAbstractPath + m_strDevicePath; 
SubscribeModule(m_pMqttConntion, pszAction.c_str()); ////mLog::FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); } //for (size_t i = 0; i < num; i++) //{ // //订阅ebus的路径 // pszAction = m_strEBusRoot; // pszAction += "/Action/"; // pszAction += (m_Actions.GetKey(i)); // SubscribeTopic(m_pMqttConntion, pszAction.c_str()); // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); // // //订阅CCOS设备路径 // if (m_strEBusRoot != m_strCCOSDevicePath) // { // pszAction = m_strCCOSDevicePath + m_strDevicePath; // pszAction += "/Action/"; // pszAction += (m_Actions.GetKey(i)); // SubscribeTopic(m_pMqttConntion, pszAction.c_str()); // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); // } // //抽象设备路径要一直存在 // //订阅抽象设备路径 // if ( m_strAbstractPath.length() > 0) // { // pszAction = m_strAbstractPath + m_strDevicePath; // pszAction += "/Action/"; // pszAction += (m_Actions.GetKey(i)); // SubscribeTopic(m_pMqttConntion, pszAction.c_str()); // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); // } //} //pszAction = m_strEBusRoot; //pszAction += "/Action/UpdateDeviceResource"; ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl; //pTopicList->push_back(pszAction); //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd); //////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size()); //*/ } /* void connlost(void* context, char* cause) { //连接中断 ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; std::cout << "Connection Lost .....[" << std::get(*connection) <<"] why?" 
<< endl; printf("\nConnection lost\n"); printf(" cause: %s\n", cause); std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl; } void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc) { ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; std::cout << "Connection disconnected .....[" << std::get(*connection) << "] why?" << endl; } */ //发送成功 //void delivered(void* context, MQTTAsync_token dt) //{ // printf("Message with token value %d delivery confirmed\n", dt); // //deliveredtoken = dt; //} //void connlost(void* context, char* cause) //{ // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause); // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl; // // //连接中断 // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; // mqtt_client* client = (mqtt_client*)std::get(*connection); // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl; // return; // // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; // int rc; // // printf("Reconnecting\n"); // conn_opts.keepAliveInterval = 20; // conn_opts.cleansession = 1; // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) // { // printf("Failed to start connect, return code %d\n", rc); // //finished = 1; // } //} //重连成功 //void onReconnected(void* context, char* cause) //{ // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause); // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? 
"" : cause) << endl; // // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; // mqtt_client* client = (mqtt_client*)std::get(*connection); // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; // mqtt_topic_list* pTopicList = std::get(*connection); // const char* client_id = std::get(*connection); // int rc; // // //printf(" %s \n", std::get(*connection)); // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl; // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl; // // 重新订阅,暂不重新订阅,看看 // // ///* // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" // // "Press Q to quit\n\n", TOPIC, CLIENTID, QOS); // //opts.onSuccess = onSubscribe; // //opts.onFailure = onSubscribeFailure; // //opts.context = client; // auto it = pTopicList->begin(); // while(it != pTopicList->end()) // { // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts); // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc); // it++; // } // //*/ // //} // ////MQTT 连接主动断开连接失败 //void onDisconnectFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Disconnect failed, rc %d\n", response->code); // //disc_finished = 1; //} // ////MQTT 连接主动断开连接成功 //void onDisconnect(void* context, MQTTAsync_successData* response) //{ // //printf("Successful disconnection\n"); // //disc_finished = 1; //} // //void onSubscribe(void* context, MQTTAsync_successData* response) //{ // printf("Subscribe succeeded\n"); // //subscribed = 1; //} // //void onSubscribeFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Subscribe failed, rc %d\n", response->code); // //finished = 1; //} // // //void onConnectFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Connect failed, rc %d\n", response->code); // //finished = 1; //} //void onConnect(void* context, MQTTAsync_successData* response) //{ // // 
// Re-issues mqtt_subscribe() for every topic stored in the connection's topic
// list, using MQTT_QOS and msgarrivd as the message handler.
//
// client          : mqtt_client whose mqtt_conn_context points at the owning
//                   ccos_mqtt_connection tuple.
// reconnect_date  : not used by this function.
//
// NOTE(review): the std::get(...) calls below carry no tuple index in this copy
// of the file; the template argument appears to have been lost - confirm the
// intended indices against the ccos_mqtt_connection declaration.
// NOTE(review): InnerConnect() in this file registers reconnect_handler, not
// this function - confirm whether this one is still referenced anywhere.
void resubscribe_topic(void* client, void* reconnect_date)
{
	mqtt_client* pClient = (mqtt_client*)client;
	ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
	mqtt_topic_list* pTopicList = std::get(*connection);
	string client_id = std::get(*connection);
	////mLog::FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
	CcosLock* pLock = std::get(*connection);
	// The connection lock is held for the whole walk over the topic list.
	pLock->Thread_Lock();
	// The lambda receives each topic by value, i.e. one string copy per topic.
	for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
		////mLog::FWARN("Resubscribe {$}", str);
		mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
	});
	pLock->Thread_UnLock();
}
CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get big Msg from " << topic << " msg Len " << message->message->payloadlen << /*" msg Body " << payload <<*/ endl; ////mLog::FWARN("TID {$} : {$} Get big Msg from {$} msg Len {$}", GetCurrentThreadId(), client_id, topic, message->message->payloadlen); ////mLog::FDEBUG("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload.substr(0, 1024)); const char* pclient_id = client_id.c_str(); try { std::string strPayload((const char*)message->message->payload, message->message->payloadlen); req.decode(strPayload.c_str()); } catch (...) { pLock->Thread_UnLock(); ////mLog::FWARN("Decode Exception.. {$}", payload); return; } //取消息钩子 void* pHook = std::get(*connection); if (pHook != nullptr) { ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook); ccos_mqtt_msg_filter_func func = std::get(*filter); //std::get(*filter) = nullptr; std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl; ////mLog::FDEBUG("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic); //消息钩子函数存在 ResDataObject *pRequests = std::get(*filter); for(int x=0;xsize();x++) { //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc; std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl; ////mLog::FDEBUG("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic); string keyAction = pRequests->GetKey(x); ResDataObject resObj = (*pRequests)[x]; std::shared_ptr hEvent = std::get(*filter); if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES && keyAction == PacketAnalizer::GetPacketKey(&req)) { //勾住了,是该钩子的消息,则通知 //std::get(*filter) = nullptr; std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id 
// Thread entry point that drains a connection's outgoing message list and
// publishes each queued message with mqtt_publish().
//
// pPara : pointer to the ccos_mqtt_connection tuple this thread serves.
// Returns NULL; the `while (true)` loop below has no exit path, so the return
// statement is not reached.
//
// Each queued ResDataObject is treated as a set of (topic -> payload) pairs:
// the key of every child is used as the topic and its string value as the
// payload. The message object is deleted after all of its entries were sent.
//
// NOTE(review): the std::get(...) calls below carry no tuple index in this copy
// of the file; the template argument appears to have been lost - confirm the
// intended indices against the ccos_mqtt_connection declaration.
void* MqttSendThreadFunc(void* pPara)
{
	ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
	CcosLock* pLock = std::get(*connection);
	mqtt_msg_list* pList = std::get(*connection);
	mqtt_client* pConn = (mqtt_client*)std::get(*connection);
	std::string client_id = std::get(*connection);
	sem_t* hSemaphore = std::get(*connection);

	// Thread start-up log
	std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;

	while (true) {
		// sem_timedwait takes an absolute CLOCK_REALTIME deadline: now + 1 second.
		struct timespec ts;
		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;

		// Log before waiting on the semaphore (disabled)
		//std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
		if (sem_timedwait(hSemaphore, &ts) == 0) {
			std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
			// The connection lock is held from here until the message has been
			// published and deleted, i.e. also during mqtt_publish().
			pLock->Thread_Lock();
			std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
			// The semaphore was posted but the list holds nothing: release and wait again.
			if (pList->empty()) {
				std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
				pLock->Thread_UnLock();
				continue;
			}
			ResDataObject* pMsg = pList->front();
			pList->pop_front();
			std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
			for (int x = 0; x < pMsg->size(); x++) {
				const char* pTopic = pMsg->GetKey(x);
				std::string message = (std::string)(*pMsg)[pTopic];
				std::cout << "[" << client_id << "] Prepare to publish messages to the topic: " << pTopic << ", Message length: " << message.length() << "Byte" << std::endl;
				// msg.payload points into `message`, which stays alive until the
				// end of this loop iteration, i.e. past the mqtt_publish() call.
				mqtt_message_t msg;
				memset(&msg, 0, sizeof(msg));
				msg.payload = (void*)message.c_str();
				msg.payloadlen = message.length();
				msg.qos = QOS1;
				// Publish the message
				int publishResult = mqtt_publish(pConn, pTopic, &msg);
				if (publishResult == 0) {
					std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
				}
				else {
					// A failed publish is only logged; the entry is not retried.
					std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic << ", Error code: " << publishResult << std::endl;
				}
			}
			delete pMsg;
			std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
			pLock->Thread_UnLock();
			std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
		}
		else if (errno == ETIMEDOUT) {
			// Timed out after one second with nothing queued: loop and wait again.
			//std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
		}
		else {
			// Other error cases
			// NOTE(review): errno is read after the preceding stream insertions have
			// run; presumably still the sem_timedwait value, but stream I/O may
			// overwrite it - consider capturing errno right after the call.
			std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
		}
	}
	return NULL;
}
pszClientID : "null") << "\n\tHost: " << SERVER_ADDRESS << "\n\tPort: 1883" << std::endl; // 设置MQTT连接参数 std::string host_str = SERVER_ADDRESS; char* host_buf = new char[host_str.size() + 1]; strncpy(host_buf, host_str.c_str(), host_str.size() + 1); mqtt_set_host(pMqttClient, host_buf); char port_buf[6] = "1883"; uint8_t clean_session = 1; uint16_t keep_alive = 50; mqtt_set_port(pMqttClient, port_buf); mqtt_set_user_name(pMqttClient, const_cast("zskkcc")); mqtt_set_password(pMqttClient, const_cast("zskk1234")); mqtt_set_client_id(pMqttClient, (char*)pszClientID); mqtt_set_clean_session(pMqttClient, 0); mqtt_set_keep_alive_interval(pMqttClient, keep_alive); mqtt_set_cmd_timeout(pMqttClient, 5000); mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); // 设置重连回调(替代mqtt_set_resubscribe_handler) //mqtt_subscribe(); mqtt_set_reconnect_handler(pMqttClient, reconnect_handler); int rc = 0; uint64_t start_time = GetTickCount(); const uint64_t timeout_ms = 10000; // 10秒超时 int attempt_count = 0; std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl; try { while (true) { attempt_count++; std::cout << "[ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl; rc = mqtt_connect(pMqttClient); if (rc != MQTT_SUCCESS_ERROR) { uint64_t elapsed = GetTickCount() - start_time; std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl; if (elapsed > timeout_ms) { // 超时处理 std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl; delete[] host_buf; mqtt_release(pMqttClient); return nullptr; } // 计算剩余时间 uint64_t remaining = timeout_ms - elapsed; uint64_t wait_time = (remaining > 2000) ? 2000 : remaining; std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." 
<< std::endl; usleep(wait_time * 1000); } else { uint64_t total_time = GetTickCount() - start_time; std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl; break; } } } catch (const std::exception& e) { std::cerr << "[ERROR] " << e.what() << std::endl; delete[] host_buf; mqtt_release(pMqttClient); return nullptr; } return pMqttClient; } ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg) { ////mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword); std::cout << "TID " << GetCurrentThreadId() << " : " << pszClientID << " NewConnection " << pszServer << ":" << pszServerPort << " user: " << pszUserName << " password " << pszPassword << std::endl; //连接MQTT broker DWORD dwTick = GetTickCount(); ccos_mqtt_connection* connection = new ccos_mqtt_connection; // 初始化元组字段 std::get(*connection) = new mqtt_topic_list; std::get(*connection) = pszClientID; std::get(*connection) = onmsg; CcosLock* pLock = new CcosLock(); std::get(*connection) = pLock; pLock->Thread_Lock(); ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter; std::get(*pfilter) = nullptr; std::get(*pfilter) = new ResDataObject; std::get(*pfilter) = new ResDataObject; std::get(*connection) = pfilter; std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. 
for " << pszClientID << endl; mqtt_client* pMqttClient = InnerConnect(connection); if (pMqttClient == nullptr) { pLock->Thread_UnLock(); delete connection; delete pfilter; return nullptr; } std::get(*connection) = pMqttClient; //pMqttClient->mqtt_conn_context = connection; //设置MQTT连接参数 //mqtt_set_port(pMqttClient, (char*)pszServerPort); //mqtt_set_host(pMqttClient, (char*)pszServer); //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc); //mqtt_set_client_id(pMqttClient, (char*)pszClientID); //mqtt_set_user_name(pMqttClient, (char*)pszUserName); //mqtt_set_password(pMqttClient, (char*)pszPassword); //mqtt_set_clean_session(pMqttClient, 1); //mqtt_set_version(pMqttClient, 5); //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic); //mqtt_set_write_buf_size(pMqttClient, 8192); //mqtt_set_read_buf_size(pMqttClient, 8192); //mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); //mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); //if (pszUserName != nullptr && strlen(pszUserName) > 0) // conn_opts.username = pszUserName; //if (pszPassword != nullptr && strlen(pszPassword) > 0) // conn_opts.password = pszPassword; //设置回调函数 /* int rc = 0; dwTick = GetTickCount(); while (true) { int rc = mqtt_connect(pMqttClient); if (rc != MQTT_SUCCESS_ERROR) { //默认10s内尝试 if (GetTickCount() - dwTick > 10000) { ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc); pLock->Thread_UnLock(); delete connection; delete pfilter; return nullptr; } else { ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... 
{$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl; Sleep(2000); } } else { break; } } */ std::get(*connection) = new mqtt_msg_list; // 创建POSIX信号量 sem_t* semaphore = new sem_t; sem_init(semaphore, 0, 0); std::get(*connection) = semaphore; // 创建发送线程 pthread_t threadId; if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) { std::cerr << "Failed to create MQTT send thread" << std::endl; // 错误处理... sem_destroy(semaphore); delete semaphore; pLock->Thread_UnLock(); delete connection; return nullptr; } std::get(*connection) = threadId; std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl; pLock->Thread_UnLock(); uint64_t dwWaitTick = GetTickCount() - dwTick; std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: " << dwWaitTick << " ms" << std::endl; return connection; } //创建额外连接,需要提供回调函数 LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async) { DWORD dwTick = GetTickCount(); ////mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883); //mqtt_client* pMqttClient = nullptr; //pMqttClient = mqtt_lease(); ccos_mqtt_connection* connection = new ccos_mqtt_connection; //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL); //std::get(*connection) = pMqttClient; mqtt_topic_list* pTopicList = new std::list; std::get(*connection) = pTopicList; CcosLock* pLock = new CcosLock(); std::get(*connection) = pLock; pLock->Thread_Lock(); ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter; std::get(*pfilter) = nullptr; std::get(*connection) = pfilter; std::get(*pfilter) = new ResDataObject; std::get(*pfilter) = new ResDataObject; std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] 
filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl; std::get(*connection) = pszClientID; std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg; mqtt_client* pMqttClient = InnerConnect(connection); if (pMqttClient == nullptr) { pLock->Thread_UnLock(); delete connection; delete pfilter; return nullptr; } std::get(*connection) = pMqttClient; //pMqttClient->mqtt_conn_context = connection; //设置回调函数 //设置MQTT连接参数 //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic); /* //连接Broker int rc = 0; dwTick = GetTickCount(); while (true) { rc = mqtt_connect(pMqttClient); if (rc != MQTT_SUCCESS_ERROR) { if (GetTickCount() - dwTick > dwOpenTimeout) { ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc); pLock->Thread_UnLock(); delete connection; delete pfilter; return nullptr; } else { ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl; Sleep(20); } } else { break; } }*/ DWORD dwWaitTick = GetTickCount() - dwTick; ////mLog::FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId()); std::get(*connection) = new mqtt_msg_list; // 创建POSIX信号量 sem_t* semaphore = new sem_t; sem_init(semaphore, 0, 0); std::get(*connection) = semaphore; // 创建发送线程 pthread_t threadId; if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) { std::cerr << "Failed to create MQTT send thread" << std::endl; // 错误处理... 
sem_destroy(semaphore); delete semaphore; pLock->Thread_UnLock(); delete connection; return nullptr; } std::get(*connection) = threadId; std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl; pLock->Thread_UnLock(); std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: " << dwWaitTick << " ms" << std::endl; return connection; } //重置连接 LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection) { if (hConnection == nullptr) { return; } ////mLog::FWARN(" Reset Mqtt Connection..", std::get(*hConnection)); std::cout << CurrentDateTime() << std::get(*hConnection) << " Close Mqtt Connection.." << endl; mqtt_client* pconn = (mqtt_client*)std::get(*hConnection); if (pconn != nullptr) { CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); //mqtt_do_reconnect(pconn); mqtt_disconnect(pconn); /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic); mqtt_connect(pconn);*/ mqtt_release(pconn); pconn = InnerConnect(hConnection); int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn); if (pconn != nullptr) { //std::get(*hConnection) = pconn; //resubscribe_topic(pconn, pconn->mqtt_conn_context); //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get(*hConnection)); } pLock->Thread_UnLock(); } } //关闭并释放连接 LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection) { if (!hConnection) return; auto clientId = std::get(*hConnection); std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." 
<< endl; CcosLock* pLock = std::get(*hConnection); if (nullptr != pLock) pLock->Thread_Lock(); else return; mqtt_client* pconn = (mqtt_client*)std::get(*hConnection); sem_t* semaphore = (sem_t*)std::get(*hConnection); std::get(*hConnection) = nullptr; std::get(*hConnection) = nullptr; ccos_mqtt_msg_filter* pfilter = static_cast(std::get(*hConnection)); mqtt_msg_list* pList = static_cast(std::get(*hConnection)); pthread_t thread = std::get(*hConnection); ////mLog::FINFO(" Close Mqtt Connection..", std::get(*hConnection)); if (pconn) { // 优雅停止代替强制取消 mqtt_disconnect(pconn); // 设置停止标志,等待线程退出 if (thread) { pthread_cancel(thread); //pthread_join(thread, nullptr); // 等待线程结束 } } if (pconn) { mqtt_release(pconn); } if (pfilter) { std::cout << "Free Connection filter [" << (UINT64)pfilter << "] .." << endl; // 释放filter内部成员 delete std::get(*pfilter); delete std::get(*pfilter); delete pfilter; } if (semaphore) { sem_destroy(semaphore); // Destroy semaphore if applicable delete semaphore; } if (pList) { delete pList; } std::get(*hConnection) = nullptr; if (nullptr != pLock) pLock->Thread_UnLock(); delete hConnection; std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." 
<< endl; } //主动订阅主题 LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare) { if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); mqtt_topic_list* pTopicList = std::get(*hConnection); CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); pTopicList->push_back(pszTopic); //pMqttClient->subscribe(pszTopic, 1, opts); int rc = MQTT_SUCCESS_ERROR; DWORD dwTick = GetTickCount(); int nTryTimes = 0; do { if (pTopicList->size() < 1) { int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd); ////mLog::FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get(*hConnection), pszTopic, ret, pTopicList->size()); std::cout << "mqtt [" << std::get(*hConnection) << " ]Subscribe First " << pszTopic << endl; } else { int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd); ////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get(*hConnection), pszTopic, ret, pTopicList->size()); std::cout << "mqtt [" << std::get(*hConnection) << " ]Subscribe ReUse " << pszTopic << endl; } if (rc != MQTT_SUCCESS_ERROR) { ////mLog::FWARN("try do mqtt reconnect .."); //mqtt_do_reconnect(pMqttClient); mqtt_disconnect(pMqttClient); //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic); mqtt_release(pMqttClient); pMqttClient = InnerConnect(hConnection); int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn); if (pMqttClient != nullptr) { //std::get(*hConnection) = pMqttClient; //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context); //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get(*hConnection)); } usleep(2000 * 1000); } } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100); pLock->Thread_UnLock(); return 0; } //主题订阅取消 LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic) { if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = 
(mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer; //auto ret = pMqttClient->unsubscribe(pszTopic); mqtt_topic_list* pTopicList = std::get(*hConnection); CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); //MQTTAsync_responseOptions resp; pTopicList->remove(pszTopic); int rc = MQTT_SUCCESS_ERROR; DWORD dwTick = GetTickCount(); int nTryTimes = 0; do { int ret = mqtt_unsubscribe(pMqttClient, pszTopic); ////mLog::FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get(*hConnection), pszTopic, ret); } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100); pLock->Thread_UnLock(); return 2; } //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收 LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout) { char pszClientID[256]; snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX", (pszSenderName == nullptr) ? 
"ANONYMOUS" : pszSenderName, getpid(), GetCurrentThreadId()); ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) { }); mqtt_client* pConn = (mqtt_client*)std::get(*connObj); PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID); //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer; string pLoad = pCmd->encode(); int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName); std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl; ////mLog::FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc); CloseConnection(connObj); return rc; } //往指定主题发送CCOS协议包整包,使用已创建的连接发送 LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout) { if (hConnection == nullptr) { ////mLog::FDEBUG("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode()); std::cout << CurrentDateTime() << "Who ????? 
" << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode() return 0; } std::cout << CurrentDateTime() << std::get(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode() string topic = pszTopic; if (topic.length() <= 0) { ////mLog::FWARN("ignore empty topic packet {$}", pAction->encode()); return 2; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} std::string client_id = std::get(*hConnection); CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); ////mLog::FDEBUG("{$} Publish Action to {$} Action Body: {$}", std::get(*hConnection), pszTopic, pAction->encode()); //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction); //if(org_publisher.length() <= 0) PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() ); ResDataObject* pPacket = new ResDataObject(); mqtt_msg_list* pList = std::get(*hConnection); pPacket->add(pszTopic, pAction->encode()); ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode()); pList->push_back(pPacket); sem_post(std::get(*hConnection)); std::cout << "try publish " << pAction->encode() << endl; //pMqttClient->publish(pszTopic, pAction->encode()); /* //pConn->publish(pszTopic, pCmd->encode()); std::cout << "try publish " << pAction->encode() << endl; const char* pLoad = pAction->encode(); int len = strlen(pLoad); //MQTTAsync_responseOptions resp; //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer; mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; msg.payloadlen = len; int rc = MQTT_SUCCESS_ERROR; DWORD dwTick = GetTickCount(); int nTryTimes = 0; do { rc = mqtt_publish(pMqttClient, pszTopic, &msg); nTryTimes++; if (rc != MQTT_SUCCESS_ERROR) { ////mLog::FINFO("try 
mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id); //mqtt_do_reconnect(pMqttClient); mqtt_disconnect(pMqttClient); rc = mqtt_connect(pMqttClient); if (rc == MQTT_SUCCESS_ERROR) { resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context); ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id); } Sleep(2); } } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout); //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl; ////mLog::FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc); //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp); //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl; if (rc < 0) { ////mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad); //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl; pLock->Thread_UnLock(); return rc; }*/ //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL); //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl; ////mLog::FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size()); pLock->Thread_UnLock(); return 2; } //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理 int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos) { const int dwTimeout = 500; if (pMqttClient == nullptr) { ////mLog::FERROR("Who ????? 
Publish 2 to {$} Action {$} Body: {$}", pszTopic, message); return 0; } ////mLog::FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message); const char* pLoad = message.c_str(); int len = message.length(); mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.payloadlen = len; msg.qos = qos; int rc = MQTT_SUCCESS_ERROR; DWORD dwTick = GetTickCount(); int nTryTimes = 0; do { rc = mqtt_publish(pMqttClient, pszTopic, &msg); nTryTimes++; if (rc != MQTT_SUCCESS_ERROR) { ////mLog::FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id); usleep(10000 * 1000); //mqtt_do_reconnect(pMqttClient); ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context; mqtt_disconnect(pMqttClient); /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic); mqtt_connect(pMqttClient);*/ mqtt_release(pMqttClient); pMqttClient = InnerConnect(hConnection); //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn); if (pMqttClient != nullptr) { //std::get(*hConnection) = pMqttClient; //resubscribe_topic(pMqttClient, hConnection); //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get(*hConnection)); } } } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout)); if(nTryTimes > 1) ////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc); ////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc); if (rc < 0) { ////mLog::FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad); return rc; } return 2; } /* /// /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp, /// 超时没收到应答返回失败, /// 复用链接时须小心,该函数会接管回调函数,结束后恢复 /// /// 要发送的命令Action名 /// 命令携带的参数 /// 要发送的目标topic /// 本次请求的应答接收Topic /// 应答返回的参数结果 /// 等待超时时间 /// 复用的MQTT链接句柄 /// 复用的链接的消息处理函数 /// 成功返回2,其他返回错误码 LOGICDEVICE_API int 
ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj, DWORD dwWaitTime) { std::cout << CurrentDateTime() << std::get(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode() if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //SubscribeTopic() auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) { string topic = topicName;//msg->get_topic().c_str(); if (strcmp(pszRespTopic, topic.c_str()) == 0) { std::cout << CurrentDateTime() << std::get(*hConnection) << " get Right Resp by " << topic << endl; //应答到了,处理 ResDataObject req; req.decode((const char*)message->payload); PacketAnalizer::GetPacketContext(&req, resObj); SetEvent(hEvent); //处理结束 将函数指针换回去 void* oldFuncPointer = std::get(*hConnection); //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer); MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL); } else { //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get(*hConnection); (msgarrvd)(context, topicName, topicLen, message); } }; //接管回调 // pMqttClient->set_message_callback(action_func); MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered); //pMqttClient->subscribe(pszRespTopic, 1); MQTTSubscribe_options opts = MQTTSubscribe_options_initializer; 
MQTTProperties prop = MQTTProperties_initializer; std::cout << "mqtt 【" << std::get(*hConnection) << " 】Subscribe " << pszTopic << endl; //pMqttClient->subscribe(pszTopic, 1, opts); MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get(*hConnection)); //pMqttClient->publish(pszTopic, req.encode()); MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_message* m = NULL; MQTTClient_deliveryToken dt; MQTTProperty property; property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; property.value.data.data = "test user property"; property.value.data.len = (int)strlen(property.value.data.data); property.value.value.data = "test user property value"; property.value.value.len = (int)strlen(property.value.value.data); MQTTProperties properties = MQTTProperties_initializer; MQTTProperties_add(&properties, &property); //pConn->publish(pszTopic, pCmd->encode()); const char* pLoad = req.encode(); MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt); std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl; DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 return 2; } return 0; } */ /// /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败, /// /// /// /// /// /// /// /// /// LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime) { usleep(1000 * 1000); std::cout << CurrentDateTime() << std::get(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode() if (hConnection == nullptr) { return 0; } DWORD dwTick = GetTickCount(); mqtt_client* pMqttClient = 
(mqtt_client*)std::get(*hConnection); sem_t hEvent; sem_init(&hEvent, 0, 0); // 初始化信号量为0 ResDataObject resContext; string strResObject; char* pszPad = 0; char* pszPad2 = 0; auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool { ////mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp)); std::cout << CurrentDateTime() << std::get(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl; if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES && strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0) { if (GetTickCount() - dwTick < dwWaitTime) { ////mLog::FDEBUG("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode()); resObj = *rsp; return true; } else { ////mLog::FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode()); return false; } std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl; } ////mLog::FDEBUG("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode()); std::cout << "ActionAndRespWithConnDefalt what ? 
" << PacketAnalizer::GetPacketKey(rsp) << endl; return false; }; CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); ////mLog::FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get(*hConnection), pAction, pszTopic, pContext->encode()); ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get(*hConnection); ResDataObject *resActions = std::get(*pfilter); resActions->add(pAction, pszTopic); //std::get(*pfilter) = hEvent; std::get(*pfilter) = func; std::string client_id = std::get(*hConnection); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get(*hConnection)); const char* pLoad = req.encode(); ResDataObject* pPacket = new ResDataObject(); mqtt_msg_list* pList = std::get(*hConnection); pPacket->add(pszTopic, pLoad); ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode()); pList->push_back(pPacket); sem_post(std::get(*hConnection)); /* mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; msg.payloadlen = strlen(pLoad); //int rc = mqtt_publish(pMqttClient, pszTopic, &msg); int rc = MQTT_SUCCESS_ERROR; dwTick = GetTickCount(); int nTryTimes = 0; do { rc = mqtt_publish(pMqttClient, pszTopic, &msg); nTryTimes++; if (rc != MQTT_SUCCESS_ERROR) { ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. 
{$}", rc, client_id); //mqtt_do_reconnect(pMqttClient); mqtt_disconnect(pMqttClient); rc = mqtt_connect(pMqttClient); if (rc == MQTT_SUCCESS_ERROR) { resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context); ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id); } Sleep(2); } } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3); if (nTryTimes >= 1) ////mLog::FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get(*hConnection), pszTopic, nTryTimes, rc); */ pLock->Thread_UnLock(); dwTick = GetTickCount() - dwTick; //////mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc); std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl; dwTick = GetTickCount(); DWORD ret = sem_wait(&hEvent); if (ret == WAIT_OBJECT_0) { //等到应答了 dwTick = GetTickCount() - dwTick; pLock->Thread_Lock(); ResDataObject* pResp = std::get(*pfilter); for (int x = 0; x < pResp->size(); x++) { ResDataObject r = (*pResp)[x]; if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r)) { resObj = r; pResp->eraseOneOf(pAction, x); break; } } pLock->Thread_UnLock(); std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl; ////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get(*hConnection), pszTopic, dwTick); //std::get(*pfilter) = nullptr; sem_destroy(&hEvent); return 2; } //std::get(*pfilter) = nullptr; //resObj.decode(strResObject.c_str()); ////mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get(*hConnection), pszTopic ); std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl; 
//std::get(*pfilter) = nullptr; sem_destroy(&hEvent); return 0; } /// /// 新建MQTT连接发送Ation并等待应答 /// /// /// /// /// /// /// /// /// LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj, DWORD dwWaitTime, const char* pszSenderName) { ResDataObject req; if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); //临时创建连接并发送和接收应答 char pszClientID[256]; snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX", (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName, getpid(), GetCurrentThreadId()); sem_t sem; if (sem_init(&sem, 0, 0) != 0) { return 0; // 如果初始化失败,返回0 } std::cout << " ActionAndResp->NewConnection " << endl; ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) { //应答到了,处理 PacketAnalizer::GetPacketContext(req, resObj); sem_post(&sem); }); ////发布消息,并等待应答 PublishAction(&req, pszTopic, connObj); // 等待应答或超时 struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += dwWaitTime / 1000; // 转换为秒 ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒 // 等待信号量或者超时 int ret = 0; while (ret == 0) { ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量 if (ret == -1 && errno == ETIMEDOUT) { // 超时 sem_destroy(&sem); CloseConnection(connObj); return 0; // 超时返回0 } } // 等到应答了 sem_destroy(&sem); // 销毁信号量 CloseConnection(connObj); return 2; // 返回2表示应答成功 }