12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275 |
- // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
- //
- #include "StdAfx.h"
- #include "objbase.h"
- #include "LogicDevice.h"
- #include "PacketAnalizer.h"
- #include "MessageInfo.h"
- #include "common_api.h"
- #include "LocalConfig.h"
- #include "Base64.h"
- #include "SystemLogger.hpp"
- #include <iostream>
- #include<chrono>
- //#include "matt/async_client.h"
- //#include "mqtt/async_client.h"
- //#include "MQTTAsync.h"
- //#include "MQTTAsync.h"
- Log4CPP::Logger* //mLog::gLogger = nullptr;
- //#include "mqttclient.h"
- //void msgarrivd(void* client, message_data_t* message);
- std::string CurrentDateTime();
- //-------------Logic Device SysIF--------------------------
- /// Constructs a system-side interface that is not yet bound to a LogicDevice.
- /// m_pLogicDev is cleared explicitly so that CmdToLogicDev() and
- /// GetLogicDevice() observe a null pointer rather than an indeterminate one
- /// until SetLogicDevice() has been called.
- LogicDeviceSysIF::LogicDeviceSysIF(void)
- {
-     m_pLogicDev = nullptr;
- }
- // Destructor. The body is empty: nothing is released here, including the
- // LogicDevice pointer stored by SetLogicDevice().
- LogicDeviceSysIF::~LogicDeviceSysIF(void)
- {
- }
- //init
- /// Binds this interface to a logic device and registers this object on the
- /// device through LogicDevice::SetSysLogicDevice().
- /// @param p Device to attach. A null pointer only clears the stored device;
- ///          no back-registration is attempted in that case.
- void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
- {
-     m_pLogicDev = p;
-     if (p != nullptr)
-     {
-         p->SetSysLogicDevice(this);
-     }
- }
- // Returns the device pointer last stored by SetLogicDevice().
- LogicDevice* LogicDeviceSysIF::GetLogicDevice()
- {
- return m_pLogicDev;
- }
- //Command In and Out
- //notify from lower layer
- // Notification entry point coming from the logic device. This implementation
- // ignores pCmd and always returns RET_FAILED.
- RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
- {
- return RET_FAILED;
- };
- //notify to lower layer
- /// Forwards a command to the attached logic device.
- /// @param pCmd Command packet, passed through unchanged.
- /// @return The device's own CmdToLogicDev() result, or RET_NOSUPPORT when no
- ///         device is attached.
- RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
- {
-     if (!m_pLogicDev)
-     {
-         return RET_NOSUPPORT;
-     }
-     return m_pLogicDev->CmdToLogicDev(pCmd);
- }
- const std::string SERVER_ADDRESS("127.0.0.1");
- /*
- string DEVICE_ID; //AS CLIENT_ID
- //const std::string CLIENT_ID("paho_cpp_async_subcribe");
- //const std::string TOPIC("hello");
- mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
- const int QOS = 1;
- const int N_RETRY_ATTEMPTS = 5;*/
- using namespace std;
- LogicDevice* g_pDPCDeviceObject = NULL;
- string LogHost = "";
- //string DEVICE_ID;
- //-------------Data Logic Device--------------------------
- /// Constructs a logic device with default MQTT settings (server
- /// SERVER_ADDRESS, port "1883", no SSL, empty credentials and roots),
- /// generates a GUID string as the device instance id, creates the notify
- /// event and the request semaphore, allocates the error list and the
- /// received-packet queue, and publishes this object in g_pDPCDeviceObject.
- LogicDevice::LogicDevice(void)
- {
-     m_pLogger = NULL;
-     m_pSysLogic = NULL;
-     m_pDrvDPC = NULL;
-     m_pParent = nullptr;
-     m_topicFilter = nullptr;
-     m_pMqttConntion = nullptr;
-     // Device instance id: textual form of a freshly created GUID.
-     m_pDevInstance = new char[40];
-     m_pResErrorList = new ResDataObject();
-     GUID guid;
-     string strDevInstance;
-     CoCreateGuid(&guid);
-     guid_2_string(guid, strDevInstance);
-     sprintf_s(m_pDevInstance, 40, "%s", strDevInstance.c_str());
-     // Auto-reset, initially non-signalled notification event.
-     m_EvtNotify = CreateEvent(0, 0, 0, 0);
-     // The semaphore is deliberately unnamed: a named semaphore would be one
-     // kernel object shared by every LogicDevice that uses the same name, so a
-     // release by one instance could wake (or saturate the count of) another.
-     m_SemphRequest = CreateSemaphore(NULL, 0, 100, NULL);
-     g_pDPCDeviceObject = this;
-     //m_strServer = "tcp://localhost:1883";
-     m_strServer = SERVER_ADDRESS;// "192.168.2.225";
-     m_strServerPort = "1883";
-     m_bMqttUseSSL = false;
-     m_strMqttUser = "";
-     m_strMqttPassword = "";
-     m_strEBusRoot = "";
-     m_strCCOSRoot = "";
-     m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
-     // Fill szPad with a repeating 'A'..'Z' pattern and terminate it in the
-     // last byte; szPad2 is zeroed.
-     const size_t padSize = sizeof(szPad);
-     for (size_t i = 0; i < padSize; ++i)
-     {
-         szPad[i] = static_cast<char>('A' + i % 26);
-     }
-     szPad[padSize - 1] = 0;
-     memset(szPad2, 0, sizeof(szPad2));
- }
- LogicDevice::~LogicDevice(void)
- {
- delete []m_pDevInstance;
- delete m_pResErrorList;
- CloseHandle(m_EvtNotify);
- CloseHandle(m_SemphRequest);
- m_EvtNotify = NULL;
- delete m_pPacketReceivedQue;
- }
- /// Stores the EBus/CCOS root paths and derives the MQTT client id from the
- /// EBus root.
- /// @param pszEBusRoot EBus root path. One leading '/' is dropped before it is
- ///        stored in m_strEBusRoot. The client id is CCOS_CLIENT_ID_PREFIX
- ///        followed by the unstripped path with every '/', '{', '}' and '-'
- ///        replaced by '_'. Null is treated as an empty path.
- /// @param pszCCOSRoot CCOS root path; m_strCCOSRoot is left untouched when null.
- /// The path is handled as std::string, so it may be of any length (the
- /// previous fixed 256-byte stack buffer overflowed on long paths).
- void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
-     const std::string ebusRoot = (pszEBusRoot != nullptr) ? pszEBusRoot : "";
-     if (!ebusRoot.empty() && ebusRoot[0] == '/') {
-         m_strEBusRoot = ebusRoot.substr(1);
-     }
-     else {
-         m_strEBusRoot = ebusRoot;
-     }
-     // Characters that are not wanted inside the client id become '_'.
-     std::string clientKey = ebusRoot;
-     for (char& ch : clientKey)
-     {
-         if (ch == '/' || ch == '{' || ch == '}' || ch == '-')
-             ch = '_';
-     }
-     m_strClientID = CCOS_CLIENT_ID_PREFIX;
-     m_strClientID += clientKey;
-     if (pszCCOSRoot != nullptr)
-         m_strCCOSRoot = pszCCOSRoot;
-     //mLog::FINFO("Set MQTT ClientName {$} ", m_strClientID);
-     SetName(m_strClientID.c_str());
-     OnSetClientID();
- }
- // Hook called at the end of SetClientRootID(), after the client id has been
- // assigned. This implementation does nothing.
- void LogicDevice::OnSetClientID() {
- }
- /// Subscribes this device's own topics on the current MQTT connection:
- /// the EBus root (when set), the CCOS root followed by the device path
- /// (when the CCOS root is set), and finally whatever SubscribeActions() adds.
- /// A non-empty device path is first normalised to start with '/'.
- void LogicDevice::SubscribeSelf() {
-     const bool missingLeadingSlash =
-         m_strDevicePath.length() > 0 && m_strDevicePath.c_str()[0] != '/';
-     if (missingLeadingSlash)
-     {
-         m_strDevicePath = "/" + m_strDevicePath;
-     }
-     if (m_strEBusRoot.length() > 0)
-     {
-         SubscribeTopic(m_pMqttConntion, m_strEBusRoot.c_str());
-     }
-     if (m_strCCOSRoot.length() > 0)
-     {
-         // Subscribe to <CCOS root><device path>.
-         SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot + m_strDevicePath).c_str());
-     }
-     SubscribeActions();
- }
- /// Subscribes to or unsubscribes from a topic on the current MQTT connection.
- /// @param pszTopic   Topic string handed to SubscribeTopic()/UnSubscribe().
- /// @param bSubscribe true to subscribe, false to unsubscribe.
- void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
- {
-     if (!bSubscribe)
-     {
-         UnSubscribe(m_pMqttConntion, pszTopic);
-         return;
-     }
-     SubscribeTopic(m_pMqttConntion, pszTopic);
- }
- /// Signals the notify event, if one exists.
- void LogicDevice::NotifyDrvThread()
- {
-     if (!m_EvtNotify)
-     {
-         return;
-     }
-     SetEvent(m_EvtNotify);
- }
- // Returns the notify event handle (m_EvtNotify), the same handle that
- // NotifyDrvThread() signals.
- HANDLE LogicDevice::GetEvtHandle()
- {
- return m_EvtNotify;
- }
- //1. init part
- // Stores the system-side interface pointer; called by
- // LogicDeviceSysIF::SetLogicDevice().
- void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
- {
- m_pSysLogic = pLogic;
- }
- // Stores the logger pointer returned later by GetLogHandle().
- void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
- {
- m_pLogger = pLogger;
- }
- // Returns the logger pointer set by SetLogHandle() (NULL after construction).
- Logger *LogicDevice::GetLogHandle()
- {
- return m_pLogger;
- }
- // Stores the DriverDPC pointer returned later by GetDrvDPC().
- void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
- {
- m_pDrvDPC = pDPC;
- }
- // Returns the DriverDPC pointer set by SetDrvDPC() (NULL after construction).
- DriverDPC *LogicDevice::GetDrvDPC()
- {
- return m_pDrvDPC;
- }
- /// Registers a child device under an EBus path.
- /// @param pChild        Child to add; must not be null, because the list is
- ///                      sorted by calling GetRootPath() on every element.
- /// @param szEbusDevPath EBus path used to check for an already registered child.
- /// @return RET_FAILED when either argument is null. Otherwise RET_SUCCEED,
- ///         whether the child was newly added or a child with that path
- ///         already existed.
- RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
- {
-     if (pChild == nullptr || szEbusDevPath == nullptr)
-     {
-         return RET_FAILED;
-     }
-     if (GetEbusChild(szEbusDevPath) == nullptr)
-     {
-         m_subDevices.push_back(pChild);
-         // Keep the children ordered by their EBus root path.
-         std::sort(m_subDevices.begin(), m_subDevices.end(), [](LogicDevice* p1, LogicDevice* p2) {
-             return p1->GetRootPath() < p2->GetRootPath();
-         });
-     }
-     return RET_SUCCEED;
- }
- /// Registers a child device under a CCOS path.
- /// @param pChild        Child to add; must not be null, because the list is
- ///                      sorted by calling GetCcosRootPath() on every element.
- /// @param szCcosDevPath CCOS path of the child; it must start with this
- ///                      device's CCOS root (m_strCCOSRoot).
- /// @return RET_FAILED when an argument is null or the path is not under this
- ///         device's CCOS root. Otherwise RET_SUCCEED, whether the child was
- ///         newly added or GetCcosChild() already found one for that path.
- RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
- {
-     if (pChild == nullptr || szCcosDevPath == nullptr)
-     {
-         return RET_FAILED;
-     }
-     if (strncmp(m_strCCOSRoot.c_str(), szCcosDevPath, m_strCCOSRoot.length()) != 0)
-     {
-         return RET_FAILED;
-     }
-     if (GetCcosChild(szCcosDevPath) == nullptr)
-     {
-         m_subCcosDevices.push_back(pChild);
-         // Keep the children ordered by their CCOS root path.
-         std::sort(m_subCcosDevices.begin(), m_subCcosDevices.end(), [](LogicDevice* p1, LogicDevice* p2) {
-             return p1->GetCcosRootPath() < p2->GetCcosRootPath();
-         });
-     }
-     return RET_SUCCEED;
- }
- /// Looks up a registered EBus child by its exact root path.
- /// @param szEbusDevPath Path compared against each child's GetRootPath().
- /// @return The first matching child, or nullptr when the path is null or no
- ///         child matches.
- LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
- {
-     if (szEbusDevPath == nullptr)
-     {
-         return nullptr;
-     }
-     // Standard range-for replaces the MSVC-only "for each ... in" extension.
-     for (LogicDevice* child : m_subDevices)
-     {
-         if (child != nullptr && child->GetRootPath().compare(szEbusDevPath) == 0)
-             return child;
-     }
-     return nullptr;
- }
- /// Looks up a registered CCOS child by its exact CCOS root path.
- /// The search runs over m_subCcosDevices, the list AddCcosChildren() fills;
- /// it previously scanned the EBus child list, so CCOS children added through
- /// AddCcosChildren() were never found.
- /// @param szCcosDevPath Path compared against each child's GetCcosRootPath().
- /// @return The first matching child, or nullptr when the path is null or no
- ///         child matches.
- LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
- {
-     if (szCcosDevPath == nullptr)
-     {
-         return nullptr;
-     }
-     for (LogicDevice* child : m_subCcosDevices)
-     {
-         if (child != nullptr && child->GetCcosRootPath().compare(szCcosDevPath) == 0)
-             return child;
-     }
-     return nullptr;
- }
- // Finishes initialisation: configures Log4CPP context values and loads the
- // Log4CPP configuration file, then (when a client id has been set) opens the
- // MQTT connection with a message callback, starts the request thread and
- // subscribes this device's own topics.
- // NOTE(review): the condition of the first `if` below has been commented out
- // together with the mLog::gLogger references, so this function does not
- // compile as written. It presumably tested "logger not yet created" - confirm
- // and restore before relying on either branch.
- void SYSTEM_CALL LogicDevice::CompleteInit()
- {
- if (//mLog::gLogger == nullptr)
- {
- // First-time path: take the log host from getLogRootpath(); when that is
- // empty or a single character, fall back to "/LogicDevice_<process id>".
- string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
- LogHost = ((string)getLogRootpath()).c_str();
- if (LogHost.length() <= 1)
- {
- char szName[256];
- sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
- LogHost = szName;
- }
- Log4CPP::ThreadContext::Map::Set("LogFileName", "LogicDevice");
- //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
- // "+ 1" skips the first character of LogHost (a '/' in the fallback name).
- Log4CPP::ThreadContext::Map::Set("LogHost", LogHost.c_str() + 1);
- auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
- //mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
- }
- else
- {
- // Reload the configuration only when the log root path changed.
- string strRoot = ((string)getLogRootpath()).c_str();
- if (strRoot.length() > 1 && strRoot != LogHost)
- {
- string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
- LogHost = strRoot;
- Log4CPP::ThreadContext::Map::Set("LogFileName", "LogicDevice");
- //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
- Log4CPP::ThreadContext::Map::Set("LogHost", LogHost.c_str() + 1);
- auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
- //mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
- }
- }
- //string version;
- //if (GetVersion(version, hMyModule))
- // //mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
- //else
- //mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
- //mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
- // Without a client id no connection is opened.
- if (m_strClientID.length() <= 0)
- {
- //mLog::FINFO("No Client name ......" );
- //std::cout << "No Client name ......." << endl;
- return;
- }
- m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
- [this](ResDataObject* req, const char* topic, void* conn) {
- // Only requests for this device level are handled here; the next level is routed directly by URI.
- // First use the topic to decide whether the message is a request to me or something I requested; topics addressed to me are URIs that start with ROOT.
- // For messages sent to me: if a reply is needed, call Request and return the Resp; otherwise call directly without replying.
- // If message handling takes a long time, it needs its own thread.
- // NOTE(review): strncmp with a length of 0 returns 0, so an empty EBus or
- // CCOS root makes every topic take the first branch.
- if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
- strncmp(topic, m_strCCOSRoot.c_str(), m_strCCOSRoot.length()) == 0)
- {
- // The topic starts with the ebus or ccos root: a message addressed to me, process it.
- ProcessSubscribeRequest(req);
- }
- else
- {
- // A message from an external module that I subscribed to myself.
- ProcessSubscribeMsg(req);
- }
- // The packet is additionally handed to CmdToLogicDev() in both cases.
- CmdToLogicDev(req);
- });
- if (m_pMqttConntion != nullptr)
- {
- StartThread(); // start the Request thread
- SubscribeSelf();
- }
- }
- /// Dispatches a received packet to ProcessRequest(), ProcessResponse() or
- /// ProcessNotify() according to its packet type.
- /// @param req Received packet; a null pointer is rejected.
- /// @return The status of the handler that was invoked, or RET_FAILED when
- ///         req is null or the packet type is none of request/response/notify.
- ///         (The handler status used to be discarded in favour of an
- ///         unconditional RET_FAILED.)
- RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
-     if (req == nullptr)
-     {
-         return RET_FAILED;
-     }
-     const PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
-     const PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
-     switch (type) {
-     case PACKET_TYPE_REQ:
-         return ProcessRequest(req, cmd);   // request
-     case PACKET_TYPE_RES:
-         return ProcessResponse(req, cmd);  // response
-     case PACKET_TYPE_NOTIFY:
-         return ProcessNotify(req, cmd);    // notification
-     default:
-         break;
-     }
-     return RET_FAILED;
- }
- // Handles a message from a topic this device subscribed to itself. It is
- // dispatched through ProcessSubscribeRequest(); that call's status is ignored
- // and RET_SUCCEED is always returned.
- RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
- {
- ProcessSubscribeRequest(pCmd);
- return RET_SUCCEED;
- }
- /// Handles a request packet.
- ///  - PACKET_CMD_OPEN: builds the open response from the device resource and
- ///    publishes it on the topic taken from the request context, then
- ///    publishes a "ConnectionStatus" = "1" update on "<EBus root>/Notify".
- ///  - PACKET_CMD_CLOSE: nothing is done.
- ///  - any other command: the packet is queued through PacketArrived().
- /// @param pCmd Request packet.
- /// @param cmd  Command extracted from the packet by the caller.
- /// @return Always RET_SUCCEED.
- RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
- {
-     if (cmd == PACKET_CMD_CLOSE)
-     {
-         // Close is currently a no-op.
-         return RET_SUCCEED;
-     }
-     if (cmd != PACKET_CMD_OPEN)
-     {
-         PacketArrived(pCmd);
-         return RET_SUCCEED;
-     }
-     // Open command: answer with the device resource.
-     ResDataObject deviceResource, openResponse;
-     // The possibly overridden GetDeviceResource() runs first, then the
-     // LogicDevice base implementation is applied to the same object.
-     GetDeviceResource(&deviceResource);
-     LogicDevice::GetDeviceResource(&deviceResource);
-     PacketAnalizer::MakeOpenResponse(*pCmd, openResponse, deviceResource);
-     PacketAnalizer::UpdateDeviceNotifyResponse(openResponse, getLocalMachineId(), getLocalEbusId(), (UINT64)GetCurrentProcessId(), (UINT64)m_pMqttConntion);
-     ResDataObject replyTopic;
-     PacketAnalizer::GetContextTopic(pCmd, replyTopic);
-     PublishAction(&openResponse, (const char*)replyTopic, m_pMqttConntion);
-     // The status is hard-coded to "1" for now; the real state is to be read later.
-     ResDataObject connectionNotify;
-     PacketAnalizer::MakeNotify(connectionNotify, PACKET_CMD_UPDATE, "ConnectionStatus", "1");
-     PublishAction(&connectionNotify, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
-     return RET_SUCCEED;
- }
- // Handler for response packets. This implementation ignores both arguments
- // and returns RET_SUCCEED.
- RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
- {
- return RET_SUCCEED;
- 
- }
- // Handler for notify packets. This implementation ignores both arguments
- // and returns RET_SUCCEED.
- RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
- {
- return RET_SUCCEED;
- }
- // Puts *pRequest into the received-packet queue and releases the request
- // semaphore by one; Exec() waits on that semaphore and drains the queue.
- // pRequest is dereferenced without a null check.
- void LogicDevice::PacketArrived(ResDataObject* pRequest)
- {
- //m_pPacketReceivedQue->Lock();
- m_pPacketReceivedQue->InQueue(*pRequest);
- //SetEvent(m_EvtNotify);//notify to user
- ReleaseSemaphore(m_SemphRequest, 1, NULL);
- //m_pPacketReceivedQue->UnLock();
- }
- // Thread-start hook. This implementation does nothing and returns true.
- bool LogicDevice::OnStartThread()
- {
- return true;
- }
- // Thread-end hook. This implementation does nothing and returns true.
- bool LogicDevice::OnEndThread()
- {
- return true;
- }
- /// One pass of the request worker: waits up to 3 ms on the exit flag and the
- /// request semaphore, then drains the received-packet queue, answering each
- /// packet on the topic taken from its context.
- ///
- /// The queue is drained after every wait that is not the exit flag (timeout
- /// included), so a packet is handled even if its semaphore release was missed.
- /// "UpdateDeviceResource" (PACKET_CMD_EXE) is answered directly with the
- /// device resource; every other packet goes through Request().
- /// @return false when the exit flag is signalled, true otherwise.
- bool LogicDevice::Exec(void)
- {
-     HANDLE hWait[2];
-     hWait[0] = m_ExitFlag;
-     hWait[1] = m_SemphRequest;
-     const DWORD dwRet = WaitForMultipleObjects(2, hWait, FALSE, 3);
-     if (dwRet == WAIT_OBJECT_0)
-     {
-         return false;
-     }
-     ResDataObject req;
-     while (m_pPacketReceivedQue->DeQueue(req))
-     {
-         // A request has arrived.
-         ResDataObject resPacket;
-         ResDataObject resTopic;
-         const PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
-         const string keystr = PacketAnalizer::GetPacketKey(&req);
-         RET_STATUS ret = RET_FAILED;
-         PacketAnalizer::MakeResponseByReq(resPacket, req, ret);
-         //mLog::FINFO(" {$} Got {$} Request by {$}", GetCurrentThreadId(), keystr, m_strClientID);
-         if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource")
-         {
-             ResDataObject devRes;
-             if ((ret = GetDeviceResource(&devRes)) == RET_SUCCEED)
-             {
-                 // Apply the LogicDevice base implementation on top of the
-                 // (possibly overridden) GetDeviceResource() result.
-                 LogicDevice::GetDeviceResource(&devRes);
-                 PacketAnalizer::UpdatePacketContext(resPacket, devRes);
-                 PacketAnalizer::MakeRetCode(RET_SUCCEED, &resPacket);
-             }
-             else
-             {
-                 PacketAnalizer::MakeRetCode(RET_FAILED, &resPacket);
-             }
-         }
-         else
-         {
-             ret = Request(&req, &resPacket);
-             PacketAnalizer::MakeRetCode(ret, &resPacket);
-             PacketAnalizer::GetPacketRetCode(&resPacket, ret);
-         }
-         PacketAnalizer::GetContextTopic(&req, resTopic);
-         //mLog::FINFO("Request result {$}", resPacket.encode());
-         PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
-         //mLog::FINFO(" {$} Send {$} Resp {$}", GetCurrentThreadId(), keystr, m_strClientID);
-     }
-     return true;
- }
- // Counterpart of CompleteInit(): stops the thread that CompleteInit() started
- // with StartThread(). Nothing else is torn down here.
- void SYSTEM_CALL LogicDevice::CompleteUnInit()
- {
- StopThread();
- }
- // Returns the thread priority for this device; this implementation always
- // returns THREAD_PRIORITY_NONE.
- int LogicDevice::GetDevice_Thread_Priority()
- {
- return THREAD_PRIORITY_NONE;
- }
- /// Adds the base LogicDevice entries to a device resource object:
- ///  - "LogicDevInstance" (the instance id string) when not present yet;
- ///  - "Attribute"/"ErrorList", set to the current error list. An existing
- ///    non-empty "Attribute" node is extended or overwritten in place;
- ///    otherwise a new "Attribute" node holding only "ErrorList" is added.
- /// @param pDeviceResource Resource object to update; must not be null.
- /// @return Always RET_SUCCEED.
- RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
- {
-     // Unit instance identifier (GUID string).
-     const bool hasInstance = !(pDeviceResource->GetFirstOf("LogicDevInstance") < 0);
-     if (!hasInstance)
-     {
-         pDeviceResource->add("LogicDevInstance", m_pDevInstance);
-     }
-     const size_t attributeCount = (*pDeviceResource)["Attribute"].size();
-     if (attributeCount == 0)
-     {
-         // No usable "Attribute" node yet: create one carrying the error list.
-         ResDataObject Attribute;
-         Attribute.add("ErrorList", *m_pResErrorList);
-         pDeviceResource->add("Attribute", Attribute);
-         return RET_SUCCEED;
-     }
-     const int errorListPos = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
-     if (errorListPos >= 0)
-     {
-         (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
-     }
-     else
-     {
-         (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
-     }
-     return RET_SUCCEED;
- }
- //notify from lower layer
- /// Notification coming from the lower layer. Packets of type
- /// PACKET_TYPE_NOTIFY are published on "<EBus root>/Notify"; every other
- /// packet type is ignored.
- /// @param pCmd Packet to inspect and possibly publish.
- /// @return Always RET_SUCCEED.
- RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
- {
-     // The command is still extracted, as before, although it is not used.
-     const PACKET_CMD packetCmd = PacketAnalizer::GetPacketCmd(pCmd);
-     (void)packetCmd;
-     const PACKET_TYPE packetType = PacketAnalizer::GetPacketType(pCmd);
-     if (packetType != PACKET_TYPE_NOTIFY)
-     {
-         return RET_SUCCEED;
-     }
-     PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
-     return RET_SUCCEED;
- }
- /// Converts a NUL-terminated UTF-8 string to a wide string.
- /// @param mbstr UTF-8 input; may be null.
- /// @return The converted text without a terminator, or an empty string when
- ///         mbstr is null or MultiByteToWideChar() reports a failure.
- std::wstring mb2wc_a(const char* mbstr)
- {
-     if (mbstr == nullptr)
-     {
-         return std::wstring();
-     }
-     // With cbMultiByte == -1 the required size includes the terminating NUL;
-     // 0 means the conversion failed.
-     const int required = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, NULL, 0);
-     if (required <= 0)
-     {
-         return std::wstring();
-     }
-     // Convert straight into the string's own storage; no manual new/delete.
-     std::wstring converted(static_cast<size_t>(required), L'\0');
-     const int written = MultiByteToWideChar(CP_UTF8, 0, mbstr, -1, &converted[0], required);
-     if (written == 0)
-     {
-         return std::wstring();
-     }
-     // Drop the terminator that MultiByteToWideChar wrote into the buffer.
-     converted.resize(static_cast<size_t>(written) - 1);
-     return converted;
- }
- RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
- {
- string ResBase64, DesBase64;
- wstring wResUTF = ResInfo;
- wstring wDesUTF = Description;
- CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
- CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
- return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
- }
- RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
- {
- string ResBase64,DesBase64;
- wstring wResUTF = mb2wc_a(ResInfo);
- wstring wDesUTF = mb2wc_a(Description);
- CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
- CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
- return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
- }
- /// Records a message and publishes it as an "ErrorList" add-notification on
- /// "<EBus root>/Notify".
- ///
- /// The entry key is DevInstance immediately followed by Code. If the error
- /// list already holds that key, nothing is published. Only messages of type
- /// ERRORTYPE are stored in the error list; other types are only notified.
- /// @param DevInstance  Device instance id; must not be null.
- /// @param Code         Message code; must not be null.
- /// @param Level        Message level.
- /// @param ResInfo      Resource text; must be non-null and non-empty.
- /// @param Description  Description text; null is treated as empty.
- /// @param nMessageType Message type.
- /// @param pAppId       Application id, stored as "AppId".
- /// @return RET_FAILED when DevInstance or Code is null or ResInfo is
- ///         null/empty; RET_SUCCEED otherwise (including the "already
- ///         present" case).
- RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
- {
-     // Null pointers are rejected up front: they were previously used to
-     // construct std::string objects, which is undefined behaviour.
-     if (DevInstance == nullptr || Code == nullptr || ResInfo == nullptr || ResInfo[0] == '\0')
-     {
-         //mLog::FERROR("Code or ResInfo is empty");
-         return RET_FAILED;
-     }
-     string strDevInstanceCode = DevInstance;
-     strDevInstanceCode += Code;
-     // Same instance + code already listed: nothing to add or publish.
-     for (size_t i = 0; i < m_pResErrorList->size(); i++)
-     {
-         const string strInstancekey = m_pResErrorList->GetKey(i);
-         if (strInstancekey == strDevInstanceCode)
-         {
-             return RET_SUCCEED;
-         }
-     }
-     MessageInfo info;
-     info.CodeID = Code;
-     info.Type = nMessageType;
-     info.Level = Level;
-     info.Resouceinfo = ResInfo;
-     info.Description = (Description != nullptr) ? Description : "";
-     ResDataObject NotifyData, ResNotify, ErrorInfo;
-     info.GetResDataObject(ErrorInfo);
-     SYSTEMTIME st;
-     GetLocalTime(&st);
-     const string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
-     ErrorInfo.add("CreationTime", TimeTag.c_str());
-     ErrorInfo.add("AppId", pAppId);
-     ErrorInfo.add("InstanceId", DevInstance);
-     // Only errors are kept in the error list; warnings are just notified upward.
-     if (nMessageType == ERRORTYPE)
-     {
-         m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
-     }
-     ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
-     PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
-     //mLog::FDEBUG( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
-     PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
-     return RET_SUCCEED;
- }
- /// Convenience overload: adds a message for this device's own instance id
- /// with an empty description.
- /// @return The status of the full AddErrorMessage() overload. (It used to be
- ///         discarded, so callers saw RET_FAILED even when the message had
- ///         been added.)
- RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
- {
-     return AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType, pAppId);
- }
- RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
- {
- int index = -1;
- bool m_bClearAll = false;
- //trim empty code string
- string CodeStr = Code;
- if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
- {
- CodeStr = " ";
- Code = CodeStr.c_str();
- m_bClearAll = true;
- }
-
- //if ((string)Code == "0" || (string)Code == "")
- //{
- // m_bClearAll = true;
- //}
- string strDevInstanceCode = DevInstance;
- strDevInstanceCode += Code;
- if (m_bClearAll)
- {
- m_pResErrorList->clear();
- }
- else
- {
- MessageInfo info;
- info.CodeID = Code;
- info.Type = nMessageType;
- info.Level = Level;
- info.Resouceinfo = ResInfo;
- info.Description = Description;
-
- for (size_t i = 0; i < m_pResErrorList->size(); i++)
- {
- string strInstancekey = m_pResErrorList->GetKey(i);
- if (strInstancekey == strDevInstanceCode)
- {
- //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
- //{
- // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
- // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
- // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
- // {
- // index = (INT)j;
- // break;
- // }
- //}
- index = (INT)i;
- break;
- }
- }
- }
- MessageInfo info;
- info.CodeID = Code;
- info.Type = nMessageType;
- info.Level = Level;
- info.Resouceinfo = ResInfo;
- info.Description = Description;
- ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
- //info.GetResDataObject(tempInfo);
- //ErrorInfo.add(Code, tempInfo);
- info.GetResDataObject(ErrorInfo);
- ErrorInfo.add("InstanceId", DevInstance);
- bool result = false;
- if (index>=0)
- {
- //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
- result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
-
- }
- if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
- {
- ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
- //mLog::FDEBUG( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
- //CmdFromLogicDev(&NotifyData);
- PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- }
- return RET_SUCCEED;
- }
- /// Convenience overload: deletes a message for this device's own instance id
- /// with an empty description.
- /// @return The status of the full DelErrorMessage() overload. (It used to be
- ///         discarded, so callers always saw RET_FAILED.)
- RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
- {
-     return DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
- }
- // Event procedure. This implementation does nothing and returns RET_FAILED.
- RET_STATUS LogicDevice::EvtProcedure()
- {
- return RET_FAILED;
- }
- // Licence check for a feature id. Not implemented: pszFeatureId is ignored
- // and false is always returned.
- bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
- {
- //mLog::FERROR("NOT FINISHED YET");
- return false;
- }
- RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
- {
- std::string strResult = "";
- //组Context包
- if (m_pLogger)
- {
- wstring wContect = mb2wc_a(pContext);
- CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
- }
- //Thread_Lock();
- //if (NULL != fmt)
- //{
- // va_list marker = NULL;
- // va_start(marker, fmt);
- // size_t nLength = _vscprintf(fmt, marker) + 1;
- // std::vector<char> vBuffer(nLength, '\0');
- // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
- // if (nWritten > 0)
- // {
- // strResult = &vBuffer[0];
- // }
- // va_end(marker);
- //}
- //Thread_UnLock();
- ResDataObject SysLogNode;
- string guidstr;
- GUID DeviceGuid;
- //组Log包
- if (GetDeviceType(DeviceGuid))
- {
- guid_2_string(DeviceGuid, guidstr);
- SysLogNode.add("Module", guidstr.c_str());
- SysLogNode.add("AppId", pAppId);
- SysLogNode.add("ThreadId", GetCurrentThreadId());
- if (pCode)
- {
- SysLogNode.add("BusinessKey", pCode);
- }
- else
- {
- SysLogNode.add("BusinessKey", "");
- }
- SysLogNode.add("IP", (const char*)getLocalIpAddress());
- SYSTEMTIME st;
- GetLocalTime(&st);
- string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
- SysLogNode.add("CreationTime", TimeTag.c_str());
- string strLevel = SysLogLevel2str(Level);
- SysLogNode.add("Level", strLevel.c_str());
- SysLogNode.add("HostName", (const char*)getLocalMachineId());
- SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
- SysLogNode.add("FreeText", strResult.c_str());
- SysLogNode.add("Context", pCode);
- ResDataObject NotifyData;
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
- //CmdFromLogicDev(&NotifyData);
- PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- }
- else
- {
- PRINTA_ERROR(m_pLogger, "no Guid??");
- return RET_FAILED;
- }
- //打印LOG
- switch (Level)
- {
- case Syslog_Debug:
- //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
- //mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Information:
- ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
- //mLog::FINFO("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Warning:
- //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
- //mLog::Warn("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Error:
- //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
- //mLog::FERROR("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Fatal:
- //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
- //mLog::FINFO("SysLog {$} ", SysLogNode.encode());
- break;
- default:
- //mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
- break;
- }
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
- {
- std::string strResult = "";
- //组Context包
- if (m_pLogger)
- {
- m_pLogger->Thread_Lock();
- if (NULL != fmt)
- {
- va_list marker = NULL;
- va_start(marker, fmt);
- size_t nLength = _vscprintf(fmt, marker) + 1;
- std::vector<char> vBuffer(nLength, '\0');
- int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
- if (nWritten > 0)
- {
- strResult = &vBuffer[0];
- }
- va_end(marker);
- }
- m_pLogger->Thread_UnLock();
- }
- else
- {
- Thread_Lock();
- if (NULL != fmt)
- {
- va_list marker = NULL;
- va_start(marker, fmt);
- size_t nLength = _vscprintf(fmt, marker) + 1;
- std::vector<char> vBuffer(nLength, '\0');
- int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
- if (nWritten > 0)
- {
- strResult = &vBuffer[0];
- }
- va_end(marker);
- }
- Thread_UnLock();
- }
- ResDataObject SysLogNode;
- string guidstr;
- GUID DeviceGuid;
- //组Log包
- if (GetDeviceType(DeviceGuid))
- {
- guid_2_string(DeviceGuid, guidstr);
- SysLogNode.add("Module", guidstr.c_str());
- SysLogNode.add("AppId", "");
- SysLogNode.add("ThreadId", GetCurrentThreadId());
- if (pCode)
- {
- SysLogNode.add("BusinessKey", pCode);
- }
- else
- {
- SysLogNode.add("BusinessKey", "");
- }
- SysLogNode.add("IP", (const char *)getLocalIpAddress());
- SYSTEMTIME st;
- GetLocalTime(&st);
- string TimeTag = FormatstdString("%04d-%02d-%02d %02d:%02d:%02d.%03u", st.wYear, st.wMonth, st.wDay, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
- SysLogNode.add("CreationTime", TimeTag.c_str());
- SysLogNode.add("Level", Level);
- SysLogNode.add("HostName", (const char *)getLocalMachineId());
- SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
- SysLogNode.add("FreeText", strResult.c_str());
- ResDataObject NotifyData;
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
- //CmdFromLogicDev(&NotifyData);
- PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- if(m_pParent != nullptr)
- NotifyParent(&NotifyData, "/Notify");
- }
- else
- {
- PRINTA_ERROR(m_pLogger,"no Guid??");
- return RET_FAILED;
- }
- //打印LOG
- switch (Level)
- {
- case Syslog_Debug:
- //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
- //mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Information:
- //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
- //mLog::FINFO("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Warning:
- //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
- //mLog::Warn("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Error:
- //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
- //mLog::FERROR("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Fatal:
- //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
- //mLog::FINFO("SysLog {$}", SysLogNode.encode());
- break;
- default:
- //mLog::FINFO( "SysLog {$}", SysLogNode.encode());
- break;
- }
- return RET_SUCCEED;
- }
- /////////////////////////////////////////////////////////////////////////////
- ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
- {
- return nullptr;
- }
/// Returns the current local time as "YYYY-MM-DD HH:MM:SS.uuuuuu " (microsecond
/// precision, with a trailing space).
///
/// Uses the re-entrant localtime variant so concurrent callers do not share
/// the static buffer behind std::localtime.
std::string CurrentDateTime()
{
    // Capture the time point once so the seconds and microseconds agree.
    const auto now = std::chrono::system_clock::now();
    const auto sinceEpochUs = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
    const std::time_t nowTime = std::chrono::system_clock::to_time_t(now);

    // Break down into local calendar time without touching shared state.
    std::tm localTm = {};
#if defined(_WIN32)
    localtime_s(&localTm, &nowTime);
#else
    localtime_r(&nowTime, &localTm);
#endif

    std::string result;
    char buf[64];

    // Date and time down to the second.
    if (std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &localTm) > 0)
    {
        result += buf;
    }

    // Microsecond fraction; the explicit cast matches the %lld conversion.
    snprintf(buf, sizeof(buf), ".%06lld ", static_cast<long long>(sinceEpochUs.count() % 1000000));
    result += buf;
    return result;
}
/// Returns the current local time as " YYYY-MM-DD HH:MM:SS " (second
/// precision, padded with one space on each side).
///
/// Uses the re-entrant localtime variant so concurrent callers do not share
/// the static buffer behind std::localtime.
static std::string CurrentDateTime2()
{
    const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    std::tm localTm = {};
#if defined(_WIN32)
    localtime_s(&localTm, &now);
#else
    localtime_r(&now, &localTm);
#endif

    // Zero-initialised so an strftime failure still yields an empty string.
    char buf[100] = { 0 };
    std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", &localTm);
    return buf;
}
- bool LogicDevice::GetActions(ResDataObject& resAction)
- {
- for (int x = 0; x < m_Actions.size(); x++)
- {
- resAction.update(m_Actions.GetKey(x), "");
- }
- return true;
- }
- void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
- {
- if (m_pParent != nullptr)
- {
- if (hConnection == nullptr)
- {
- hConnection = m_pParent->m_pMqttConntion;
- }
- PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
- }
- }
- const mqtt_qos_t MQTT_QOS = QOS2;
- void LogicDevice::SubscribeActions()
- {
- if (nullptr == m_pMqttConntion)
- return;
- int num = m_Actions.size();
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
- //if (!pMqttClient->is_connected()) {
- // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return ;
- //}
- if (num <= 0) {
- ResDataObject res;
- GetDeviceResource(&res);
- //std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
- m_Actions = res["Action"];
- num = m_Actions.size();
- }
-
- std::string pszAction;
- int ret = 0;
- //MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- //*/
- for (size_t i = 0; i < num; i++)
- {
- pszAction = m_strEBusRoot;
- pszAction += "/Action/";
- pszAction += (m_Actions.GetKey(i));
- //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- //pMqttClient->subscribe(pszAction, 1, opts);
- pTopicList->push_back(pszAction);
- ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
- //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
-
- if (m_strEBusRoot != m_strCCOSRoot)
- {
- pszAction = m_strCCOSRoot + m_strDevicePath;
- pszAction += "/Action/";
- pszAction += (m_Actions.GetKey(i));
- //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- //pMqttClient->subscribe(pszAction, 1, opts);
- pTopicList->push_back(pszAction);
- ret = mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
- //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
- }
- }
- pszAction = m_strEBusRoot;
- pszAction += "/Action/UpdateDeviceResource";
- //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- pTopicList->push_back(pszAction);
- mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
- //mLog::FINFO("{$} Subscribe Action {$} return {$}", m_strClientID, pszAction, ret);
- //*/
- //不能订阅#,会造成收两遍请求
- //pszAction = m_strEBusRoot;
- //pszAction += "/Action/#";
- ////pTopicList->push_back(pszAction);
- ////mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
- //SubScribeTopic(pszAction.c_str(), true);
- //std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- //if (m_strEBusRoot != m_strCCOSRoot)
- //{
- // pszAction = m_strCCOSRoot + m_strDevicePath;
- // pszAction += "/Action/#";
- // //pTopicList->push_back(pszAction);
- // //mqtt_subscribe(pMqttClient, pszAction.c_str(), QOS1, msgarrivd);
- // SubScribeTopic(pszAction.c_str(), true);
- // std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- //}
- }
- /*
- void connlost(void* context, char* cause)
- {
- //连接中断
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
- printf("\nConnection lost\n");
- printf(" cause: %s\n", cause);
- std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
- }
- void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
- {
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
- }
- */
- //发送成功
- //void delivered(void* context, MQTTAsync_token dt)
- //{
- // printf("Message with token value %d delivery confirmed\n", dt);
- // //deliveredtoken = dt;
- //}
- //void connlost(void* context, char* cause)
- //{
- // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
- // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
- //
- // //连接中断
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
- // return;
- //
- // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- // int rc;
- //
- // printf("Reconnecting\n");
- // conn_opts.keepAliveInterval = 20;
- // conn_opts.cleansession = 1;
- // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
- // {
- // printf("Failed to start connect, return code %d\n", rc);
- // //finished = 1;
- // }
- //}
- //重连成功
- //void onReconnected(void* context, char* cause)
- //{
- // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
- // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
- //
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
- // const char* client_id = std::get<CLINET_ID_ID>(*connection);
- // int rc;
- //
- // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
- // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
- // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
- // // 重新订阅,暂不重新订阅,看看
- //
- // ///*
- // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
- // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
- // //opts.onSuccess = onSubscribe;
- // //opts.onFailure = onSubscribeFailure;
- // //opts.context = client;
- // auto it = pTopicList->begin();
- // while(it != pTopicList->end())
- // {
- // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
- // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
- // it++;
- // }
- // //*/
- //
- //}
- //
- ////MQTT 连接主动断开连接失败
- //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Disconnect failed, rc %d\n", response->code);
- // //disc_finished = 1;
- //}
- //
- ////MQTT 连接主动断开连接成功
- //void onDisconnect(void* context, MQTTAsync_successData* response)
- //{
- // //printf("Successful disconnection\n");
- // //disc_finished = 1;
- //}
- //
- //void onSubscribe(void* context, MQTTAsync_successData* response)
- //{
- // printf("Subscribe succeeded\n");
- // //subscribed = 1;
- //}
- //
- //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Subscribe failed, rc %d\n", response->code);
- // //finished = 1;
- //}
- //
- //
- //void onConnectFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Connect failed, rc %d\n", response->code);
- // //finished = 1;
- //}
- //void onConnect(void* context, MQTTAsync_successData* response)
- //{
- //
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- //
- // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
- //
- // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- // int rc;
- //
- // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
- // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
- // if (hConnected != NULL)
- // SetEvent(hConnected);
- //
- // /*
- // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
- // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
- // opts.onSuccess = onSubscribe;
- // opts.onFailure = onSubscribeFailure;
- // opts.context = client;
- // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
- // {
- // printf("Failed to start subscribe, return code %d\n", rc);
- // finished = 1;
- // }*/
- //}
- //MQTT消息抵达
- //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
- void msgarrivd(void* client, message_data_t* message)
- {
- if (client == nullptr)
- {
- printf("************************ somthing happend...");
- return;
- }
- //消息抵达
- ResDataObject req;
- string topic = message->topic_name; //topicName; //msg->get_topic().c_str();
- mqtt_client* pClient = (mqtt_client*)client;
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
- string client_id = std::get<CLINET_ID_ID>(*connection);
- std::string payload((const char*)message->message->payload, message->message->payloadlen);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
- //std::cout << " msg Body " << payload << endl;
- //mLog::FINFO("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload);
- const char* pclient_id = client_id.c_str();
- req.decode(payload.c_str());
- //取消息钩子
- void* pHook = std::get<MSG_HOOK_ID>(*connection);
- if (pHook != nullptr) {
- ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
- ccos_mqtt_msg_filter_func func = std::get<FILTER_FUNC_ID>(*filter);
- //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
- //mLog::FINFO("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
- //消息钩子函数存在
- if (func != nullptr) {
- //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
- //mLog::FINFO("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
- ResDataObject* resObj = std::get<FILTER_RES_OBJ_ID>(*filter);
- HANDLE hEvent = std::get<FILTER_HANDLE_ID>(*filter);
- if (func(&req)) {
- //勾住了,是该钩子的消息,则通知
- std::get<FILTER_FUNC_ID>(*filter) = nullptr;
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
- //mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} ", GetCurrentThreadId(), client_id, topic);
- PacketAnalizer::GetPacketContext(&req, *resObj);
- SetEvent(hEvent);
- //MQTTAsync_freeMessage(&message);
- //MQTTAsync_free(topicName);
- return ;
- }
- }
- }
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
- ccos_mqtt_callback onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
- if (onmsg != nullptr)
- (onmsg)(&req, topic.c_str(), connection);
- else
- {
- //mLog::FINFO("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get<CLINET_ID_ID>(*connection), topic);
- //cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get<CLINET_ID_ID>(*connection) << " When processing " << topic << endl;
- }
- //MQTTAsync_freeMessage(&message);
- //MQTTAsync_free(topicName);
- return ;
- }
- ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
- {
- //Logger* pDevLog = GetLogHandle();
- //std::cout << CurrentDateTime() << "======= LogicDevice::NewConnection ============ " << pszClientID << std::endl;
- //mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
- //连接MQTT broker
- DWORD dwTick = GetTickCount();
- ccos_mqtt_connection* connection = new ccos_mqtt_connection;
- HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
- //int rc = MQTTAsync_create(&pMqttClient, pszServerUri, pszClientID, MQTTCLIENT_PERSISTENCE_NONE, NULL); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5));
- //if (MQTTASYNC_SUCCESS != rc) {
- // std::cout << "MQTTAsync_create failed. code: " << rc << " with ClientID " << pszClientID << endl;
- // delete connection;
- // return nullptr;
- //}
- mqtt_client* pMqttClient = nullptr;
- pMqttClient = mqtt_lease();
- //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT " << endl;
- std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- mqtt_topic_list* pTopicList = new std::list<string>;
- std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
- CcosLock* pLock = new CcosLock();
- std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
- pLock->Thread_Lock();
- pLock->Thread_UnLock();
- ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
- std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
- std::get<MSG_HOOK_ID>(*connection) = pfilter;
- std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
- std::get<CLINET_ID_ID>(*connection) = pszClientID;
- std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
- //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd;
- std::get<CONNECTED_HANDLE_ID>(*connection) = hConneted;
- pMqttClient->mqtt_conn_context = connection;
- //设置MQTT连接参数
- //MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- mqtt_set_port(pMqttClient, (char*)pszServerPort);
- mqtt_set_host(pMqttClient, (char*)pszServer);
- //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
- mqtt_set_client_id(pMqttClient, (char*)pszClientID);
- mqtt_set_user_name(pMqttClient, (char*)pszUserName);
- mqtt_set_password(pMqttClient, (char*)pszPassword);
- mqtt_set_clean_session(pMqttClient, 1);
- //mqtt_set_version(pMqttClient, 5);
- //mqtt_set_write_buf_size(pMqttClient, 8192);
- //mqtt_set_read_buf_size(pMqttClient, 8192);
- mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- //if (pszUserName != nullptr && strlen(pszUserName) > 0)
- // conn_opts.username = pszUserName;
- //if (pszPassword != nullptr && strlen(pszPassword) > 0)
- // conn_opts.password = pszPassword;
- //设置回调函数
- //MQTTAsync_setCallbacks(pMqttClient, connection, connlost, msgarrvd, delivered);
- //if ((rc = MQTTAsync_setConnected(pMqttClient, connection, onReconnected)) != MQTTASYNC_SUCCESS)
- //{
- //}
- //conn_opts.keepAliveInterval = 20;
- //conn_opts.cleansession = 1;
- //conn_opts.onSuccess = onConnect;
- //conn_opts.onFailure = onConnectFailure;
- //conn_opts.context = connection;
- //conn_opts.automaticReconnect = 1;
- //conn_opts.minRetryInterval = 2; //seconds
- //conn_opts.maxRetryInterval = 365 * 24 * 60 * 60;
- //连接Broker
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Connecting 1 to the MQTT server..." << pszClientID << endl;
- /*if ((rc = MQTTAsync_connect(pMqttClient, &conn_opts)) != MQTTASYNC_SUCCESS)*/
- int rc = 0;
- dwTick = GetTickCount();
- while (true)
- {
- int rc = mqtt_connect(pMqttClient);
- if (rc != MQTT_SUCCESS_ERROR)
- {
- //默认10s内尝试
- if (GetTickCount() - dwTick > 10000)
- {
- //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server...【" << pszClientID << "】error : " << rc << endl;
- //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode);
- delete connection;
- delete pfilter;
- return nullptr;
- }
- else
- {
- //mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl;
- Sleep(2000);
- }
- }
- else
- {
- break;
- }
- }
- //DWORD dwWaitTick = GetTickCount();
- //DWORD ret = WaitForSingleObject(hConneted, 20000);
- DWORD dwWaitTick = GetTickCount() - dwTick;
- //if (ret == WAIT_OBJECT_0)
- //{
- //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ******[" << GetTickCount()-dwTick << "ms]*** wait:["<< dwWaitTick <<"ms]*** TID [" << GetCurrentThreadId() << std::endl;
- //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
- return connection;
- //}
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
- //delete connection;
- //delete pfilter;
- //return nullptr;
- }
- //创建额外连接,需要提供回调函数
- LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg,
- DWORD dwOpenTimeout, bool async)
- {
- //Logger* pDevLog = GetLogHandle();
- //std::cout << CurrentDateTime() << "======= MQTT NewConnection ============" << pszClientID << std::endl;
- DWORD dwTick = GetTickCount();
- //PRINTA_INFO(pDevLog, "=======Driver Module Init ============");
- //连接MQTT broker
- // 开启接受通知 线程
- //mqtt_client* pMqttClient = new mqtt::async_client(SERVER_ADDRESS, pszClientID, mqtt::create_options(MQTTVERSION_5));
- //ccos_mqtt_connection* connection = new ccos_mqtt_connection;
- //mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
- BUSC::MessageReceiver* pMqttClient = nullptr;
- pMqttClient = new BUSC::MessageReceiver(BUSID::HashFrom(eSTR::DString(string(pszClientID))));
- //MQTTClient_createOptions cropts = MQTTClient_createOptions_initializer;
- //cropts.MQTTVersion = MQTTVERSION_5;
- ccos_mqtt_connection* connection = new ccos_mqtt_connection;
- HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
- //int rc = MQTTAsync_create(&pMqttClient, SERVER_ADDRESS.c_str(), pszClientID, MQTTCLIENT_PERSISTENCE_NONE,
- // connection); //new mqtt::async_client(pszServerUri, pszClientID, mqtt::create_options(MQTTVERSION_5));
- //if (MQTTASYNC_SUCCESS != rc) {
- // std::cout << "MQTTAsync_create failed. code: " << rc << " with ClientID " << pszClientID << endl;
- // delete connection;
- // return nullptr;
- //}
-
- std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- //std::cout << "[[[[[[[[[[" << pszClientID << "]]]]]]]]]]" << "Connecting MQTT 2" << endl;
- mqtt_topic_list* pTopicList = new std::list<string>;
- std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
- CcosLock* pLock = new CcosLock();
- std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
- pLock->Thread_Lock();
- pLock->Thread_UnLock();
- ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
- std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
- std::get<MSG_HOOK_ID>(*connection) = pfilter;
- std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
- std::get<CLINET_ID_ID>(*connection) = pszClientID;
- std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
- //std::get< MQTT_MSG_ARRIVED_ID>(*connection) = msgarrvd;
- std::get<CONNECTED_HANDLE_ID>(*connection) = hConneted;
- //pMqttClient->mqtt_conn_context = connection;
-
- //设置MQTT连接参数
- //MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
- //conn_opts.keepAliveInterval = 20;
- //conn_opts.cleansession = 1;
- //conn_opts.MQTTVersion = 5;
- //设置回调函数
- /*
- mqtt_set_port(pMqttClient, (char*)"1883");
- mqtt_set_host(pMqttClient, (char*)SERVER_ADDRESS.c_str());
- //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
- mqtt_set_client_id(pMqttClient, (char*)pszClientID);
- //mqtt_set_user_name(client, user_name);
- // mqtt_set_password(client, password);
- mqtt_set_clean_session(pMqttClient, 1);
- //mqtt_set_write_buf_size(pMqttClient, 8192);
- //mqtt_set_read_buf_size(pMqttClient, 8192);
- mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- //mqtt_set_version(pMqttClient, 5);
- */
- //连接Broker
- //std::cout << CurrentDateTime() << " TID ["<< GetCurrentThreadId() << "] Connecting 2 to the MQTT server..." << pszClientID << endl;
- int rc = 0;
- dwTick = GetTickCount();
- while (true)
- {
- rc = mqtt_connect(pMqttClient);
- if (rc != MQTT_SUCCESS_ERROR)
- {
- if (GetTickCount() - dwTick > dwOpenTimeout)
- {
- //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
- //printf("Failed to connect, %s return code %d\n", pszClientID, resp.reasonCode);
- delete connection;
- delete pfilter;
- return nullptr;
- }
- else
- {
- //mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
- Sleep(2000);
- }
- }
- else
- {
- break;
- }
- }
- //dwWaitTick = GetTickCount();
- //DWORD ret = WaitForSingleObject(hConneted, dwOpenTimeout);
- DWORD dwWaitTick = GetTickCount() - dwTick;
- //if (ret == WAIT_OBJECT_0)
- //{
- //mLog::FINFO("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
- //std::cout << CurrentDateTime() << "MQTT [" << pszClientID << "] try ConnMqtt Succecced Use Time ***[" << GetTickCount()-dwTick << "ms]*** wait:[" << dwWaitTick << "ms]*** TID [" << GetCurrentThreadId() << std::endl;
- return connection;
- //}
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server..." << pszClientID << endl;
- //delete connection;
- //delete pfilter;
- //return nullptr;
- }
- //关闭并释放连接
- LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
- {
- if (hConnection == nullptr)
- {
- return;
- }
- //mLog::FINFO(" Close Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
- //std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
- mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- if (pconn != nullptr) {
- //pconn->disconnect();
- //MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- //disc_opts.onSuccess = onDisconnect;
- //disc_opts.onFailure = onDisconnectFailure;
- mqtt_disconnect(pconn);
- //delete pconn;
- }
- //Sleep(300);
- //Sleep(300);
- ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
- std::cout << "Free Connection [" << (UINT64)hConnection << "] filter [" << (UINT64)pfilter << "] .." << endl;
- if (pfilter != nullptr)
- delete pfilter;
- //未知问题导致 释放出现异常,先注释调,后面加上,避免内存泄漏
- delete hConnection;
- mqtt_release(pconn);
- }
- //主动订阅主题
- LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
- //if (!pMqttClient->is_connected()) {
- // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return 0;
- //}
- //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
- //MQTTProperties prop = MQTTProperties_initializer;
- //MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- //pMqttClient->subscribe(pszTopic, 1, opts);
- if (pTopicList->size() < 1)
- {
- int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
- //mLog::FINFO("mqtt {$} Subscribe First {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
- //std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe First " << pszTopic << endl;
- }
- else
- {
- int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
- //mLog::FINFO("mqtt {$} Subscribe ReUse {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
- //std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe ReUse " << pszTopic << endl;
- }
- pTopicList->push_back(pszTopic);
- pLock->Thread_UnLock();
- return 0;
- }
- /// <summary>
- /// Cancels a topic subscription on an existing connection and removes the
- /// topic from the connection's topic list.
- /// </summary>
- /// <param name="hConnection">Connection handle; a null handle is ignored.</param>
- /// <param name="pszTopic">Topic to unsubscribe from; a null topic is ignored.</param>
- /// <returns>0 when an argument is null, otherwise 2 (the result of mqtt_unsubscribe is not reported).</returns>
- LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
- {
-     // A null topic cannot be looked up in the topic list, so treat it like a
-     // null handle.
-     if (hConnection == nullptr || pszTopic == nullptr)
-     {
-         return 0;
-     }
-     mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
-     mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
-     CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
-     pLock->Thread_Lock();
-     pTopicList->remove(pszTopic);
-     const int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
-     (void)ret; // not propagated to the caller
-     pLock->Thread_UnLock();
-     return 2;
- }
- /// <summary>
- /// Publishes a complete CCOS packet to a topic over a temporary connection.
- /// The connection is created for this call, used only for sending, and
- /// closed before returning.
- /// </summary>
- /// <param name="pCmd">Packet to send; its topic fields are updated before encoding.</param>
- /// <param name="pszTopic">Destination topic.</param>
- /// <param name="pszSenderName">Sender name used in the temporary client id; "ANONYMOUS" when null.</param>
- /// <returns>Always 0; the result of mqtt_publish is not reported to the caller.</returns>
- LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName)
- {
-     if (pCmd == nullptr || pszTopic == nullptr)
-     {
-         return 0;
-     }
-     char pszClientID[256];
-     sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
-     // The temporary connection never consumes incoming messages.
-     ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void*) {
-     });
-     if (connObj == nullptr)
-     {
-         // Nothing to publish on and nothing to close.
-         return 0;
-     }
-     mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
-     PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
-     const char* pLoad = pCmd->encode();
-     mqtt_message_t msg;
-     memset(&msg, 0, sizeof(msg));
-     msg.payload = (void*)pLoad;
-     msg.qos = MQTT_QOS;
-     if (pLoad != nullptr)
-     {
-         // Set the length explicitly, as PublishAction(ResDataObject*, ...) does.
-         msg.payloadlen = static_cast<int>(strlen(pLoad));
-     }
-     const int rc = mqtt_publish(pConn, pszTopic, &msg);
-     (void)rc; // not propagated to the caller
-     CloseConnection(connObj);
-     return 0;
- }
- /// <summary>
- /// Publishes a complete CCOS packet to a topic over an already established
- /// connection. The send is serialized with the connection's send lock.
- /// </summary>
- /// <param name="pAction">Packet to send; its topic fields are updated before encoding.</param>
- /// <param name="pszTopic">Destination topic.</param>
- /// <param name="hConnection">Connection to publish on.</param>
- /// <returns>0 when an argument is null, otherwise 2 (the result of mqtt_publish is not reported).</returns>
- LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection)
- {
-     // pAction is dereferenced and pszTopic is handed to the client below, so
-     // both are validated together with the handle.
-     if (hConnection == nullptr || pAction == nullptr || pszTopic == nullptr)
-     {
-         return 0;
-     }
-     mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
-     std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
-     CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
-     pLock->Thread_Lock();
-     PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str());
-     const char* pLoad = pAction->encode();
-     const int len = static_cast<int>(strlen(pLoad));
-     mqtt_message_t msg;
-     memset(&msg, 0, sizeof(msg));
-     msg.payload = (void*)pLoad;
-     msg.qos = MQTT_QOS;
-     msg.payloadlen = len;
-     const int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
-     (void)rc; // a negative value means the publish failed; currently not reported
-     pLock->Thread_UnLock();
-     return 2;
- }
- /// <summary>
- /// Publishes an action request to a topic over an existing connection. Any
- /// response is expected to be handled asynchronously by the connection's
- /// message callback.
- /// </summary>
- /// <param name="pAction">Action name. Not yet packed into the request (see TODO below).</param>
- /// <param name="pContext">Action parameters. Not yet packed into the request (see TODO below).</param>
- /// <param name="pszTopic">Destination topic.</param>
- /// <param name="pszRespTopic">Topic intended for the reply. Not used by this function yet.</param>
- /// <param name="hConnection">Connection to publish on.</param>
- /// <returns>0 when the handle or topic is null, otherwise 2 (the result of mqtt_publish is not reported).</returns>
- LOGICDEVICE_API int PublishAction(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection)
- {
-     (void)pAction;
-     (void)pContext;
-     (void)pszRespTopic;
-     if (hConnection == nullptr || pszTopic == nullptr)
-     {
-         return 0;
-     }
-     mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
-     CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
-     pLock->Thread_Lock();
-     ResDataObject req;
-     // TODO: build the request from pAction / pContext; for now only the topic
-     // fields of an otherwise default-constructed packet are filled in.
-     PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
-     const char* pLoad = req.encode();
-     const int len = static_cast<int>(strlen(pLoad));
-     mqtt_message_t msg;
-     memset(&msg, 0, sizeof(msg));
-     msg.payload = (void*)pLoad;
-     msg.qos = MQTT_QOS;
-     // Pass the computed length along, as PublishAction(ResDataObject*, ...) does.
-     msg.payloadlen = len;
-     const int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
-     pLock->Thread_UnLock();
-     (void)rc; // a negative value means the publish failed; currently not reported
-     return 2;
- }
- /*
- /// <summary>
- /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
- /// 超时没收到应答返回失败,
- /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
- /// </summary>
- /// <param name="pAction">要发送的命令Action名</param>
- /// <param name="pContext">命令携带的参数</param>
- /// <param name="pszTopic">要发送的目标topic</param>
- /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
- /// <param name="resObj">应答返回的参数结果</param>
- /// <param name="dwWaitTime">等待超时时间</param>
- /// <param name="hConnection">复用的MQTT链接句柄</param>
- /// <param name="onmsg">复用的链接的消息处理函数</param>
- /// <returns>成功返回2,其他返回错误码</returns>
- LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
- ResDataObject& resObj, DWORD dwWaitTime)
- {
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
- if (pszRespTopic != nullptr)
- PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- //if (!pMqttClient->is_connected()) {
- // ////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return 0;
- //}
- HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- //SubscribeTopic()
- auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
- string topic = topicName;//msg->get_topic().c_str();
- if (strcmp(pszRespTopic, topic.c_str()) == 0) {
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
- //应答到了,处理
- ResDataObject req;
- req.decode((const char*)message->payload);
- PacketAnalizer::GetPacketContext(&req, resObj);
- SetEvent(hEvent);
- //处理结束 将函数指针换回去
- void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
- //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
- MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
- }
- else
- {
- //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
- (msgarrvd)(context, topicName, topicLen, message);
- }
- };
- //接管回调
- // pMqttClient->set_message_callback(action_func);
- MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
- //pMqttClient->subscribe(pszRespTopic, 1);
- MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
- MQTTProperties prop = MQTTProperties_initializer;
- std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
- //pMqttClient->subscribe(pszTopic, 1, opts);
- MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
- //pMqttClient->publish(pszTopic, req.encode());
- MQTTClient_message pubmsg = MQTTClient_message_initializer;
- MQTTClient_message* m = NULL;
- MQTTClient_deliveryToken dt;
- MQTTProperty property;
- property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
- property.value.data.data = "test user property";
- property.value.data.len = (int)strlen(property.value.data.data);
- property.value.value.data = "test user property value";
- property.value.value.len = (int)strlen(property.value.value.data);
- MQTTProperties properties = MQTTProperties_initializer;
- MQTTProperties_add(&properties, &property);
- //pConn->publish(pszTopic, pCmd->encode());
- const char* pLoad = req.encode();
- MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
- DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
- if (ret == WAIT_OBJECT_0) {
- //等到应答了
- return 2;
- }
- return 0;
- }
- */
- /// <summary>
- /// Publishes a request on an existing connection and blocks until a matching
- /// response arrives or the timeout expires. A response matches when its
- /// packet type is PACKET_TYPE_RES and its packet key equals pAction.
- /// </summary>
- /// <param name="hConnection">Connection to publish on; its message filter is used to intercept the response.</param>
- /// <param name="pAction">Action name the response key is compared against.</param>
- /// <param name="req">Request packet; its topic fields are updated before encoding.</param>
- /// <param name="pContext">Not used by this function.</param>
- /// <param name="pszTopic">Destination topic.</param>
- /// <param name="resObj">Receives a copy of the matching response packet.</param>
- /// <param name="dwWaitTime">Maximum time to wait for the response, in milliseconds.</param>
- /// <returns>2 when the response arrived in time; 0 on timeout or when the call could not be set up.</returns>
- LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
-     const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
- {
-     (void)pContext;
-     Sleep(1); // NOTE(review): purpose of this short delay is not evident from here - confirm
-     if (hConnection == nullptr || pAction == nullptr)
-     {
-         return 0;
-     }
-     mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
-     HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
-     if (hEvent == NULL)
-     {
-         return 0;
-     }
-     ResDataObject resContext;
-     // Predicate installed into the connection's filter: accepts only the
-     // response that belongs to this action and copies it out to the caller.
-     auto func = [pAction, &resObj](ResDataObject* rsp) -> bool {
-         if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
-             strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
-         {
-             resObj = *rsp;
-             return true;
-         }
-         return false;
-     };
-     CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
-     pLock->Thread_Lock();
-     ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
-     if (pfilter == nullptr)
-     {
-         // Without a filter the response can never be delivered to this call.
-         pLock->Thread_UnLock();
-         CloseHandle(hEvent);
-         return 0;
-     }
-     // NOTE(review): the filter slots are per connection, so two overlapping
-     // calls on the same connection would overwrite each other - confirm usage.
-     std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
-     std::get<FILTER_RES_OBJ_ID>(*pfilter) = &resContext;
-     std::get<FILTER_FUNC_ID>(*pfilter) = func;
-     PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
-     const char* pLoad = req.encode();
-     mqtt_message_t msg;
-     memset(&msg, 0, sizeof(msg));
-     msg.payload = (void*)pLoad;
-     msg.qos = MQTT_QOS;
-     if (pLoad != nullptr)
-     {
-         // Set the length explicitly, as PublishAction(ResDataObject*, ...) does.
-         msg.payloadlen = static_cast<int>(strlen(pLoad));
-     }
-     const int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
-     (void)rc; // not propagated; a failed publish surfaces as a timeout below
-     pLock->Thread_UnLock();
-     const DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
-     // Detach everything that refers to this stack frame before returning, on
-     // success as well as on timeout: the predicate captures resObj by
-     // reference, the result slot points at a local, and the event is closed.
-     std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
-     std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
-     std::get<FILTER_HANDLE_ID>(*pfilter) = nullptr;
-     CloseHandle(hEvent);
-     return (ret == WAIT_OBJECT_0) ? 2 : 0;
- }
- /// <summary>
- /// Creates a temporary MQTT connection, publishes a request on it and waits
- /// for a response. The first message delivered to the temporary connection's
- /// callback is taken as the response.
- /// </summary>
- /// <param name="pAction">Action name. NOTE(review): not packed into the request here; the request is a default-constructed packet - confirm intent.</param>
- /// <param name="pContext">Context whose reply topic is updated when pszRespTopic is given.</param>
- /// <param name="pszTopic">Destination topic.</param>
- /// <param name="pszRespTopic">Reply topic written into pContext; skipped when null.</param>
- /// <param name="resObj">Receives the context extracted from the response packet.</param>
- /// <param name="dwWaitTime">Maximum time to wait for the response, in milliseconds.</param>
- /// <param name="pszSenderName">Sender name used in the temporary client id; "ANONYMOUS" when null.</param>
- /// <returns>2 when a response arrived in time; 0 on timeout or when the event or connection could not be created.</returns>
- LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
-     DWORD dwWaitTime, const char* pszSenderName)
- {
-     (void)pAction;
-     ResDataObject req;
-     if (pszRespTopic != nullptr)
-         PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
-     char pszClientID[256];
-     sprintf_s(pszClientID, "TEMP_%s_%d_0X%08X", pszSenderName == nullptr ? "ANONYMOUS" : pszSenderName, GetCurrentProcessId(), GetCurrentThreadId());
-     HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
-     if (hEvent == NULL)
-         return 0;
-     ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, hEvent](ResDataObject* rsp, const char*, void*) {
-         // Response arrived: hand its context to the caller and wake the waiter.
-         PacketAnalizer::GetPacketContext(rsp, resObj);
-         SetEvent(hEvent);
-     });
-     if (connObj == nullptr)
-     {
-         // No connection means no response can arrive; do not wait for the timeout.
-         CloseHandle(hEvent);
-         return 0;
-     }
-     PublishAction(&req, pszTopic, connObj);
-     const DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
-     // Close the connection first so its callback can no longer touch the
-     // event or resObj, then release the event.
-     CloseConnection(connObj);
-     CloseHandle(hEvent);
-     return (ret == WAIT_OBJECT_0) ? 2 : 0;
- }
- /// <summary>
- /// Placeholder overload: not implemented, nothing is published.
- /// </summary>
- /// <param name="pAction">Unused.</param>
- /// <param name="pszTopic">Unused.</param>
- /// <param name="pszRespTopic">Unused.</param>
- /// <param name="hConnection">Unused; may be null.</param>
- /// <returns>Always 0.</returns>
- LOGICDEVICE_API int PublishAction(const ResDataObject pAction, const char* pszTopic, const char* pszRespTopic, ccos_mqtt_connection* hConnection)
- {
-     // Deliberately touches none of its arguments, so a null hConnection is safe.
-     (void)pAction;
-     (void)pszTopic;
-     (void)pszRespTopic;
-     (void)hConnection;
-     return 0;
- }
|