// LogicDevice.cpp : 定义 DLL 应用程序的导出函数。 // #include "StdAfx.h" #include "objbase.h" #include "LogicDevice.h" #include "PacketAnalizer.h" #include "MessageInfo.h" #include "common_api.h" #include "LocalConfig.h" #include "Base64.h" #include "SystemLogger.hpp" #include #include //#include "matt/async_client.h" //#include "mqtt/async_client.h" //#include "MQTTAsync.h" //#include "MQTTAsync.h" Log4CPP::Logger* //mLog::gLogger = nullptr; //#include "mqttclient.h" //void msgarrivd(void* client, message_data_t* message); std::string CurrentDateTime(); //-------------Logic Device SysIF-------------------------- LogicDeviceSysIF::LogicDeviceSysIF(void) { } LogicDeviceSysIF::~LogicDeviceSysIF(void) { } //init void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p) { m_pLogicDev = p; p->SetSysLogicDevice(this); }; LogicDevice* LogicDeviceSysIF::GetLogicDevice() { return m_pLogicDev; } //Command In and Out //notify from lower layer RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd) { return RET_FAILED; }; //notify to lower layer RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd) { if (m_pLogicDev) { return m_pLogicDev->CmdToLogicDev(pCmd); } return RET_NOSUPPORT; }; const std::string SERVER_ADDRESS("127.0.0.1"); /* string DEVICE_ID; //AS CLIENT_ID //const std::string CLIENT_ID("paho_cpp_async_subcribe"); //const std::string TOPIC("hello"); mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象 const int QOS = 1; const int N_RETRY_ATTEMPTS = 5;*/ using namespace std; LogicDevice* g_pDPCDeviceObject = NULL; string LogHost = ""; //string DEVICE_ID; //-------------Data Logic Device-------------------------- LogicDevice::LogicDevice(void) { m_pLogger = NULL; m_pSysLogic = NULL; m_pDevInstance = new char[40]; m_pResErrorList = new ResDataObject(); GUID guid; string strDevInstance; CoCreateGuid(&guid); guid_2_string(guid, strDevInstance); sprintf_s(m_pDevInstance,40, "%s", strDevInstance.c_str()); m_EvtNotify = 
CreateEvent(0, 0, 0, 0); m_SemphRequest = CreateSemaphore(NULL, 0, 100, "LogicDevice_Reqquest"); m_pDrvDPC = NULL; g_pDPCDeviceObject = this; m_pMqttConntion = nullptr; //m_strServer = "tcp://localhost:1883"; m_strServer = SERVER_ADDRESS;// "192.168.2.225"; m_strServerPort = "1883"; m_bMqttUseSSL = false; m_strMqttUser = ""; m_strMqttPassword = ""; m_strEBusRoot = ""; m_strCCOSRoot = ""; m_pParent = nullptr; m_topicFilter = nullptr; m_pPacketReceivedQue = new MsgQueue(); int x = 0; for ( x = 0; x < sizeof(szPad); x++) szPad[x] = 'A' + x % 26; szPad[x-1] = 0; memset(szPad2, 0, sizeof(szPad2)); } LogicDevice::~LogicDevice(void) { delete []m_pDevInstance; delete m_pResErrorList; CloseHandle(m_EvtNotify); CloseHandle(m_SemphRequest); m_EvtNotify = NULL; delete m_pPacketReceivedQue; } void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) { if (pszEBusRoot[0] == '/') { m_strEBusRoot = pszEBusRoot + 1; } else { m_strEBusRoot = pszEBusRoot; } char szKeys[256]; strcpy(szKeys, pszEBusRoot); char* pt = szKeys; while (*pt != 0) { if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-') *pt = '_'; pt++; } m_strClientID = CCOS_CLIENT_ID_PREFIX; m_strClientID += szKeys; if(pszCCOSRoot != nullptr) m_strCCOSRoot = pszCCOSRoot; //mLog::FINFO("Set MQTT ClientName {$} ", m_strClientID); SetName(m_strClientID.c_str()); OnSetClientID(); } void LogicDevice::OnSetClientID() { } void LogicDevice::SubscribeSelf() { if (m_strDevicePath.length() > 0) if (m_strDevicePath.c_str()[0] != '/') m_strDevicePath = "/" + m_strDevicePath; //mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion); //std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl; if (m_strEBusRoot.length() > 0) { SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str()); } if (m_strCCOSRoot.length() > 0) { SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot + m_strDevicePath).c_str()); //订阅 } 
SubscribeActions(); } void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe) { if (bSubscribe) SubscribeTopic(m_pMqttConntion, pszTopic); else UnSubscribe(m_pMqttConntion, pszTopic); } void LogicDevice::NotifyDrvThread() { if (m_EvtNotify) { SetEvent(m_EvtNotify); } } HANDLE LogicDevice::GetEvtHandle() { return m_EvtNotify; } //1. init part void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic) { m_pSysLogic = pLogic; } void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger) { m_pLogger = pLogger; } Logger *LogicDevice::GetLogHandle() { return m_pLogger; } void LogicDevice::SetDrvDPC(DriverDPC *pDPC) { m_pDrvDPC = pDPC; } DriverDPC *LogicDevice::GetDrvDPC() { return m_pDrvDPC; } RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath) { RET_STATUS ret = RET_FAILED; if (szEbusDevPath != nullptr) { auto search = GetEbusChild(szEbusDevPath); if (search == nullptr) { m_subDevices.push_back(pChild); std::sort(m_subDevices.begin(), m_subDevices.end(), [](LogicDevice* p1, LogicDevice* p2) { return p1->GetRootPath() < p2->GetRootPath(); }); } ret = RET_SUCCEED; //return RET_SUCCEED; } return ret; } RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath) { RET_STATUS ret = RET_FAILED; if (szCcosDevPath != nullptr) { if (strncmp(m_strCCOSRoot.c_str(), szCcosDevPath, m_strCCOSRoot.length()) == 0) { auto search = GetCcosChild(szCcosDevPath); if (search == nullptr) { m_subCcosDevices.push_back(pChild); std::sort(m_subCcosDevices.begin(), m_subCcosDevices.end(), [](LogicDevice* p1, LogicDevice* p2) { return p1->GetCcosRootPath() < p2->GetCcosRootPath(); }); } return RET_SUCCEED; } } return ret; } LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath) { //std::binary_search(m_subDevices.begin(), m_subDevices.end(), szEbusDevPath, [szEbusDevPath]()->bool { // return // }); for each (LogicDevice* var in m_subDevices) { if (var->GetRootPath().compare(szEbusDevPath) == 0) return var; } 
return nullptr;
}

/// Looks up a child by its exact CCOS root path.
/// The CCOS child list (the one AddCcosChildren() fills) is searched first;
/// the EBus child list is searched afterwards so that lookups which worked
/// before this list was consulted keep working.
/// @return the matching child, or nullptr when none matches or the path is null.
LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
{
    if (szCcosDevPath == nullptr)
    {
        return nullptr;
    }

    for (LogicDevice* pChild : m_subCcosDevices)
    {
        if (pChild->GetCcosRootPath().compare(szCcosDevPath) == 0)
            return pChild;
    }
    for (LogicDevice* pChild : m_subDevices)
    {
        if (pChild->GetCcosRootPath().compare(szCcosDevPath) == 0)
            return pChild;
    }
    return nullptr;
}

/// Finishes initialisation: configures Log4CPP for the current log host,
/// opens the MQTT connection, starts the request thread and subscribes the
/// device's own topics. Does nothing beyond log setup when no client id is set.
void SYSTEM_CALL LogicDevice::CompleteInit()
{
    // Publishes the current LogHost to the Log4CPP thread context and loads
    // the Log4CPP configuration file from <process dir>\Conf.
    auto applyLogHost = [&]()
    {
        const string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
        Log4CPP::ThreadContext::Map::Set("LogFileName", "LogicDevice");
        //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
        // The first character of LogHost is skipped; LogHost is always longer
        // than one character when this runs.
        Log4CPP::ThreadContext::Map::Set("LogHost", LogHost.c_str() + 1);
        Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
        //mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
    };

    const string strRoot = ((string)getLogRootpath()).c_str();

    // NOTE(review): the original test here was "mLog::gLogger == nullptr", but
    // mLog is commented out everywhere in this file and the condition no
    // longer parses. LogHost is empty exactly until the first-time branch
    // below has run, so it is used as the "not configured yet" marker -
    // confirm that nothing else assigns LogHost.
    if (LogHost.empty())
    {
        LogHost = strRoot;
        if (LogHost.length() <= 1)
        {
            // No usable log root configured: fall back to a per-process name.
            char szName[256];
            sprintf_s(szName, sizeof(szName), "/LogicDevice_%08d", GetCurrentProcessId());
            LogHost = szName;
        }
        applyLogHost();
    }
    else if (strRoot.length() > 1 && strRoot != LogHost)
    {
        // The configured log root changed since the last setup: reconfigure.
        LogHost = strRoot;
        applyLogHost();
    }

    //string version;
    //if (GetVersion(version, hMyModule))
    //    mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
    //else
    //    mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
    //mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);

    if (m_strClientID.length() <= 0)
    {
        // No client id has been set, so no connection is opened.
        //mLog::FINFO("No Client name ......" );
<< endl; return; } m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(), [this](ResDataObject* req, const char* topic, void* conn) { //这里只处理当前层次设备的请求,下一层的直接靠URI来路由 //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答 // 消息处理如果耗时较长,则需要开线程来处理 if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 || strncmp(topic, m_strCCOSRoot.c_str(), m_strCCOSRoot.length()) == 0) { //主题以ebus或者ccos开头,给我发的消息,进行处理 ProcessSubscribeRequest(req); } else { //我主动订阅的外部模块的消息 ProcessSubscribeMsg(req); } CmdToLogicDev(req); }); if (m_pMqttConntion != nullptr) { StartThread(); //启动Request线程 SubscribeSelf(); } } RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) { PACKET_TYPE type = PacketAnalizer::GetPacketType(req); PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req); switch (type) { case PACKET_TYPE_REQ: ProcessRequest(req, cmd);//请求 break; case PACKET_TYPE_RES: ProcessResponse(req, cmd);//应答 break; case PACKET_TYPE_NOTIFY: ProcessNotify(req, cmd); //通知 break; } return RET_FAILED; } RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd) { ProcessSubscribeRequest(pCmd); return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd) { //Open命令 if (cmd == PACKET_CMD_OPEN) { /* { "IDX": "40", "TYPE" : "0", "CMD" : "0", "HANDLE" : { "ROUTE": "1", "FLAGS" : "63", "LANG" : "en-US", "HANDLEID" : "0", "OWNERID" : { "EBUSID": "ImageSave", "MACHINEID" : "DESKTOP-FVD53H8", "PROCID" : "35408", "ADDR" : "2631366957696" }, "DEVID": { "EBUSID": "ccosChannel", "MACHINEID" : "", "PROCID" : "0", "ADDR" : "0" } }, "KEY": "\/ccosChannel" ResDataObject resRes, resResponse; ResDataObject resTopic; PacketAnalizer::GetContextTopic(pCmd, resTopic); if (cmd == PACKET_CMD_OPEN ) { //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl; PublishAction(&resResponse, 
(const char*)resTopic, m_pMqttConntion); return RET_SUCCEED; } if (cmd == PACKET_CMD_CLOSE) { //CLOSE 客户端主动断开 } } */ ResDataObject resRes, resResponse; GetDeviceResource(&resRes); LogicDevice::GetDeviceResource(&resRes); //std::cout << "For Open Result" << resRes.encode() << endl; PacketAnalizer::MakeOpenResponse(*pCmd, resResponse, resRes); PacketAnalizer::UpdateDeviceNotifyResponse(resResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)m_pMqttConntion); ResDataObject resTopic; PacketAnalizer::GetContextTopic(pCmd, resTopic); PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion); //这里先直接给1,后面取真实状态 ResDataObject NotifyData; PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE, "ConnectionStatus", "1"); //CmdFromLogicDev(&NotifyData); PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); return RET_SUCCEED; } else if (cmd == PACKET_CMD_CLOSE) { } else { PacketArrived(pCmd); } return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd) { return RET_SUCCEED; } RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd) { return RET_SUCCEED; } void LogicDevice::PacketArrived(ResDataObject* pRequest) { //m_pPacketReceivedQue->Lock(); m_pPacketReceivedQue->InQueue(*pRequest); //SetEvent(m_EvtNotify);//notify to user ReleaseSemaphore(m_SemphRequest, 1, NULL); //m_pPacketReceivedQue->UnLock(); } bool LogicDevice::OnStartThread() { return true; } bool LogicDevice::OnEndThread() { return true; } bool LogicDevice::Exec(void) { HANDLE hWait[2]; hWait[0] = m_ExitFlag; hWait[1] = m_SemphRequest; DWORD dwRet = WaitForMultipleObjects(2, hWait, FALSE, 3); if (dwRet == WAIT_OBJECT_0) { return false; } //else if (dwRet == WAIT_OBJECT_0 + 1) { //请求到了 ResDataObject req; while (m_pPacketReceivedQue->DeQueue(req)) { ResDataObject resResource, resResponse, resPacket; ResDataObject resTopic; PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req); string 
keystr = PacketAnalizer::GetPacketKey(&req); RET_STATUS ret = RET_FAILED; PacketAnalizer::MakeResponseByReq(resPacket, req, ret); //mLog::FINFO(" {$} Got {$} Request by {$}", GetCurrentThreadId(), keystr, m_strClientID); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Got " << keystr << " Request by " << m_strClientID << endl; if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") { ResDataObject devRes; if ((ret = GetDeviceResource(&devRes)) == RET_SUCCEED) { LogicDevice::GetDeviceResource(&devRes); PacketAnalizer::UpdatePacketContext(resPacket, devRes); PacketAnalizer::MakeRetCode(RET_SUCCEED, &resPacket); } else { PacketAnalizer::MakeRetCode(RET_FAILED, &resPacket); } } else { ret = Request(&req, &resPacket); PacketAnalizer::MakeRetCode(ret, &resPacket); PacketAnalizer::GetPacketRetCode(&resPacket, ret); } //PacketAnalizer::MakeResponseByReq(resPacket, req, ret); //PacketAnalizer::UpdatePacketContext(resPacket, resResponse); //PacketAnalizer::MakeRetCode(ret, &resPacket); PacketAnalizer::GetContextTopic(&req, resTopic); //mLog::FINFO("Request result {$}", resPacket.encode()); //std::cout << " Request(pCmd, &resResponse) result" << resPacket.encode() << endl; PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion); //mLog::FINFO(" {$} Send {$} Resp {$}", GetCurrentThreadId(), keystr, m_strClientID); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Send "<< keystr <<" Resp" << m_strClientID << endl; } return true; } return true; } void SYSTEM_CALL LogicDevice::CompleteUnInit() { StopThread(); } int LogicDevice::GetDevice_Thread_Priority() { return THREAD_PRIORITY_NONE; } RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource) { //Get Unit Type (Unit GUID) if (pDeviceResource->GetFirstOf("LogicDevInstance")<0) { pDeviceResource->add("LogicDevInstance", m_pDevInstance); } //// size_t idx = (*pDeviceResource)["Attribute"].size(); if (idx > 0) { int erroridx = 
(*pDeviceResource)["Attribute"].GetFirstOf("ErrorList"); if (erroridx < 0) { (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList); } else { (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList; } } else { ResDataObject Attribute; Attribute.add("ErrorList", *m_pResErrorList); pDeviceResource->add("Attribute", Attribute); } return RET_SUCCEED; } //notify from lower layer RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd) { PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd); PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd); if (type == PACKET_TYPE_NOTIFY) { PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } return RET_SUCCEED; /* if (pCmd && m_pSysLogic) { return m_pSysLogic->CmdFromLogicDev(pCmd); } //put log here return RET_FAILED;*/ } std::wstring mb2wc_a(const char* mbstr) { std::wstring strVal; int size = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, NULL, 0); wchar_t* wcstr = new wchar_t[size + 1]; if (wcstr) { memset(wcstr, 0, size * sizeof(wchar_t)); int ret = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, wcstr, size); if (ret != 0) // MultiByteToWideChar returns 0 if it does not succeed. 
{ strVal = wcstr; } delete[] wcstr; wcstr = NULL; } return strVal; } RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType) { string ResBase64, DesBase64; wstring wResUTF = ResInfo; wstring wDesUTF = Description; CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64); CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64); return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType); } RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId) { string ResBase64,DesBase64; wstring wResUTF = mb2wc_a(ResInfo); wstring wDesUTF = mb2wc_a(Description); CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64); CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64); return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId); } RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId) { //int ret = 1; if (Code == 0 || (string)ResInfo == "") { //mLog::FERROR("Code or ResInfo is empty"); return RET_FAILED; } MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; string strDevInstanceCode = DevInstance; strDevInstanceCode += Code; for (size_t i = 0; i < m_pResErrorList->size(); i++) { string strInstancekey = m_pResErrorList->GetKey(i); if (strInstancekey == strDevInstanceCode) { //for 
(size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++) //{ // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j); // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"]; // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType) // { //ret = 0; //mLog::FDEBUG("Same Code:%s with MessageType:%d already Exist", Code,nMessageType); return RET_SUCCEED; // } //} //ret = 2; //break; } } //if (ret==1) { ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/; //info.GetResDataObject(tempInfo); //ErrorInfo.update(Code, tempInfo); info.GetResDataObject(ErrorInfo); SYSTEMTIME st; GetLocalTime(&st); string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); ErrorInfo.add("CreationTime", TimeTag.c_str()); ErrorInfo.add("AppId", pAppId); ErrorInfo.add("InstanceId", DevInstance); if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可 { m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo); } ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo); PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify); //mLog::FDEBUG( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode()); //CmdFromLogicDev(&NotifyData); PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } //else if (ret == 2) //{ // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo; // info.GetResDataObject(tempInfo); // ErrorInfo.update(Code, tempInfo); // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可 // { // (*m_pResErrorList)[DevInstance].update(Code, tempInfo); // } // ResNotify.update(DevInstance, ErrorInfo); // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify); // RES_//mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType); // CmdFromLogicDev(&NotifyData); //} //put log here return RET_SUCCEED; } RET_STATUS 
LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId) { AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId); //put log here return RET_FAILED; } RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType) { int index = -1; bool m_bClearAll = false; //trim empty code string string CodeStr = Code; if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "") { CodeStr = " "; Code = CodeStr.c_str(); m_bClearAll = true; } //if ((string)Code == "0" || (string)Code == "") //{ // m_bClearAll = true; //} string strDevInstanceCode = DevInstance; strDevInstanceCode += Code; if (m_bClearAll) { m_pResErrorList->clear(); } else { MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; for (size_t i = 0; i < m_pResErrorList->size(); i++) { string strInstancekey = m_pResErrorList->GetKey(i); if (strInstancekey == strDevInstanceCode) { //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++) //{ // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j); // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"]; // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType) // { // index = (INT)j; // break; // } //} index = (INT)i; break; } } } MessageInfo info; info.CodeID = Code; info.Type = nMessageType; info.Level = Level; info.Resouceinfo = ResInfo; info.Description = Description; ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/; //info.GetResDataObject(tempInfo); //ErrorInfo.add(Code, tempInfo); info.GetResDataObject(ErrorInfo); ErrorInfo.add("InstanceId", DevInstance); bool result = false; if (index>=0) { //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index); result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str()); 
} if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE) { ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo); PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify); //mLog::FDEBUG( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode()); //CmdFromLogicDev(&NotifyData); PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } return RET_SUCCEED; } RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType) { DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType); //put log here return RET_FAILED; } RET_STATUS LogicDevice::EvtProcedure() { return RET_FAILED; } bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId) { //mLog::FERROR("NOT FINISHED YET"); return false; } RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId) { std::string strResult = ""; //组Context包 if (m_pLogger) { wstring wContect = mb2wc_a(pContext); CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult); } //Thread_Lock(); //if (NULL != fmt) //{ // va_list marker = NULL; // va_start(marker, fmt); // size_t nLength = _vscprintf(fmt, marker) + 1; // std::vector vBuffer(nLength, '\0'); // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker); // if (nWritten > 0) // { // strResult = &vBuffer[0]; // } // va_end(marker); //} //Thread_UnLock(); ResDataObject SysLogNode; string guidstr; GUID DeviceGuid; //组Log包 if (GetDeviceType(DeviceGuid)) { guid_2_string(DeviceGuid, guidstr); SysLogNode.add("Module", guidstr.c_str()); SysLogNode.add("AppId", pAppId); SysLogNode.add("ThreadId", GetCurrentThreadId()); if (pCode) { SysLogNode.add("BusinessKey", pCode); } else { SysLogNode.add("BusinessKey", ""); } SysLogNode.add("IP", (const char*)getLocalIpAddress()); SYSTEMTIME st; GetLocalTime(&st); string TimeTag = 
FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); SysLogNode.add("CreationTime", TimeTag.c_str()); string strLevel = SysLogLevel2str(Level); SysLogNode.add("Level", strLevel.c_str()); SysLogNode.add("HostName", (const char*)getLocalMachineId()); SysLogNode.add("ProcessName", (const char*)GetModuleTitle()); SysLogNode.add("FreeText", strResult.c_str()); SysLogNode.add("Context", pCode); ResDataObject NotifyData; PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode); //CmdFromLogicDev(&NotifyData); PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion); } else { PRINTA_ERROR(m_pLogger, "no Guid??"); return RET_FAILED; } //打印LOG switch (Level) { case Syslog_Debug: //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog"); //mLog::FDEBUG("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Information: ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog"); //mLog::FINFO("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Warning: //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog"); //mLog::Warn("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Error: //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog"); //mLog::FERROR("SysLog {$} ", SysLogNode.encode()); break; case Syslog_Fatal: //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog"); //mLog::FINFO("SysLog {$} ", SysLogNode.encode()); break; default: //mLog::FINFO("SysLog {$} " ,SysLogNode.encode() ); break; } return RET_SUCCEED; } RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...) 
{
    std::string strResult = "";

    // Build the free text. Formatting runs under the logger's lock when a
    // logger is set, otherwise under this object's own lock.
    Logger* const pLockOwner = m_pLogger;
    if (pLockOwner)
    {
        pLockOwner->Thread_Lock();
    }
    else
    {
        Thread_Lock();
    }

    if (NULL != fmt)
    {
        // Pass 1: measure. The argument list is restarted for pass 2 because
        // a va_list must not be reused after a v*printf call has consumed it.
        va_list marker;
        va_start(marker, fmt);
        const int nChars = _vscprintf(fmt, marker);
        va_end(marker);

        // A negative count means the format could not be processed; skipping
        // it also avoids indexing into an empty buffer.
        if (nChars > 0)
        {
            std::vector<char> vBuffer(static_cast<size_t>(nChars) + 1, '\0');
            va_start(marker, fmt);
            const int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), vBuffer.size() - 1, fmt, marker);
            va_end(marker);
            if (nWritten > 0)
            {
                strResult = &vBuffer[0];
            }
        }
    }

    if (pLockOwner)
    {
        pLockOwner->Thread_UnLock();
    }
    else
    {
        Thread_UnLock();
    }

    ResDataObject SysLogNode;
    string guidstr;
    GUID DeviceGuid;

    // Build the log packet.
    if (GetDeviceType(DeviceGuid))
    {
        guid_2_string(DeviceGuid, guidstr);
        SysLogNode.add("Module", guidstr.c_str());
        SysLogNode.add("AppId", "");
        SysLogNode.add("ThreadId", GetCurrentThreadId());
        if (pCode)
        {
            SysLogNode.add("BusinessKey", pCode);
        }
        else
        {
            SysLogNode.add("BusinessKey", "");
        }
        SysLogNode.add("IP", (const char *)getLocalIpAddress());

        SYSTEMTIME st;
        GetLocalTime(&st);
        string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u",
            st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
        SysLogNode.add("CreationTime", TimeTag.c_str());
        SysLogNode.add("Level", Level);
        SysLogNode.add("HostName", (const char *)getLocalMachineId());
        SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
        SysLogNode.add("FreeText", strResult.c_str());

        ResDataObject NotifyData;
        PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
        //CmdFromLogicDev(&NotifyData);
        PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
        if (m_pParent != nullptr)
        {
            NotifyParent(&NotifyData, "/Notify");
        }
    }
    else
    {
        PRINTA_ERROR(m_pLogger,"no Guid??");
        return RET_FAILED;
    }

    // Print the log locally. Every per-level call is currently disabled, so
    // the switch only documents the intended mapping.
    switch (Level)
    {
    case Syslog_Debug:
//RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog"); //mLog::FDEBUG("SysLog {$}", SysLogNode.encode()); break; case Syslog_Information: //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog"); //mLog::FINFO("SysLog {$}", SysLogNode.encode()); break; case Syslog_Warning: //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog"); //mLog::Warn("SysLog {$}", SysLogNode.encode()); break; case Syslog_Error: //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog"); //mLog::FERROR("SysLog {$}", SysLogNode.encode()); break; case Syslog_Fatal: //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog"); //mLog::FINFO("SysLog {$}", SysLogNode.encode()); break; default: //mLog::FINFO( "SysLog {$}", SysLogNode.encode()); break; } return RET_SUCCEED; } ///////////////////////////////////////////////////////////////////////////// ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg) { return nullptr; } std::string CurrentDateTime() { // 获取当前时间点 auto now = std::chrono::system_clock::now(); // 将时间长度转换为微秒数 auto now_us = std::chrono::duration_cast(now.time_since_epoch()); // 再转成tm格式 auto now_time_t = std::chrono::system_clock::to_time_t(now); auto now_tm = std::localtime(&now_time_t); // 可以直接输出到标准输出 // std::cout << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S.") << std::setfill('0') << std::setw(6) << now_us.count() % 1000000 << std::endl; // 格式化字符,年月日时分秒 std::string now_time_str; char buf[64]; std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", now_tm); now_time_str += buf; // 格式化微秒 snprintf(buf, sizeof(buf), ".%06lld ", now_us.count() % 1000000); now_time_str += buf; //printf("%s\n", now_time_str.c_str()); return now_time_str; } static string CurrentDateTime2() { std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); char buf[100] = { 0 }; std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now)); return buf; } bool LogicDevice::GetActions(ResDataObject& resAction) { for (int x = 0; x < m_Actions.size(); 
x++)
	{
		// Register each action key in the caller's object with an empty value.
		resAction.update(m_Actions.GetKey(x), "");
	}
	return true;
}

// Publishes NotifyData on the parent device's topic "<parent root path><pszTopic>".
// Does nothing when this device has no parent. When hConnection is null the parent's
// own connection (m_pParent->m_pMqttConntion) is used for publishing.
void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
{
	if (m_pParent != nullptr)
	{
		if (hConnection == nullptr)
		{
			hConnection = m_pParent->m_pMqttConntion;
		}
		PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
	}
}

// QoS level used by the mqtt_subscribe() calls and the published messages in this file.
const mqtt_qos_t MQTT_QOS = QOS2;

// Subscribes this device's MQTT connection to its action topics:
//   "<m_strEBusRoot>/Action/<key>" for every key in m_Actions,
//   "<m_strCCOSRoot><m_strDevicePath>/Action/<key>" as well when the two roots differ,
//   and finally "<m_strEBusRoot>/Action/UpdateDeviceResource".
// Each topic is also appended to the connection's topic list. When m_Actions is empty it is
// first loaded from the "Action" node returned by GetDeviceResource().
// Returns immediately when the device has no connection.
// NOTE(review): the std::get calls below carry no template argument in this copy of the file
// (the tuple index seems to have been lost); they are left exactly as found - confirm against
// version control.
void LogicDevice::SubscribeActions()
{
	if (nullptr == m_pMqttConntion)
		return;
	int num = m_Actions.size();
	mqtt_client* pMqttClient = (mqtt_client*)std::get(*m_pMqttConntion);
	mqtt_topic_list* pTopicList = std::get(*m_pMqttConntion);
	//if (!pMqttClient->is_connected()) {
	//	////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
	//	return ;
	//}
	if (num <= 0)
	{
		// No actions cached yet: take them from the device resource description.
		ResDataObject res;
		GetDeviceResource(&res);
		//std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
		m_Actions = res["Action"];
		num = m_Actions.size();
	}
	std::string pszAction;
	int ret = 0;
	//MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	//*/
	for (size_t i = 0; i < num; i++)
	{
		// Topic under the EBus root.
		pszAction = m_strEBusRoot;
		pszAction += "/Action/";
		pszAction += (m_Actions.GetKey(i));
		//std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
		//pMqttClient->subscribe(pszAction, 1, opts);
		pTopicList->push_back(pszAction);
		ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
		//mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
		if (m_strEBusRoot != m_strCCOSRoot)
		{
			// Same action under the CCOS root plus device path.
			pszAction = m_strCCOSRoot + m_strDevicePath;
			pszAction += "/Action/";
			pszAction += (m_Actions.GetKey(i));
			//std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
			//pMqttClient->subscribe(pszAction, 1, opts);
			pTopicList->push_back(pszAction);
			ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
			//mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
		}
	}
	// Always subscribed, whatever the action list contains.
	pszAction = m_strEBusRoot;
	pszAction += "/Action/UpdateDeviceResource";
	//std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
	pTopicList->push_back(pszAction);
	mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
	//mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
	//*/
	// Do not subscribe to '#': every request would then be received twice.
	//pszAction = m_strEBusRoot;
	//pszAction += "/Action/#";
	////pTopicList->push_back(pszAction);
	////mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
	//SubScribeTopic(pszAction.c_str(), true);
	//std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
	//if (m_strEBusRoot != m_strCCOSRoot)
	//{
	//	pszAction = m_strCCOSRoot + m_strDevicePath;
	//	pszAction += "/Action/#";
	//	//pTopicList->push_back(pszAction);
	//	//mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
	//	SubScribeTopic(pszAction.c_str(), true);
	//	std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
	//}
}
/*
void connlost(void* context, char* cause)
{
	// connection lost
	ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
	std::cout << "Connection Lost .....[" << std::get(*connection) <<"] why?" << endl;
	printf("\nConnection lost\n");
	printf(" cause: %s\n", cause);
	std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
}
void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
{
	ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
	std::cout << "Connection disconnected .....[" << std::get(*connection) << "] why?" << endl;
}
*/
// delivery succeeded
//void delivered(void* context, MQTTAsync_token dt)
//{
//	printf("Message with token value %d delivery confirmed\n", dt);
//	//deliveredtoken = dt;
//}
//void connlost(void* context, char* cause)
//{
//	//printf("Connection 0X%08X Lost ...... ???? 
%s \n",, cause); // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl; // // //连接中断 // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; // mqtt_client* client = (mqtt_client*)std::get(*connection); // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl; // return; // // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; // int rc; // // printf("Reconnecting\n"); // conn_opts.keepAliveInterval = 20; // conn_opts.cleansession = 1; // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) // { // printf("Failed to start connect, return code %d\n", rc); // //finished = 1; // } //} //重连成功 //void onReconnected(void* context, char* cause) //{ // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause); // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl; // // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; // mqtt_client* client = (mqtt_client*)std::get(*connection); // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; // mqtt_topic_list* pTopicList = std::get(*connection); // const char* client_id = std::get(*connection); // int rc; // // //printf(" %s \n", std::get(*connection)); // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl; // //cout << "Connected ok.. 
by TID [" << GetCurrentThreadId() << "]" << endl; // // 重新订阅,暂不重新订阅,看看 // // ///* // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" // // "Press Q to quit\n\n", TOPIC, CLIENTID, QOS); // //opts.onSuccess = onSubscribe; // //opts.onFailure = onSubscribeFailure; // //opts.context = client; // auto it = pTopicList->begin(); // while(it != pTopicList->end()) // { // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts); // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc); // it++; // } // //*/ // //} // ////MQTT 连接主动断开连接失败 //void onDisconnectFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Disconnect failed, rc %d\n", response->code); // //disc_finished = 1; //} // ////MQTT 连接主动断开连接成功 //void onDisconnect(void* context, MQTTAsync_successData* response) //{ // //printf("Successful disconnection\n"); // //disc_finished = 1; //} // //void onSubscribe(void* context, MQTTAsync_successData* response) //{ // printf("Subscribe succeeded\n"); // //subscribed = 1; //} // //void onSubscribeFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Subscribe failed, rc %d\n", response->code); // //finished = 1; //} // // //void onConnectFailure(void* context, MQTTAsync_failureData* response) //{ // printf("Connect failed, rc %d\n", response->code); // //finished = 1; //} //void onConnect(void* context, MQTTAsync_successData* response) //{ // // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context; // mqtt_client* client = (mqtt_client*)std::get(*connection); // // std::cout << "[" << std::get(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl; // // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; // int rc; // // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection)); // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection); // if (hConnected != NULL) // SetEvent(hConnected); // // 
/*
//	printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
//		"Press Q to quit\n\n", TOPIC, CLIENTID, QOS);
//	opts.onSuccess = onSubscribe;
//	opts.onFailure = onSubscribeFailure;
//	opts.context = client;
//	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
//	{
//		printf("Failed to start subscribe, return code %d\n", rc);
//		finished = 1;
//	}*/
//}

// MQTT message-arrival callback; it is the handler passed to every mqtt_subscribe() call in
// this file.
//   client  : the mqtt_client the message arrived on; its mqtt_conn_context field is read
//             as the owning ccos_mqtt_connection
//   message : topic name plus payload of the received message
// Flow: the payload is decoded into a ResDataObject. If the connection carries a message
// filter whose hook function is set and that function accepts the packet, the packet context
// is copied into the filter's result object, the filter's event is signalled and the message
// is consumed. Otherwise the packet is handed to the connection's user callback together with
// the topic and the connection.
// NOTE(review): `message`, the connection context and the filter's result object are used
// without null checks - confirm the MQTT layer and the hook owner guarantee them.
// NOTE(review): the std::get calls carry no template argument in this copy of the file (the
// tuple index seems to have been lost); they are left exactly as found.
//int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
void msgarrivd(void* client, message_data_t* message)
{
	if (client == nullptr)
	{
		printf("************************ somthing happend...");
		return;
	}
	// A message has arrived.
	ResDataObject req;
	string topic = message->topic_name; //topicName; //msg->get_topic().c_str();
	mqtt_client* pClient = (mqtt_client*)client;
	ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
	string client_id = std::get(*connection);
	// Copy with an explicit length: the payload is addressed by pointer + payloadlen.
	std::string payload((const char*)message->message->payload, message->message->payloadlen);
	//std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
	//std::cout << " msg Body " << payload << endl;
	//mLog::FINFO("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload);
	const char* pclient_id = client_id.c_str();
	req.decode(payload.c_str());
	// Fetch the message hook (filter) of this connection.
	void* pHook = std::get(*connection);
	if (pHook != nullptr)
	{
		ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
		ccos_mqtt_msg_filter_func func = std::get(*filter);
		//std::get(*filter) = nullptr;
		//std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
		//mLog::FINFO("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
		// A hook function is installed.
		if (func != nullptr)
		{
			//ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
			//std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
			//mLog::FINFO("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
			ResDataObject* resObj = std::get(*filter);
			HANDLE hEvent = std::get(*filter);
			if (func(&req))
			{
				// Hooked: the message belongs to this hook, so notify its waiter.
				// presumably this clears the hook-function slot so the hook fires once - TODO confirm
				std::get(*filter) = nullptr;
				//std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
				//mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} ", GetCurrentThreadId(), client_id, topic);
				PacketAnalizer::GetPacketContext(&req, *resObj);
				SetEvent(hEvent);
				//MQTTAsync_freeMessage(&message);
				//MQTTAsync_free(topicName);
				return ;
			}
		}
	}
	//std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
	// Not consumed by a hook: forward to the user callback of the connection.
	ccos_mqtt_callback onmsg = std::get(*connection);
	if (onmsg != nullptr)
		(onmsg)(&req, topic.c_str(), connection);
	else
	{
		//mLog::FINFO("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get(*connection), topic);
		//cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get(*connection) << " When processing " << topic << endl;
	}
	//MQTTAsync_freeMessage(&message);
	//MQTTAsync_free(topicName);
	return ;
}

// Creates a connection object for the given broker address, port, credentials and client id,
// with `onmsg` as its user message callback. The body continues below this point.
ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
{
	//Logger* pDevLog = GetLogHandle();
	//std::cout << CurrentDateTime() << "======= LogicDevice::NewConnection ============ " << pszClientID << std::endl;
	//mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
	// Connect to the MQTT broker.
	DWORD dwTick = GetTickCount();
	ccos_mqtt_connection* connection = new ccos_mqtt_connection;
	HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
	//int rc = 
MQTTAsync_create(&pMqttClient, pszServerUri, pszClientID, MQTTCLIENT_PERSISTENCE_NONE, NULL); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5)); //if (MQTTASYNC_SUCCESS != rc) { // std::cout << "MQTTAsync_create failed. code: " << rc << " with ClientID " << pszClientID << endl; // delete connection; // return nullptr; //} mqtt_client* pMqttClient = nullptr; pMqttClient = mqtt_lease(); //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT " << endl; std::get(*connection) = pMqttClient; mqtt_topic_list* pTopicList = new std::list; std::get(*connection) = pTopicList; CcosLock* pLock = new CcosLock(); std::get(*connection) = pLock; pLock->Thread_Lock(); pLock->Thread_UnLock(); ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter; std::get(*pfilter) = nullptr; std::get(*connection) = pfilter; std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl; std::get(*connection) = pszClientID; std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg; //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd; std::get(*connection) = hConneted; pMqttClient->mqtt_conn_context = connection; //设置MQTT连接参数 //MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; mqtt_set_port(pMqttClient, (char*)pszServerPort); mqtt_set_host(pMqttClient, (char*)pszServer); //sprintf(szClentName, "%s_%d_%d", server ? 
"MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc); mqtt_set_client_id(pMqttClient, (char*)pszClientID); mqtt_set_user_name(pMqttClient, (char*)pszUserName); mqtt_set_password(pMqttClient, (char*)pszPassword); mqtt_set_clean_session(pMqttClient, 1); //mqtt_set_version(pMqttClient, 5); //mqtt_set_write_buf_size(pMqttClient, 8192); //mqtt_set_read_buf_size(pMqttClient, 8192); mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); //if (pszUserName != nullptr && strlen(pszUserName) > 0) // conn_opts.username = pszUserName; //if (pszPassword != nullptr && strlen(pszPassword) > 0) // conn_opts.password = pszPassword; //设置回调函数 //MQTTAsync_setCallbacks(pMqttClient, connection, connlost, msgarrvd, delivered); //if ((rc = MQTTAsync_setConnected(pMqttClient, connection, onReconnected)) != MQTTASYNC_SUCCESS) //{ //} //conn_opts.keepAliveInterval = 20; //conn_opts.cleansession = 1; //conn_opts.onSuccess = onConnect; //conn_opts.onFailure = onConnectFailure; //conn_opts.context = connection; //conn_opts.automaticReconnect = 1; //conn_opts.minRetryInterval = 2; //seconds //conn_opts.maxRetryInterval = 365 * 24 * 60 * 60; //连接Broker //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Connecting 1 to the MQTT server..." << pszClientID << endl; /*if ((rc = MQTTAsync_connect(pMqttClient, &conn_opts)) != MQTTASYNC_SUCCESS)*/ int rc = 0; dwTick = GetTickCount(); while (true) { int rc = mqtt_connect(pMqttClient); if (rc != MQTT_SUCCESS_ERROR) { //默认10s内尝试 if (GetTickCount() - dwTick > 10000) { //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... 
{$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server...【" << pszClientID << "】error : " << rc << endl; //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode); delete connection; delete pfilter; return nullptr; } else { //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl; Sleep(2000); } } else { break; } } //DWORD dwWaitTick = GetTickCount(); //DWORD ret = WaitForSingleObject(hConneted, 20000); DWORD dwWaitTick = GetTickCount() - dwTick; //if (ret == WAIT_OBJECT_0) //{ //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ******[" << GetTickCount()-dwTick << "ms]*** wait:["<< dwWaitTick <<"ms]*** TID [" << GetCurrentThreadId() << std::endl; //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId()); return connection; //} //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." 
<< pszClientID << endl; //delete connection; //delete pfilter; //return nullptr; } //创建额外连接,需要提供回调函数 LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async) { //Logger* pDevLog = GetLogHandle(); //std::cout << CurrentDateTime() << "======= MQTT NewConnection ============" << pszClientID << std::endl; DWORD dwTick = GetTickCount(); //PRINTA_INFO(pDevLog, "=======Driver Module Init ============"); //连接MQTT broker // 开启接受通知 线程 //mqtt_client* pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID, mqtt::create_options(MQTTVERSION_5)); //ccos_mqtt_connection* connection = new ccos_mqtt_connection; //mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883); BUSC::MessageReceiver* pMqttClient = nullptr; pMqttClient = new BUSC::MessageReceiver(BUSID::HashFrom(eSTR::DString(string(pszClientID)))); //MQTTClient_createOptions cropts = MQTTClient_createOptions_initializer; //cropts.MQTTVersion = MQTTVERSION_5; ccos_mqtt_connection* connection = new ccos_mqtt_connection; HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL); //int rc = MQTTAsync_create(&pMqttClient, SERVER_ADDRESS.c_str(), pszClientID, MQTTCLIENT_PERSISTENCE_NONE, // connection); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5)); //if (MQTTASYNC_SUCCESS != rc) { // std::cout << "MQTTAsync_create failed. 
code: " << rc << " with ClientID " << pszClientID << endl; // delete connection; // return nullptr; //} std::get(*connection) = pMqttClient; //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT 2" << endl; mqtt_topic_list* pTopicList = new std::list; std::get(*connection) = pTopicList; CcosLock* pLock = new CcosLock(); std::get(*connection) = pLock; pLock->Thread_Lock(); pLock->Thread_UnLock(); ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter; std::get(*pfilter) = nullptr; std::get(*connection) = pfilter; std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl; std::get(*connection) = pszClientID; std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg; //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd; std::get(*connection) = hConneted; //pMqttClient->mqtt_conn_context = connection; //设置MQTT连接参数 //MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; //conn_opts.keepAliveInterval = 20; //conn_opts.cleansession = 1; //conn_opts.MQTTVersion = 5; //设置回调函数 /* mqtt_set_port(pMqttClient, (char*)"1883"); mqtt_set_host(pMqttClient, (char*)SERVER_ADDRESS.c_str()); //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc); mqtt_set_client_id(pMqttClient, (char*)pszClientID); //mqtt_set_user_name(client, user_name); // mqtt_set_password(client, password); mqtt_set_clean_session(pMqttClient, 1); //mqtt_set_write_buf_size(pMqttClient, 8192); //mqtt_set_read_buf_size(pMqttClient, 8192); mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE); //mqtt_set_version(pMqttClient, 5); */ //连接Broker //std::cout << CurrentDateTime() << " TID ["<< GetCurrentThreadId() << "] Connecting 2 to the MQTT server..." 
<< pszClientID << endl; int rc = 0; dwTick = GetTickCount(); while (true) { rc = mqtt_connect(pMqttClient); if (rc != MQTT_SUCCESS_ERROR) { if (GetTickCount() - dwTick > dwOpenTimeout) { //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl; //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode); delete connection; delete pfilter; return nullptr; } else { //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc); //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl; Sleep(2000); } } else { break; } } //dwWaitTick = GetTickCount(); //DWORD ret = WaitForSingleObject(hConneted, dwOpenTimeout); DWORD dwWaitTick = GetTickCount() - dwTick; //if (ret == WAIT_OBJECT_0) //{ //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId()); //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ***[" << GetTickCount()-dwTick << "ms]*** wait:[" << dwWaitTick << "ms]*** TID [" << GetCurrentThreadId() << std::endl; return connection; //} //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl; //delete connection; //delete pfilter; //return nullptr; } //关闭并释放连接 LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection) { if (hConnection == nullptr) { return; } //mLog::FINFO(" Close Mqtt Connection..", std::get(*hConnection)); //std::cout << CurrentDateTime() << std::get(*hConnection) << " Close Mqtt Connection.." 
<< endl; mqtt_client* pconn = (mqtt_client*)std::get(*hConnection); if (pconn != nullptr) { //pconn->disconnect(); //MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; //disc_opts.onSuccess = onDisconnect; //disc_opts.onFailure = onDisconnectFailure; mqtt_disconnect(pconn); //delete pconn; } //Sleep(300); //Sleep(300); ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get(*hConnection); std::cout << "Free Connection [" << (UINT64)hConnection << "] filter [" << (UINT64)pfilter << "] .." << endl; if (pfilter != nullptr) delete pfilter; //未知问题导致 释放出现异常,先注释调,后面加上,避免内存泄漏 delete hConnection; mqtt_release(pconn); } //主动订阅主题 LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare) { if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); mqtt_topic_list* pTopicList = std::get(*hConnection); //if (!pMqttClient->is_connected()) { // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer; //MQTTProperties prop = MQTTProperties_initializer; //MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); //pMqttClient->subscribe(pszTopic, 1, opts); if (pTopicList->size() < 1) { int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd); //mLog::FINFO("mqtt {$} Subscribe First {$} return {$}", std::get(*hConnection), pszTopic, ret); //std::cout << "mqtt 【" << std::get(*hConnection) << " 】Subscribe First " << pszTopic << endl; } else { int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd); //mLog::FINFO("mqtt {$} Subscribe ReUse {$} ", std::get(*hConnection), pszTopic, ret); //std::cout << "mqtt 【" << std::get(*hConnection) << " 】Subscribe ReUse " << pszTopic << endl; } pTopicList->push_back(pszTopic); pLock->Thread_UnLock(); return 0; } //主题订阅取消 
LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic) { if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer; //auto ret = pMqttClient->unsubscribe(pszTopic); mqtt_topic_list* pTopicList = std::get(*hConnection); CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); //MQTTAsync_responseOptions resp; pTopicList->remove(pszTopic); int ret = mqtt_unsubscribe(pMqttClient, pszTopic); //mLog::FINFO("mqtt {$} Unsubscribe {$} return {$}", std::get(*hConnection), pszTopic, ret); pLock->Thread_UnLock(); return 2; } //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收 LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName) { char pszClientID[256]; sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? 
"ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId()); ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) { }); mqtt_client* pConn = (mqtt_client*)std::get(*connObj); PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID); //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer; const char* pLoad = pCmd->encode(); mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; int rc = mqtt_publish(pConn, pszTopic, &msg ); //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl; //mLog::FINFO("CLT {$} Publish to {$} result {$}", pszClientID, pszTopic, rc); CloseConnection(connObj); return 0; } //往指定主题发送CCOS协议包整包,使用已创建的连接发送 LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection) { if (hConnection == nullptr) { //mLog::FERROR("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode()); //std::cout << CurrentDateTime() << "Who ????? 
" << "Publish to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode() return 0; } //mLog::FINFO("{$} Publish Action to {$} Action Body: {$}", std::get(*hConnection), pszTopic, pAction->encode() ); //std::cout << CurrentDateTime() << std::get(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode() mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} std::string client_id = std::get(*hConnection); CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() ); //std::cout << "try publish " << pAction->encode() << endl; //pMqttClient->publish(pszTopic, pAction->encode()); //pConn->publish(pszTopic, pCmd->encode()); //std::cout << "try publish " << pAction->encode() << endl; const char* pLoad = pAction->encode(); int len = strlen(pLoad); //MQTTAsync_responseOptions resp; //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer; mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; msg.payloadlen = len; int rc = mqtt_publish(pMqttClient, pszTopic, &msg); //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp); //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl; if (rc < 0) { //mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad); //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl; } //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL); //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl; 
pLock->Thread_UnLock(); return 2; } //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理 LOGICDEVICE_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection) { if (hConnection == nullptr) { //mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, pAction, pContext->encode()); //std::cout << CurrentDateTime() << "Who ????? " << "Publish2 to " << pszTopic << " Action Body: " << pAction << endl; //"\n context : " << pContext->encode() << return 0; } //std::cout << CurrentDateTime() << std::get(*hConnection) << " Publish Action 2 to " << pszTopic << " Action Body: " << endl; //<< pAction << "\n context : " << pContext->encode() //mLog::FINFO("{$} Publish Action 2 to {$} Action {$} Body: {$}", std::get(*hConnection), pszTopic, pAction , pContext->encode()); mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); ResDataObject req; //TODO 这里应该添加 组包逻辑,待完善 //PacketAnalizer::MakeActionRequest(req, ); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get(*hConnection)); const char* pLoad = req.encode(); int len = strlen(pLoad); mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; int rc = mqtt_publish(pMqttClient, pszTopic, &msg); pLock->Thread_UnLock(); //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer; //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp); if (rc < 0) { //mLog::FERROR("{$} PublishAction failed {$} body: {$}", std::get(*hConnection), pszTopic, rc, pLoad); //std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << " Publish 2 to [" << pszTopic << "] Send result " << rc << endl; } return 2; } /* /// /// 
往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp, /// 超时没收到应答返回失败, /// 复用链接时须小心,该函数会接管回调函数,结束后恢复 /// /// 要发送的命令Action名 /// 命令携带的参数 /// 要发送的目标topic /// 本次请求的应答接收Topic /// 应答返回的参数结果 /// 等待超时时间 /// 复用的MQTT链接句柄 /// 复用的链接的消息处理函数 /// 成功返回2,其他返回错误码 LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj, DWORD dwWaitTime) { std::cout << CurrentDateTime() << std::get(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode() if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); if (hConnection == nullptr) { return 0; } mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); //if (!pMqttClient->is_connected()) { // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic); // return 0; //} HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //SubscribeTopic() auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) { string topic = topicName;//msg->get_topic().c_str(); if (strcmp(pszRespTopic, topic.c_str()) == 0) { std::cout << CurrentDateTime() << std::get(*hConnection) << " get Right Resp by " << topic << endl; //应答到了,处理 ResDataObject req; req.decode((const char*)message->payload); PacketAnalizer::GetPacketContext(&req, resObj); SetEvent(hEvent); //处理结束 将函数指针换回去 void* oldFuncPointer = std::get(*hConnection); //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer); MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL); } else { //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get(*hConnection); (msgarrvd)(context, topicName, topicLen, message); } }; //接管回调 // 
pMqttClient->set_message_callback(action_func); MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered); //pMqttClient->subscribe(pszRespTopic, 1); MQTTSubscribe_options opts = MQTTSubscribe_options_initializer; MQTTProperties prop = MQTTProperties_initializer; std::cout << "mqtt 【" << std::get(*hConnection) << " 】Subscribe " << pszTopic << endl; //pMqttClient->subscribe(pszTopic, 1, opts); MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL); PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get(*hConnection)); //pMqttClient->publish(pszTopic, req.encode()); MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_message* m = NULL; MQTTClient_deliveryToken dt; MQTTProperty property; property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; property.value.data.data = "test user property"; property.value.data.len = (int)strlen(property.value.data.data); property.value.value.data = "test user property value"; property.value.value.len = (int)strlen(property.value.value.data); MQTTProperties properties = MQTTProperties_initializer; MQTTProperties_add(&properties, &property); //pConn->publish(pszTopic, pCmd->encode()); const char* pLoad = req.encode(); MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt); std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl; DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 return 2; } return 0; } */ /// /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败, /// /// /// /// /// /// /// /// /// LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime) { Sleep(1); //mLog::FINFO("{$} ActionAndRespWithConnDefalt 
{$} to Topic {$} body {$} ", std::get(*hConnection), pAction, pszTopic, pContext->encode()); //std::cout << CurrentDateTime() << std::get(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode() if (hConnection == nullptr) { return 0; } DWORD dwTick = GetTickCount(); mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); ResDataObject resContext; string strResObject; char* pszPad = 0; char* pszPad2 = 0; auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2](ResDataObject* rsp) -> bool { //mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp)); //std::cout << CurrentDateTime() << std::get(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl; if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES && strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0) { //mLog::FINFO("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode()); //std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl; resObj = *rsp; return true; } //mLog::FINFO("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode()); //std::cout << "ActionAndRespWithConnDefalt what ? 
" << PacketAnalizer::GetPacketKey(rsp) << endl; return false; }; CcosLock* pLock = std::get(*hConnection); pLock->Thread_Lock(); ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get(*hConnection); std::get(*pfilter) = hEvent; std::get(*pfilter) = &resContext; std::get(*pfilter) = func; PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get(*hConnection)); const char* pLoad = req.encode(); mqtt_message_t msg; memset(&msg, 0, sizeof(msg)); msg.payload = (void*)pLoad; msg.qos = MQTT_QOS; dwTick = GetTickCount(); int rc = mqtt_publish(pMqttClient, pszTopic, &msg); pLock->Thread_UnLock(); dwTick = GetTickCount() - dwTick; //mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc); //std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl; dwTick = GetTickCount(); DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 dwTick = GetTickCount() - dwTick; //std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl; //mLog::FINFO("CLT {$} try {$} getresp ok Use Time {$}", std::get(*hConnection), pszTopic, dwTick); std::get(*pfilter) = nullptr; CloseHandle(hEvent); return 2; } std::get(*pfilter) = nullptr; //resObj.decode(strResObject.c_str()); //mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get(*hConnection), pszTopic ); //std::cout << "CLT [" << std::get(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl; std::get(*pfilter) = nullptr; CloseHandle(hEvent); return 0; } /// /// 新建MQTT连接发送Ation并等待应答 /// /// /// /// /// /// /// /// /// LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, 
ResDataObject& resObj, DWORD dwWaitTime, const char* pszSenderName) { ResDataObject req; if (pszRespTopic != nullptr) PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic); //临时创建连接并发送和接收应答 char pszClientID[256]; sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId()); HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if (hEvent == NULL) return 0; ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* req, const char* topic, void* conn) { //应答到了,处理 PacketAnalizer::GetPacketContext(req, resObj); SetEvent(hEvent); }); //发布消息,并等待应答 PublishAction(&req, pszTopic, connObj); DWORD ret = WaitForSingleObject(hEvent, dwWaitTime); if (ret == WAIT_OBJECT_0) { //等到应答了 CloseConnection(connObj); CloseHandle(hEvent); return 2; } CloseConnection(connObj); CloseHandle(hEvent); return 0; } LOGICDEVICE_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection) { mqtt_client* pMqttClient = (mqtt_client*)std::get(*hConnection); return 0; }