123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498 |
- // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
- //
- #include <cstring>
- #include <iostream>
- #include <chrono>
- #include "LogicDevice.h"
- #include <stddef.h> // 用于 offsetof
- #include "PacketAnalizer.h"
- #include "MessageInfo.h"
- #include "common_api.h"
- #include "LocalConfig.h"
- #include "Base64.h"
- #include "SystemLogger.hpp"
- #include "mqttclient.h"
- #include "LinuxEvent.h"
- //Log4CPP::Logger* ////mLog::gLogger = nullptr;
- void msgarrivd(void* client, message_data_t* message);
- std::string CurrentDateTime();
- //-------------Logic Device SysIF--------------------------
- LogicDeviceSysIF::LogicDeviceSysIF(void)
- {
- }
- LogicDeviceSysIF::~LogicDeviceSysIF(void)
- {
- }
- //init
- void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
- {
- m_pLogicDev = p;
- //p->SetSysLogicDevice(this);
- };
- LogicDevice* LogicDeviceSysIF::GetLogicDevice()
- {
- return m_pLogicDev;
- }
- //Command In and Out
- //notify from lower layer
- RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
- {
- return RET_FAILED;
- };
- //notify to lower layer
- RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
- {
- if (m_pLogicDev)
- {
- return m_pLogicDev->CmdToLogicDev(pCmd);
- }
- return RET_NOSUPPORT;
- };
- const std::string SERVER_ADDRESS("127.0.0.1");
- /*
- string DEVICE_ID; //AS CLIENT_ID
- //const std::string CLIENT_ID("paho_cpp_async_subcribe");
- //const std::string TOPIC("hello");
- mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
- const int QOS = 1;
- const int N_RETRY_ATTEMPTS = 5;*/
- using namespace std;
- LogicDevice* g_pDPCDeviceObject = NULL;
- string LogHost = "";
- //string DEVICE_ID;
- //-------------Data Logic Device--------------------------
- LogicDevice::LogicDevice(void)
- {
- m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
- uuid_t uuid;
- char uuid_str[37];
- uuid_generate_random(uuid);
- uuid_unparse(uuid, uuid_str);
- m_pDevInstance = new char[40];
- strncpy(m_pDevInstance, uuid_str, 40);
- m_pResErrorList = new ResDataObject();
- sem_init(&m_SemphRequest, 0, 0);
- sem_init(&m_SemphPublish, 0, 0);
- m_pDrvDPC = NULL;
- g_pDPCDeviceObject = this;
- m_pMqttConntion = nullptr;
- //m_strServer = "tcp://localhost:1883";
- m_strServer = SERVER_ADDRESS;// "192.168.2.225";
- m_strServerPort = "1883";
- m_bMqttUseSSL = false;
- m_strMqttUser = "";
- m_strMqttPassword = "";
- m_strEBusRoot = "";
- m_strCCOSDevicePath = "";
- m_pParent = nullptr;
- m_topicFilter = nullptr;
- m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
- m_pPacketSendingQue = new MsgQueue<ResDataObject>();
- int x = 0;
- for ( x = 0; x < sizeof(szPad); x++)
- szPad[x] = 'A' + x % 26;
- szPad[x-1] = 0;
- memset(szPad2, 0, sizeof(szPad2));
- m_dwLastPacket = GetTickCount();
- }
- LogicDevice::~LogicDevice(void)
- {
- delete []m_pDevInstance;
- delete m_pResErrorList;
- sem_destroy(&m_SemphRequest);
- sem_destroy(&m_SemphPublish);
- //m_EvtNotify = NULL;
- delete m_pPacketReceivedQue;
- delete m_pPacketSendingQue;
- }
- void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
- if (m_strClientID.length() > 0)
- {
- ////mLog::FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID);
- return;
- }
- ////mLog::FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot);
- if (pszEBusRoot[0] == '/') {
- m_strEBusRoot = pszEBusRoot + 1;
- }
- else {
- m_strEBusRoot = pszEBusRoot;
- }
- char szKeys[256];
- strcpy(szKeys, pszEBusRoot);
- char* pt = szKeys;
- while (*pt != 0)
- {
- if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
- *pt = '_';
- pt++;
- }
- m_strClientID = CCOS_CLIENT_ID_PREFIX;
- if (szKeys[0] == '_')
- m_strClientID += (szKeys + 1);
- else
- m_strClientID += szKeys;
- if (pszCCOSRoot != nullptr)
- {
- std::cout << "LogicDevice::SetClientRootID [ Set CCOS path is" << pszCCOSRoot << "]" << std::endl;
- ////mLog::FINFO("Set CCOS path: {$}", pszCCOSRoot);
- m_strCCOSDevicePath = pszCCOSRoot;
- int nPos = m_strCCOSDevicePath.find('/');
- if (nPos != string::npos)
- {
- //CCOS/
- nPos = m_strCCOSDevicePath.find('/', nPos + 1);
- //CCOS/DEVICE/
- if (nPos != string::npos)
- {
- m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos);
- //CCOS/DEVICE/Generator
- nPos = m_strCCOSDevicePath.find('/', nPos + 1);
- if (nPos != string::npos)
- {
- m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos);
- }
- }
- }
- ////mLog::FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath);
- }
- ////mLog::FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath);
- SetName(m_strClientID.c_str());
- OnSetClientID();
- }
- void LogicDevice::OnSetClientID() {
- }
- void LogicDevice::SubscribeSelf() {
- size_t topicCount = 0;
- if (m_strEBusRoot.length() > 0) topicCount++;
- if (m_strCCOSDevicePath.length() > 0) topicCount++;
- if (m_strAbstractPath.length() > 0) {
- topicCount++;
- if (m_strDevicePath.length() > 0) topicCount++;
- }
- if (m_strCCOSRoot.length() > 0) topicCount++;
- topicCount += 7 * 3;
- m_subscribedTopics.reserve(topicCount);
- if (m_strDevicePath.length() > 0 && m_strDevicePath[0] != '/') {
- m_strDevicePath = "/" + m_strDevicePath;
- }
- ////mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
- std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl;
- if (m_strEBusRoot.length() > 0) {
- m_subscribedTopics.push_back(m_strEBusRoot);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- }
- if (m_strCCOSDevicePath.length() > 0)
- {
- ////mLog::FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath);
- std::string fullTopic = m_strCCOSDevicePath + m_strDevicePath;
- m_subscribedTopics.push_back(fullTopic);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- //订阅
- }
- ////mLog::FINFO("AbstractPath [{$}] ", m_strAbstractPath);
- if (m_strAbstractPath.length() > 0)
- {
- m_subscribedTopics.push_back(m_strAbstractPath);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- if (m_strDevicePath.length() > 0)
- {
- std::string fullAbstractTopic = m_strAbstractPath + m_strDevicePath;
- m_subscribedTopics.push_back(fullAbstractTopic);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- }
- }
- ////mLog::FINFO("CCOSRoot [{$}] ", m_strCCOSRoot);
- if (m_strCCOSRoot.length() > 0)
- {
- m_subscribedTopics.push_back(m_strCCOSRoot);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- //订阅
- }
- SubscribeActions();
- }
- void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
- {
- if (bSubscribe)
- {
- m_subscribedTopics.push_back(pszTopic);
- SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
- }
- else
- UnSubscribe(m_pMqttConntion, pszTopic);
- }
- void LogicDevice::NotifyDrvThread()
- {
- m_EvtNotify->SetEvent();
- }
- std::shared_ptr<LinuxEvent> LogicDevice::GetEvtHandle()
- {
- return m_EvtNotify;
- }
- //1. init part
- //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
- //{
- // m_pSysLogic = pLogic;
- //}
- //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
- //{
- // m_pLogger = pLogger;
- //}
- //
- //Logger *LogicDevice::GetLogHandle()
- //{
- // return m_pLogger;
- //}
- void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
- {
- m_pDrvDPC = pDPC;
- }
- DriverDPC *LogicDevice::GetDrvDPC()
- {
- return m_pDrvDPC;
- }
- RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
- {
- if (szEbusDevPath == nullptr)
- return RET_FAILED;
- for (auto dev : m_subDevices) {
- if (dev->GetRootPath() == szEbusDevPath)
- return RET_SUCCEED;
- }
- m_subDevices.push_back(pChild);
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
- {
- if (szCcosDevPath == nullptr)
- return RET_FAILED;
- for (auto dev : m_subCcosDevices) {
- if (dev->GetCcosRootPath() == szCcosDevPath)
- return RET_SUCCEED;
- }
- m_subCcosDevices.push_back(pChild);
- return RET_SUCCEED;
- }
- LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
- {
- for (auto dev : m_subDevices) {
- if (dev->GetRootPath() == szEbusDevPath)
- return dev;
- }
- return nullptr;
- }
- LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
- {
- for (auto dev : m_subCcosDevices) {
- if (dev->GetCcosRootPath() == szCcosDevPath)
- return dev;
- }
- return nullptr;
- }
- void SYSTEM_CALL LogicDevice::CompleteInit()
- {
- //if (//mLog::gLogger == nullptr)
- //{
- // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
- // LogHost = ((string)getLogRootpath()).c_str();
- // if (LogHost.length() <= 1)
- // {
- // char szName[256];
- // sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
- // LogHost = szName;
- // }
- // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
- // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
- // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
- // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
- // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
- // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
- //}
- //else
- //{
- // string strRoot = ((string)getLogRootpath()).c_str();
- // if (strRoot.length() > 1 && strRoot != LogHost)
- // {
- // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
- // LogHost = strRoot;
- // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
- // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
- // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
- // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
- // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
- // }
- //}
- //string version;
- //if (GetVersion(version, hMyModule))
- // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
- //else
- ////mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
- ////mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
- if (m_strClientID.length() <= 0)
- {
- ////mLog::FWARN("No Client name ......" );
- std::cout << "No Client name ......." << endl;
- return;
- }
- std::cout << "CompleteInit->NewConnection ......." << endl;
- m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
- [this](ResDataObject* req, const char* topic, void* conn) {
- //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
- //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
- // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
- // 消息处理如果耗时较长,则需要开线程来处理
- if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
- strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0)
- {
- //主题以ebus或者ccos开头,给我发的消息,进行处理
- ProcessSubscribeRequest(req);
- }
- else
- {
- //我主动订阅的外部模块的消息
- ProcessSubscribeMsg(req);
- }
- //CmdToLogicDev(req);
- });
- if (m_pMqttConntion != nullptr)
- {
- StartThread(); //启动Request线程
- SubscribeSelf();
- //DWORD dwThreadID = 0;
- //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID);
- //////mLog::FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID);
- }
- }
- RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
-
- PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
- switch (type) {
- case PACKET_TYPE_REQ:
- PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction);
- ProcessRequest(req, cmd);//请求
- break;
- case PACKET_TYPE_RES:
- ProcessResponse(req, cmd);//应答
- break;
- case PACKET_TYPE_NOTIFY:
- ProcessNotify(req, cmd);
- //通知
- break;
- }
- return RET_FAILED;
- }
- RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
- {
- ProcessSubscribeRequest(pCmd);
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
- {
- //Open命令
- if (cmd == PACKET_CMD_OPEN) {
- /* {
- "IDX": "40",
- "TYPE" : "0",
- "CMD" : "0",
- "HANDLE" :
- {
- "ROUTE": "1",
- "FLAGS" : "63",
- "LANG" : "en-US",
- "HANDLEID" : "0",
- "OWNERID" :
- {
- "EBUSID": "ImageSave",
- "MACHINEID" : "DESKTOP-FVD53H8",
- "PROCID" : "35408",
- "ADDR" : "2631366957696"
- },
- "DEVID":
- {
- "EBUSID": "ccosChannel",
- "MACHINEID" : "",
- "PROCID" : "0",
- "ADDR" : "0"
- }
- },
- "KEY": "\/ccosChannel"
- ResDataObject resRes, resResponse;
- ResDataObject resTopic;
- PacketAnalizer::GetContextTopic(pCmd, resTopic);
- if (cmd == PACKET_CMD_OPEN )
- {
- //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
- PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
- return RET_SUCCEED;
- }
- if (cmd == PACKET_CMD_CLOSE)
- {
- //CLOSE 客户端主动断开
- }
- } */
- PacketArrived(pCmd);
- return RET_SUCCEED;
- }
- else if (cmd == PACKET_CMD_CLOSE)
- {
- ResDataObject resp;
- InnerOpenClose(*pCmd, resp, false);
- }
- else {
- PacketArrived(pCmd);
- }
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
- {
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
- {
- PacketArrived(pCmd);
- return RET_SUCCEED;
- }
- void LogicDevice::PacketArrived(ResDataObject* pRequest)
- {
- //m_pPacketReceivedQue->Lock();
- ////mLog::FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
- //if (Thread_Lock(5000) != WAIT_OBJECT_0)
- //{
- // ////mLog::FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
- // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
- // return ;
- //}
- m_pPacketReceivedQue->InQueue(*pRequest);
- ////mLog::FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
- //SetEvent(m_EvtNotify);//notify to user
- long nLast = 0;
- int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程
- if (released <= 0)
- {
- ////mLog::FERROR("PacketArrived ReleaseSemaphore failed {$} ", GetLastError());
- }
- //Thread_UnLock();
- //m_pPacketReceivedQue->UnLock();
- }
- bool LogicDevice::OnStartThread()
- {
- return true;
- }
- bool LogicDevice::OnEndThread()
- {
- return true;
- }
- //单一发送线程,暂时未用
- DWORD LogicDevice::Thread_Publish_Thread(void* pPara)
- {
- INT ret = 0;
- int waitevent = 0;
- LogicDevice* handle = (LogicDevice*)pPara;
- //prev work usleep(30000);
- ////mLog::FINFO("Thread Start [{$}]", handle->m_strClientID);
- while (!handle->m_ExitFlag->Wait(1))
- {
- //do work
- ResDataObject resSend;
- DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100);
- if (wait != WAIT_OBJECT_0)
- continue;
- bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend);
- if (bHasPacket)
- {
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend);
- PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend);
- ////mLog::FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd);
- if (PACKET_CMD_NONE == cmd)
- {
- ////mLog::FDEBUG(" [{$}] Publish what packet {$} ", handle->m_strClientID, resSend.encode());
- }
- if (type == PACKET_TYPE_NOTIFY)
- {
- CcosDevFileHandle* pHandle = new CcosDevFileHandle;
- PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle);
- ////mLog::FINFO("Notify Transaction: {$}", handle->m_strCurTransaction);
- PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction);
- PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion);
- ////mLog::FDEBUG("Notify: {$}", resSend.encode());
- //printf("--> %s \n", pCmd->encode());
- PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion);
- PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion);
- delete pHandle;
- }
- else
- {
- ResDataObject resTopic;
- PacketAnalizer::GetPacketTopic(&resSend, resTopic);
- //PacketAnalizer::GetPacketTopicGetPacketTopic
- //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic);
- ////mLog::FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const char*)resTopic);
- PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion);
- }
- }
- }
- return 0;
- }
- RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevOpen - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
- ResDataObject req, reqParam;
- reqParam = pszGroup;
- cout << "LogicDevice::DevOpen - Creating OPEN request. Command: PACKET_CMD_OPEN" << endl;
- PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam);
- RET_STATUS ret = InnerOpenClose(req, resRespons, true);
- cout << "LogicDevice::DevOpen - InnerOpenClose returned: " << ret << endl;
- return ret;
- }
- RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevClose - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
- ResDataObject req, reqParam;
- reqParam = pszGroup;
- cout << "LogicDevice::DevClose - Creating CLOSE request. Command: PACKET_CMD_CLOSE" << endl;
- PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam);
- RET_STATUS ret = InnerOpenClose(req, resRespons, false);
- cout << "LogicDevice::DevClose - InnerOpenClose returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Get device property
- /// </summary>
- RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevGet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr") << endl;
- ResDataObject req, reqParam;
- cout << "LogicDevice::DevGet - Creating GET request. Command: PACKET_CMD_GET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevGet - Request returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Set device property
- /// </summary>
- RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevSet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
- << ", pszValueSet: " << (pszValueSet ? pszValueSet : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszValueSet != nullptr && pszValueSet[0] != 0) {
- if (pszValueSet[0] == '{') {
- cout << "LogicDevice::DevSet - Decoding JSON value for property" << endl;
- reqParam.decode(pszValueSet);
- }
- else {
- cout << "LogicDevice::DevSet - Using raw value for property" << endl;
- reqParam = pszValueSet;
- }
- }
- else {
- cout << "LogicDevice::DevSet - No value provided for property" << endl;
- }
- cout << "LogicDevice::DevSet - Creating SET request. Command: PACKET_CMD_SET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevSet - Request returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Update device property
- /// </summary>
- RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevUpdate - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
- << ", pszValueUpdate: " << (pszValueUpdate ? pszValueUpdate : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) {
- if (pszValueUpdate[0] == '{') {
- cout << "LogicDevice::DevUpdate - Decoding JSON value for update" << endl;
- reqParam.decode(pszValueUpdate);
- }
- else {
- cout << "LogicDevice::DevUpdate - Using raw value for update" << endl;
- reqParam = pszValueUpdate;
- }
- }
- else {
- cout << "LogicDevice::DevUpdate - No update value provided" << endl;
- }
- cout << "LogicDevice::DevUpdate - Creating UPDATE request. Command: PACKET_CMD_UPDATE, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevUpdate - Request returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Add device property
- /// </summary>
- RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* pszValueAdd, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevAdd - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
- << ", pszValueAdd: " << (pszValueAdd ? pszValueAdd : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszValueAdd != nullptr && pszValueAdd[0] != 0) {
- if (pszValueAdd[0] == '{') {
- cout << "LogicDevice::DevAdd - Decoding JSON value for addition" << endl;
- reqParam.decode(pszValueAdd);
- }
- else {
- cout << "LogicDevice::DevAdd - Using raw value for addition" << endl;
- reqParam = pszValueAdd;
- }
- }
- else {
- cout << "LogicDevice::DevAdd - No value provided for addition" << endl;
- }
- cout << "LogicDevice::DevAdd - Creating ADD request. Command: PACKET_CMD_ADD, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevAdd - Request returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Delete device property
- /// </summary>
- RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevDel - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
- << ", pszValueDel: " << (pszValueDel ? pszValueDel : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszValueDel != nullptr && pszValueDel[0] != 0) {
- if (pszValueDel[0] == '{') {
- cout << "LogicDevice::DevDel - Decoding JSON value for deletion" << endl;
- reqParam.decode(pszValueDel);
- }
- else {
- cout << "LogicDevice::DevDel - Using raw value for deletion" << endl;
- reqParam = pszValueDel;
- }
- }
- else {
- cout << "LogicDevice::DevDel - No value provided for deletion" << endl;
- }
- cout << "LogicDevice::DevDel - Creating DEL request. Command: PACKET_CMD_DEL, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevDel - Request returned: " << ret << endl;
- return ret;
- }
- /// <summary>
- /// Execute device action
- /// </summary>
- RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevAction - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszActionName: " << (pszActionName ? pszActionName : "nullptr")
- << ", pszParams: " << (pszParams ? pszParams : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszParams != nullptr && pszParams[0] != 0) {
- if (pszParams[0] == '{') {
- cout << "LogicDevice::DevAction - Decoding JSON parameters for action" << endl;
- reqParam.decode(pszParams);
- }
- else {
- cout << "LogicDevice::DevAction - Using raw parameters for action" << endl;
- reqParam = pszParams;
- }
- }
- else {
- cout << "LogicDevice::DevAction - No parameters provided for action" << endl;
- }
- cout << "LogicDevice::DevAction - Creating EXE request. Command: PACKET_CMD_EXE, Action: " << (pszActionName ? pszActionName : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszActionName, PACKET_CMD_EXE, &reqParam);
- ResDataObject resResult;
- RET_STATUS ret = Request(&req, &resResult);
- cout << "LogicDevice::DevAction - Request returned: " << ret << endl;
- PacketAnalizer::GetPacketContext(&resResult, resRespons);
- cout << "LogicDevice::DevAction - Extracted packet context to response" << endl;
- return ret;
- }
- /// <summary>
- /// Send message to device
- /// </summary>
- RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons)
- {
- cout << "LogicDevice::DevMessage - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
- << ", pszTopic: " << (pszTopic ? pszTopic : "nullptr")
- << ", pszMessageValue: " << (pszMessageValue ? pszMessageValue : "nullptr") << endl;
- ResDataObject req, reqParam;
- if (pszMessageValue != nullptr && pszMessageValue[0] != 0) {
- if (pszMessageValue[0] == '{') {
- cout << "LogicDevice::DevMessage - Decoding JSON message value" << endl;
- reqParam.decode(pszMessageValue);
- }
- else {
- cout << "LogicDevice::DevMessage - Using raw message value" << endl;
- reqParam = pszMessageValue;
- }
- }
- else {
- cout << "LogicDevice::DevMessage - No message value provided" << endl;
- }
- cout << "LogicDevice::DevMessage - Creating MSG request. Command: PACKET_CMD_MSG, Topic: " << (pszTopic ? pszTopic : "nullptr") << endl;
- PacketAnalizer::MakeRequest(req, pszTopic, PACKET_CMD_MSG, &reqParam);
- RET_STATUS ret = Request(&req, &resRespons);
- cout << "LogicDevice::DevMessage - Request returned: " << ret << endl;
- return ret;
- }
- RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose)
- {
- if (openOrClose)
- {
- ResDataObject resContext;
- GetDeviceResource(&resp);
- LogicDevice::GetDeviceResource(&resp);
- PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
- if (PacketAnalizer::GetPacketContext(&req, resContext))
- {
- //如果有上线参数
- if (resContext.GetFirstOf("Online") >= 0)
- {
- ////mLog::FINFO("Got Online Request {$}", resContext.encode());
- int idx = resContext.GetFirstOf("Online");
- //如果有上线参数
- if (idx >= 0)
- {
- do
- {
- string wsName = resContext[idx].encode();
- ////mLog::FINFO("SubscribeGroupActions [{$}] ", wsName);
- SubscribeGroupActions(wsName, true);
- idx = resContext.GetNextOf("Online", idx);
- } while (idx >= 0);
- }
- resp.update("Online", m_rsOnlineGroup);
- }
- }
- }
- else
- {
- ResDataObject context;
- if (PacketAnalizer::GetPacketContext(&req, context))
- {
- int idx = context.GetFirstOf("Offline");
- //如果有上线参数
- if (idx >= 0)
- {
- do
- {
- string wsName = context[idx].encode();
- SubscribeGroupActions(wsName, false);
- idx = context.GetNextOf("Offline", idx);
- } while (idx >= 0);
- }
- resp.update("Online", m_rsOnlineGroup);
- }
- }
-
- return RET_SUCCEED;
- }
- bool LogicDevice::Exec(void)
- {
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += 3;
- // 等待退出信号(3秒超时)
- DWORD dwRet = 0;
- dwRet = m_ExitFlag->Wait(3);
- if (dwRet == WAIT_OBJECT_0)
- {
- return false;
- }
- // 等待请求信号(3秒超时)
- int ret = sem_timedwait(&m_SemphRequest, &ts);
- if (ret == 0) {
- ////mLog::FDEBUG("[{$}] Got Packet Event ", m_strClientID);
- ResDataObject req;
- bool got = true;
- do {
- got = m_pPacketReceivedQue->DeQueue(req);
- if (got) {
- m_dwLastPacket = GetTickCount();
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
- std::string keystr = PacketAnalizer::GetPacketKey(&req);
- ////mLog::FINFO(" {$} Got Request key {$} transaction {$}",
- // m_strClientID, keystr, m_strCurTransaction);
- if (cmd == PACKET_CMD_OPEN) {
- ResDataObject resRes, resResponse;
- InnerOpenClose(req, resRes, true);
- PacketAnalizer::MakeOpenResponse(req, resResponse, resRes);
- resResponse.update("Online", m_rsOnlineGroup);
- PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction);
- ResDataObject resTopic;
- PacketAnalizer::GetContextTopic(&req, resTopic);
- PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str());
- if (keystr.substr(0, 4) == "CCOS") {
- PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str());
- }
- else {
- PacketAnalizer::UpdateDeviceNotifyResponse(resResponse,
- getLocalMachineId(),
- getLocalEbusId(),
- (uint64_t)getpid(),
- (uint64_t)m_pMqttConntion);
- }
- PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
- // 发送连接状态通知
- ResDataObject NotifyData;
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE,
- "ConnectionStatus", "1");
- PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction);
- CmdFromLogicDev(&NotifyData);
- }
- else {
- PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
- ResDataObject resPacket;
- RET_STATUS retStatus = RET_FAILED;
- if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") {
- ResDataObject devRes;
- if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) {
- LogicDevice::GetDeviceResource(&devRes);
- PacketAnalizer::UpdatePacketContext(resPacket, devRes);
- }
- }
- else {
- retStatus = Request(&req, &resPacket);
- }
- PacketAnalizer::MakeRetCode(retStatus, &resPacket);
- PacketAnalizer::CloneTransaction(&req, &resPacket);
- ResDataObject resTopic;
- PacketAnalizer::GetContextTopic(&req, resTopic);
- PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str());
- PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
- }
- }
- } while (got);
- // 检查心跳(10分钟无数据发送心跳)
- if (GetTickCount() - m_dwLastPacket > 600000) {
- m_dwLastPacket = GetTickCount();
- ResDataObject heartBeat;
- PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
- "HeartBeat", m_strClientID.c_str());
- PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
- }
- return true;
- }
- // 检查心跳(即使没有请求)
- if (GetTickCount() - m_dwLastPacket > 600000) {
- m_dwLastPacket = GetTickCount();
- ResDataObject heartBeat;
- PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
- "HeartBeat", m_strClientID.c_str());
- PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
- }
- return true;
- }
- void SYSTEM_CALL LogicDevice::CompleteUnInit()
- {
- StopThread();
- }
- int LogicDevice::GetDevice_Thread_Priority()
- {
- return THREAD_PRIORITY_NONE;
- }
- RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
- {
- //pDeviceResource->update("ClientType", DPC_UnitClient);
- GUID guid;
- string name;
- GetDeviceType(guid);
- guid_2_string(guid, name);
- pDeviceResource->update("DeviceType", name.c_str());
- //Get Unit Type (Unit GUID)
- if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
- {
- pDeviceResource->add("LogicDevInstance", m_pDevInstance);
- }
- ////
- size_t idx = (*pDeviceResource)["Attribute"].size();
- if (idx > 0)
- {
- int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
- if (erroridx < 0)
- {
- (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
- }
- else
- {
- (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
- }
- }
- else
- {
- ResDataObject Attribute;
- Attribute.add("ErrorList", *m_pResErrorList);
- pDeviceResource->add("Attribute", Attribute);
- }
- // (*pDeviceResource)["Action"].size();
- if (pDeviceResource->GetFirstOf("Action") < 0)
- {
- pDeviceResource->add("Action", m_Actions);
- }
- return RET_SUCCEED;
- }
- //notify from lower layer
- RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
- {
- PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
- PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
- if (type == PACKET_TYPE_NOTIFY)
- {
- CcosDevFileHandle* pHandle = new CcosDevFileHandle;
- PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle);
- ////mLog::FDEBUG("Notify Transaction: {$}", m_strCurTransaction);
- PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction);
- ;
- //if (m_strEBusRoot.length() <= 0)
- //{
- // //////mLog::FWARN("EBusRoot is null");
- //}
- PacketAnalizer::UpdateDeviceNotifyResponse(
- *pCmd,
- getLocalMachineId(),
- getLocalEbusId(),
- static_cast<uint64_t>(getpid()), // Linux 上获取进程 ID
- reinterpret_cast<uint64_t>(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型
- );
- ////mLog::FDEBUG("[{$}] Notify: {$}", m_strClientID, pCmd->encode());
- //printf("--> %s \n", pCmd->encode());
- PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str());
- PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd);
- PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str());
- PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion);
- if (m_strAbstractPath.length() > 0)
- {
- PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion);
- string realPath,devPath;
- devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length());
- for (int x = 0; x < m_rsOnlineGroup.size(); x++)
- {
- if ((int)m_rsOnlineGroup[x] == 1)
- {
- realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath;
- PublishAction(pCmd, realPath.c_str(), m_pMqttConntion);
- }
- }
- } //m_pPacketSendingQue->InQueue(*pCmd);
- delete pHandle;
- }
- return RET_SUCCEED;
- /*
- if (pCmd && m_pSysLogic)
- {
- return m_pSysLogic->CmdFromLogicDev(pCmd);
- }
- //put log here
- return RET_FAILED;*/
- }
- std::wstring mb2wc_a(const char* mbstr)
- {
- std::wstring strVal;
- // 获取输入字符串的长度
- size_t mbstr_len = strlen(mbstr);
- // 计算转换后宽字符字符串的长度
- size_t size = mbstr_len + 1; // 需要多一个字符来存储结尾的 '\0'
- wchar_t* wcstr = new wchar_t[size];
- if (wcstr)
- {
- memset(wcstr, 0, size * sizeof(wchar_t));
- // mbsrtowcs 返回转换后的字符数
- size_t ret = mbsrtowcs(wcstr, &mbstr, size, nullptr);
- if (ret != (size_t)-1) // mbsrtowcs 返回 (size_t)-1 表示错误
- {
- strVal = wcstr;
- }
- delete[] wcstr;
- }
- return strVal;
- }
- RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
- {
- string ResBase64, DesBase64;
- wstring wResUTF = ResInfo;
- wstring wDesUTF = Description;
- CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
- CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
- return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
- }
- RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
- {
- string ResBase64,DesBase64;
- wstring wResUTF = mb2wc_a(ResInfo);
- wstring wDesUTF = mb2wc_a(Description);
- CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
- CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
- return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
- }
- RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
- {
- //int ret = 1;
- if (Code == 0 || (string)ResInfo == "")
- {
- ////mLog::FERROR("Code or ResInfo is empty");
- return RET_FAILED;
- }
- MessageInfo info;
- info.CodeID = Code;
- info.Type = nMessageType;
- info.Level = Level;
- info.Resouceinfo = ResInfo;
- info.Description = Description;
- string strDevInstanceCode = DevInstance;
- strDevInstanceCode += Code;
- for (size_t i = 0; i < m_pResErrorList->size(); i++)
- {
- string strInstancekey = m_pResErrorList->GetKey(i);
- if (strInstancekey == strDevInstanceCode)
- {
- //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
- //{
- // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
- // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
- // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
- // {
- //ret = 0;
- ////mLog::FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
- return RET_SUCCEED;
- // }
- //}
- //ret = 2;
- //break;
- }
- }
- //if (ret==1)
- {
-
- ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
- //info.GetResDataObject(tempInfo);
- //ErrorInfo.update(Code, tempInfo);
- info.GetResDataObject(ErrorInfo);
- struct timeval tv;
- struct tm* tm_info;
- char TimeTag[30];
- // 获取当前时间
- gettimeofday(&tv, NULL);
- // 转换为本地时间
- tm_info = localtime(&tv.tv_sec);
- // 格式化时间为字符串
- snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
- tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
- tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
- ErrorInfo.add("CreationTime", TimeTag);
- ErrorInfo.add("AppId", pAppId);
- ErrorInfo.add("InstanceId", DevInstance);
-
- if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
- {
- m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
- }
- ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
- ////mLog::FWARN( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
- CmdFromLogicDev(&NotifyData);
- //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- }
- //else if (ret == 2)
- //{
- // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
- // info.GetResDataObject(tempInfo);
- // ErrorInfo.update(Code, tempInfo);
- // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
- // {
- // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
- // }
- // ResNotify.update(DevInstance, ErrorInfo);
- // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
- // RES_////mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
- // CmdFromLogicDev(&NotifyData);
- //}
- //put log here
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
- {
- AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
- //put log here
- return RET_FAILED;
- }
- RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
- {
- int index = -1;
- bool m_bClearAll = false;
- //trim empty code string
- string CodeStr = Code;
- if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
- {
- CodeStr = "";
- Code = CodeStr.c_str();
- m_bClearAll = true;
- }
-
- //if ((string)Code == "0" || (string)Code == "")
- //{
- // m_bClearAll = true;
- //}
- string strDevInstanceCode = DevInstance;
- strDevInstanceCode += Code;
- if (m_bClearAll)
- {
- m_pResErrorList->clear();
- }
- else
- {
- MessageInfo info;
- info.CodeID = Code;
- info.Type = nMessageType;
- info.Level = Level;
- info.Resouceinfo = ResInfo;
- info.Description = Description;
-
- for (size_t i = 0; i < m_pResErrorList->size(); i++)
- {
- string strInstancekey = m_pResErrorList->GetKey(i);
- if (strInstancekey == strDevInstanceCode)
- {
- //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
- //{
- // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
- // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
- // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
- // {
- // index = (INT)j;
- // break;
- // }
- //}
- index = (int)i;
- break;
- }
- }
- }
- MessageInfo info;
- info.CodeID = Code;
- info.Type = nMessageType;
- info.Level = Level;
- info.Resouceinfo = ResInfo;
- info.Description = Description;
- ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
- //info.GetResDataObject(tempInfo);
- //ErrorInfo.add(Code, tempInfo);
- info.GetResDataObject(ErrorInfo);
- ErrorInfo.add("InstanceId", DevInstance);
- bool result = false;
- if (index>=0)
- {
- //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
- result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
-
- }
- if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
- {
- ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
- ////mLog::FWARN( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
- CmdFromLogicDev(&NotifyData);
- //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- }
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
- {
- DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
- //put log here
- return RET_FAILED;
- }
- RET_STATUS LogicDevice::EvtProcedure()
- {
- return RET_FAILED;
- }
- bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
- {
- ////mLog::FERROR("NOT FINISHED YET");
- return false;
- }
- RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
- {
- std::string strResult = "";
- //组Context包
- //if (m_pLogger)
- {
- wstring wContect = mb2wc_a(pContext);
- CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
- }
- //Thread_Lock();
- //if (NULL != fmt)
- //{
- // va_list marker = NULL;
- // va_start(marker, fmt);
- // size_t nLength = _vscprintf(fmt, marker) + 1;
- // std::vector<char> vBuffer(nLength, '\0');
- // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
- // if (nWritten > 0)
- // {
- // strResult = &vBuffer[0];
- // }
- // va_end(marker);
- //}
- //Thread_UnLock();
- ResDataObject SysLogNode;
- string guidstr;
- GUID DeviceGuid;
- //组Log包
- if (GetDeviceType(DeviceGuid))
- {
- guid_2_string(DeviceGuid, guidstr);
- SysLogNode.add("Module", guidstr.c_str());
- SysLogNode.add("AppId", pAppId);
- SysLogNode.add("ThreadId", GetCurrentThreadId());
- if (pCode)
- {
- SysLogNode.add("BusinessKey", pCode);
- }
- else
- {
- SysLogNode.add("BusinessKey", "");
- }
- SysLogNode.add("IP", (const char*)getLocalIpAddress());
- struct timeval tv;
- struct tm* tm_info;
- char TimeTag[30];
- // 获取当前时间
- gettimeofday(&tv, NULL);
- // 转换为本地时间
- tm_info = localtime(&tv.tv_sec);
- // 格式化时间为字符串
- snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
- tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
- tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
- SysLogNode.add("CreationTime", TimeTag);
- string strLevel = SysLogLevel2str(Level);
- SysLogNode.add("Level", strLevel.c_str());
- SysLogNode.add("HostName", (const char*)getLocalMachineId());
- SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
- SysLogNode.add("FreeText", strResult.c_str());
- SysLogNode.add("Context", pCode);
- ResDataObject NotifyData;
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
- CmdFromLogicDev(&NotifyData);
- //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- }
- else
- {
- ////mLog::FWARN("no Guid??");
- return RET_FAILED;
- }
- //打印LOG
- switch (Level)
- {
- case Syslog_Debug:
- //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Information:
- ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Warning:
- //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
- ////mLog::Warn("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Error:
- //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FERROR("SysLog {$} ", SysLogNode.encode());
- break;
- case Syslog_Fatal:
- //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
- break;
- default:
- ////mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
- break;
- }
- return RET_SUCCEED;
- }
- RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
- {
- std::string strResult = "";
- //组Context包
- //if (m_pLogger)
- //{
- // m_pLogger->Thread_Lock();
- // if (NULL != fmt)
- // {
- // va_list marker = NULL;
- // va_start(marker, fmt);
- // size_t nLength = _vscprintf(fmt, marker) + 1;
- // std::vector<char> vBuffer(nLength, '\0');
- // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
- // if (nWritten > 0)
- // {
- // strResult = &vBuffer[0];
- // }
- // va_end(marker);
- // }
- // m_pLogger->Thread_UnLock();
- //}
- //else
- {
- Thread_Lock();
- if (fmt != NULL) {
- va_list marker;
- va_start(marker, fmt);
- // 获取格式化字符串所需的大小(不包括终止符)
- int nLength = vsnprintf(NULL, 0, fmt, marker);
- if (nLength >= 0) {
- std::vector<char> vBuffer(nLength + 1, '\0'); // +1 为终止符
- vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker);
- strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串
- }
- va_end(marker);
- }
- Thread_UnLock();
- }
- ResDataObject SysLogNode;
- string guidstr;
- GUID DeviceGuid;
- //组Log包
- if (GetDeviceType(DeviceGuid))
- {
- guid_2_string(DeviceGuid, guidstr);
- SysLogNode.add("Module", guidstr.c_str());
- SysLogNode.add("AppId", "");
- SysLogNode.add("ThreadId", GetCurrentThreadId());
- if (pCode)
- {
- SysLogNode.add("BusinessKey", pCode);
- }
- else
- {
- SysLogNode.add("BusinessKey", "");
- }
- SysLogNode.add("IP", (const char *)getLocalIpAddress());
- struct timeval tv;
- struct tm* tm_info;
- char TimeTag[30];
- // 获取当前时间
- gettimeofday(&tv, NULL);
- // 转换为本地时间
- tm_info = localtime(&tv.tv_sec);
- // 格式化时间为字符串
- snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
- tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
- tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
- SysLogNode.add("CreationTime", TimeTag);
- SysLogNode.add("Level", Level);
- SysLogNode.add("HostName", (const char *)getLocalMachineId());
- SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
- SysLogNode.add("FreeText", strResult.c_str());
- ResDataObject NotifyData;
- PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
- CmdFromLogicDev(&NotifyData);
- //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
- if(m_pParent != nullptr)
- NotifyParent(&NotifyData, "/Notify");
- }
- else
- {
- ////mLog::FERROR("no Guid??");
- return RET_FAILED;
- }
- //打印LOG
- switch (Level)
- {
- case Syslog_Debug:
- //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Information:
- //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Warning:
- //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
- ////mLog::Warn("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Error:
- //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FERROR("SysLog {$}", SysLogNode.encode());
- break;
- case Syslog_Fatal:
- //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
- ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
- break;
- default:
- ////mLog::FINFO( "SysLog {$}", SysLogNode.encode());
- break;
- }
- return RET_SUCCEED;
- }
- /////////////////////////////////////////////////////////////////////////////
- ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
- {
- return nullptr;
- }
- std::string CurrentDateTime()
- {
- auto now = std::chrono::system_clock::now();
- auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
- auto now_time_t = std::chrono::system_clock::to_time_t(now);
- std::tm now_tm{};
- localtime_r(&now_time_t, &now_tm); // 线程安全版本
- char buf[64];
- std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &now_tm);
- std::string now_time_str = buf;
- // 格式化微秒部分并追加
- snprintf(buf, sizeof(buf), ".%06lld", static_cast<long long>(now_us.count() % 1000000));
- now_time_str += buf;
- return now_time_str;
- }
- static string CurrentDateTime2()
- {
- std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
- char buf[100] = { 0 };
- std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
- return buf;
- }
- bool LogicDevice::GetActions(ResDataObject& resAction)
- {
- for (int x = 0; x < m_Actions.size(); x++)
- {
- resAction.update(m_Actions.GetKey(x), "");
- }
- return true;
- }
- void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
- {
- if (m_pParent != nullptr)
- {
- if (hConnection == nullptr)
- {
- hConnection = m_pParent->m_pMqttConntion;
- }
- PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
- }
- }
- const mqtt_qos_t MQTT_QOS = QOS2;
- /// <summary>
- /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector
- /// CCOS/Demo/Detector
- /// </summary>
- /// <param name="bOnline"></param>
- void LogicDevice::SubscribeGroupActions(string wsName, int bOnline)
- {
- if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0)
- {
- int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()];
- if (bOnline == lastOnline)
- {
- ////mLog::FWARN("Group Abstract is already ? Online {$}", bOnline);
- return;
- }
- }
- if ( m_strAbstractPath.length() > 0)
- {
- std::string pszAction, gpPath;
- int ret = 0;
- int num = m_Actions.size();
- ////mLog::FINFO("Group Device onLine or offLine {$}", bOnline);
- gpPath = m_strAbstractPath;
- gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName);
- pszAction = gpPath + m_strDevicePath;
- pszAction += "/Action/+";
- SubScribeTopic(pszAction.c_str(), bOnline == 1);
- if (bOnline)
- {
- ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
- }
- else
- {
- ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
- }
- //for (size_t i = 0; i < num; i++)
- //{
- // pszAction = gpPath + m_strDevicePath;
- // pszAction += "/Action/";
- // pszAction += (m_Actions.GetKey(i));
- // SubScribeTopic(pszAction.c_str(), bOnline == 1);
- // if (bOnline)
- // {
- // ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
- // }
- // else
- // {
- // ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
- // }
- //}
- m_rsOnlineGroup.update(wsName.c_str(), bOnline);
- }
- else
- {
- ////mLog::FWARN("try Online but AbstractPath is none {$}", m_strClientID);
- }
- }
- void SubscribeModule(ccos_mqtt_connection* pconn, string devuribBase)
- {
- string pszAction = devuribBase ;
- pszAction += "/Get/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Update/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Set/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Add/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Del/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Action/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- pszAction = devuribBase;
- pszAction += "/Message/+";
- m_subscribedTopics.push_back(pszAction);
- SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
- }
- void LogicDevice::SubscribeActions()
- {
- ////mLog::FINFO("Begain");
- if (nullptr == m_pMqttConntion)
- return;
- int num = m_Actions.size();
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
- {
- ResDataObject res,resAction;
- GetDeviceResource(&res);
- std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
- resAction = res["Action"];
- ////mLog::FINFO("Actions from deviceresource [{$}]", resAction.size());
- for (int x = 0; x < resAction.size(); x++)
- {
- string action = (string)resAction.GetKey(x);
- if (action.length() > 0)
- {
- ////mLog::FINFO("Action from DeviceResource [{$}]", action);
- m_Actions.update(action.c_str(), "");
- }
- }
- num = m_Actions.size();
- }
- std::string pszAction;
- int ret = 0;
- SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str());
- ////mLog::FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- if (m_strEBusRoot != m_strCCOSDevicePath)
- {
- pszAction = m_strCCOSDevicePath + m_strDevicePath;
- SubscribeModule(m_pMqttConntion, pszAction.c_str());
- ////mLog::FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- }
- if (m_strAbstractPath.length() > 0)
- {
- pszAction = m_strAbstractPath + m_strDevicePath;
- SubscribeModule(m_pMqttConntion, pszAction.c_str());
- ////mLog::FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- }
-
- //for (size_t i = 0; i < num; i++)
- //{
- // //订阅ebus的路径
- // pszAction = m_strEBusRoot;
- // pszAction += "/Action/";
- // pszAction += (m_Actions.GetKey(i));
- // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
- // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- //
- // //订阅CCOS设备路径
- // if (m_strEBusRoot != m_strCCOSDevicePath)
- // {
- // pszAction = m_strCCOSDevicePath + m_strDevicePath;
- // pszAction += "/Action/";
- // pszAction += (m_Actions.GetKey(i));
- // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
- // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- // }
- // //抽象设备路径要一直存在
- // //订阅抽象设备路径
- // if ( m_strAbstractPath.length() > 0)
- // {
- // pszAction = m_strAbstractPath + m_strDevicePath;
- // pszAction += "/Action/";
- // pszAction += (m_Actions.GetKey(i));
- // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
- // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- // }
- //}
- //pszAction = m_strEBusRoot;
- //pszAction += "/Action/UpdateDeviceResource";
- ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
- //pTopicList->push_back(pszAction);
- //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
- //////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
- //*/
-
- }
- /*
- void connlost(void* context, char* cause)
- {
- //连接中断
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
- printf("\nConnection lost\n");
- printf(" cause: %s\n", cause);
- std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
- }
- void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
- {
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
- }
- */
- //发送成功
- //void delivered(void* context, MQTTAsync_token dt)
- //{
- // printf("Message with token value %d delivery confirmed\n", dt);
- // //deliveredtoken = dt;
- //}
- //void connlost(void* context, char* cause)
- //{
- // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
- // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
- //
- // //连接中断
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
- // return;
- //
- // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- // int rc;
- //
- // printf("Reconnecting\n");
- // conn_opts.keepAliveInterval = 20;
- // conn_opts.cleansession = 1;
- // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
- // {
- // printf("Failed to start connect, return code %d\n", rc);
- // //finished = 1;
- // }
- //}
- //重连成功
- //void onReconnected(void* context, char* cause)
- //{
- // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
- // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
- //
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
- // const char* client_id = std::get<CLINET_ID_ID>(*connection);
- // int rc;
- //
- // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
- // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
- // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
- // // 重新订阅,暂不重新订阅,看看
- //
- // ///*
- // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
- // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
- // //opts.onSuccess = onSubscribe;
- // //opts.onFailure = onSubscribeFailure;
- // //opts.context = client;
- // auto it = pTopicList->begin();
- // while(it != pTopicList->end())
- // {
- // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
- // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
- // it++;
- // }
- // //*/
- //
- //}
- //
- ////MQTT 连接主动断开连接失败
- //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Disconnect failed, rc %d\n", response->code);
- // //disc_finished = 1;
- //}
- //
- ////MQTT 连接主动断开连接成功
- //void onDisconnect(void* context, MQTTAsync_successData* response)
- //{
- // //printf("Successful disconnection\n");
- // //disc_finished = 1;
- //}
- //
- //void onSubscribe(void* context, MQTTAsync_successData* response)
- //{
- // printf("Subscribe succeeded\n");
- // //subscribed = 1;
- //}
- //
- //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Subscribe failed, rc %d\n", response->code);
- // //finished = 1;
- //}
- //
- //
- //void onConnectFailure(void* context, MQTTAsync_failureData* response)
- //{
- // printf("Connect failed, rc %d\n", response->code);
- // //finished = 1;
- //}
- //void onConnect(void* context, MQTTAsync_successData* response)
- //{
- //
- // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
- // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- //
- // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
- //
- // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- // int rc;
- //
- // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
- // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
- // if (hConnected != NULL)
- // SetEvent(hConnected);
- //
- // /*
- // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
- // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
- // opts.onSuccess = onSubscribe;
- // opts.onFailure = onSubscribeFailure;
- // opts.context = client;
- // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
- // {
- // printf("Failed to start subscribe, return code %d\n", rc);
- // finished = 1;
- // }*/
- //}
- //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
- int PublishActionWithoutLock(string &message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos = MQTT_QOS);
- void resubscribe_topic(void* client, void* reconnect_date)
- {
- mqtt_client* pClient = (mqtt_client*)client;
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
- string client_id = std::get<CLINET_ID_ID>(*connection);
- ////mLog::FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
- pLock->Thread_Lock();
- for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
- ////mLog::FWARN("Resubscribe {$}", str);
- mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
- });
- pLock->Thread_UnLock();
- }
- //MQTT消息抵达
- //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
- void msgarrivd(void* client, message_data_t* message)
- {
- if (client == nullptr)
- {
- //printf("************************ somthing happend...");
- return;
- }
- //消息抵达
- ResDataObject req;
- string topic = strlen( message->topic_name) > sizeof(message->topic_name)-1 ? string(message->topic_name, sizeof(message->topic_name) - 1) : message->topic_name; //topicName; //msg->get_topic().c_str();
- mqtt_client* pClient = (mqtt_client*)client;
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
- if (!connection) return;
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
- if (pLock == nullptr)
- return;
- pLock->Thread_Lock();
- //再次取Lock,如果发现是空了,什么都不干
- CcosLock* pLockTemp = std::get<CONN_SEND_LOCK_ID>(*connection);
- if (pLockTemp == nullptr)
- {
- pLock->Thread_UnLock();
- return;
- }
- string client_id = std::get<CLINET_ID_ID>(*connection);
- void* pc = std::get<MQTT_CLT_ID>(*connection);
- if (pc == nullptr)
- {
- //printf("mqtt is closed now..\n");
- pLock->Thread_UnLock();
- return;
- }
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
- //std::cout << " msg Body " << payload << endl;
- if ((topic.length() >= MQTT_TOPIC_LEN_MAX && message->message->payloadlen > 16380) || message->message->payloadlen > 512*1024)
- {
- //std::string payloads((const char*)message->message->payload, 128);
- ////mLog::FWARN("TID {$} : {$} Get Bad Msg Len[{$}] from {$}", GetCurrentThreadId(), client_id, message->message->payloadlen, topic.substr(0, 255));
- pLock->Thread_UnLock();
- return;
- }
- if((int)message->message->payloadlen > 16380)
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get big Msg from " << topic << " msg Len " << message->message->payloadlen << /*" msg Body " << payload <<*/ endl;
- ////mLog::FWARN("TID {$} : {$} Get big Msg from {$} msg Len {$}", GetCurrentThreadId(), client_id, topic, message->message->payloadlen);
- ////mLog::FDEBUG("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload.substr(0, 1024));
- const char* pclient_id = client_id.c_str();
- try
- {
- std::string strPayload((const char*)message->message->payload, message->message->payloadlen);
- req.decode(strPayload.c_str());
- }
- catch (...)
- {
- pLock->Thread_UnLock();
- ////mLog::FWARN("Decode Exception.. {$}", payload);
- return;
- }
- //取消息钩子
- void* pHook = std::get<MSG_HOOK_ID>(*connection);
- if (pHook != nullptr) {
- ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
- ccos_mqtt_msg_filter_func func = std::get<FILTER_FUNC_ID>(*filter);
- //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
- ////mLog::FDEBUG("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
- //消息钩子函数存在
- ResDataObject *pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
- for(int x=0;x<pRequests->size();x++) {
- //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
- ////mLog::FDEBUG("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
- string keyAction = pRequests->GetKey(x);
- ResDataObject resObj = (*pRequests)[x];
- std::shared_ptr<LinuxEvent> hEvent = std::get<FILTER_HANDLE_ID>(*filter);
- if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES && keyAction == PacketAnalizer::GetPacketKey(&req)) {
- //勾住了,是该钩子的消息,则通知
- //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
- hEvent = LinuxEvent::OpenEvent(string(resObj).c_str());
- if (hEvent==nullptr)
- {
- hEvent = std::get<FILTER_HANDLE_ID>(*filter);
- }
-
- ////mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} Notify Event [{$}] Handle [{$}]", GetCurrentThreadId(), client_id, topic, string(resObj), hEvent);
- //PacketAnalizer::GetPacketContext(&req, *resObj);
- ResDataObject* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
- pResponse->add(keyAction.c_str(), req);
- if (!hEvent) {
- std::cerr << "Failed to open shared event" << std::endl;
- }
- else {
- hEvent->SetEvent();
- }
- //CloseHandle(hEvent);
- pRequests->eraseOneOf(keyAction.c_str(), 0);
- pLock->Thread_UnLock();
- //MQTTAsync_freeMessage(&message);
- //MQTTAsync_free(topicName);
- return ;
- }
- }
- }
- std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
- pLock->Thread_UnLock();
- ccos_mqtt_callback onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
- if (onmsg != nullptr)
- (onmsg)(&req, topic.c_str(), connection);
- else
- {
- ////mLog::FWARN("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get<CLINET_ID_ID>(*connection), topic);
- cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get<CLINET_ID_ID>(*connection) << " When processing " << topic << endl;
- }
- //MQTTAsync_freeMessage(&message);
- //MQTTAsync_free(topicName);
- return ;
- }
- void* MqttSendThreadFunc(void* pPara)
- {
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
- mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*connection);
- mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
- std::string client_id = std::get<CLINET_ID_ID>(*connection);
- sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
- // 线程启动日志
- std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
- while (std::get<THREAD_RUNNING_ID>(*connection)) {
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += 1;
- // 等待信号量前的日志
- //std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
- if (sem_timedwait(hSemaphore, &ts) == 0) {
- std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
-
- pLock->Thread_Lock();
- std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
- if (pList->empty()) {
- std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
- pLock->Thread_UnLock();
- continue;
- }
- ResDataObject* pMsg = pList->front();
- pList->pop_front();
- std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
- for (int x = 0; x < pMsg->size(); x++) {
- const char* pTopic = pMsg->GetKey(x);
- std::string message = (std::string)(*pMsg)[pTopic];
- std::cout << "[" << client_id << "] Prepare to publish messages to the topic: " << pTopic
- << ", Message length: " << message.length() << "Byte" << std::endl;
- mqtt_message_t msg;
- memset(&msg, 0, sizeof(msg));
- msg.payload = (void*)message.c_str();
- msg.payloadlen = message.length();
- msg.qos = QOS1;
- // 发布消息
- int publishResult = mqtt_publish(pConn, pTopic, &msg);
- if (publishResult == 0) {
- std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
- }
- else {
- std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic
- << ", Error code: " << publishResult << std::endl;
- }
- }
- delete pMsg;
- std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
- pLock->Thread_UnLock();
- std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
- }
- else if (errno == ETIMEDOUT) {
- //std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
- }
- else {
- // 其他错误情况
- std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
- }
- }
- return NULL;
- }
- // 安全遍历MQTT消息处理程序的宏
- #define mqtt_list_for_each_entry_safe(pos, n, head, member) \
- for (pos = container_of((head)->next, typeof(*pos), member), \
- n = container_of(pos->member.next, typeof(*pos), member); \
- &pos->member != (head); \
- pos = n, n = container_of(n->member.next, typeof(*n), member))
- // 通过成员指针获取包含结构的指针
- #define container_of(ptr, type, member) ({ \
- const typeof(((type*)0)->member)* __mptr = (ptr); \
- (type*)((char*)__mptr - offsetof(type, member)); })
- // 重连回调函数 - 实现自动重订阅
- static void reconnect_handler(void* client, void* reconnect_data) {
- mqtt_client_t* c = (mqtt_client_t*)client;
- message_handlers_t* pos, * n;
- ccos_mqtt_connection* connection = (ccos_mqtt_connection*)c->mqtt_conn_context;
- // 遍历所有已订阅的主题并重新订阅
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
- pLock->Thread_Lock();
- mqtt_list_for_each_entry_safe(pos, n, &c->mqtt_msg_handler_list, list) {
- mqtt_subscribe(c, pos->topic_filter, pos->qos, pos->handler);
- }
- pLock->Thread_UnLock();
- }
- mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
- std::cout << "[MQTT] Acquiring MQTT client instance..." << std::endl;
- mqtt_client_t* pMqttClient = mqtt_lease();
- if (!pMqttClient) {
- std::cerr << "[ERROR] Failed to acquire MQTT client instance" << std::endl;
- return nullptr;
- }
- std::cout << "[SUCCESS] MQTT client acquired: " << pMqttClient << std::endl;
- // 确保 connection 指针有效(避免后续访问悬空指针)
- if (!connection) {
- std::cerr << "[ERROR] Invalid connection context (nullptr)" << std::endl;
- mqtt_release(pMqttClient); // 释放客户端
- return nullptr;
- }
- std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- pMqttClient->mqtt_conn_context = connection;
- std::string pszClientID = std::get<CLINET_ID_ID>(*connection);
- std::cout << "[CONFIG] Setting connection parameters:"
- << "\n\tClient ID: " << (!pszClientID.empty() ? pszClientID : "null")
- << "\n\tHost: " << SERVER_ADDRESS
- << "\n\tPort: 1883"
- << std::endl;
- // 设置MQTT连接参数
- std::string host_str = SERVER_ADDRESS;
- char* host_buf = new char[host_str.size() + 1];
- strncpy(host_buf, host_str.c_str(), host_str.size() + 1);
- mqtt_set_host(pMqttClient, host_buf);
- char port_buf[6] = "1883";
- uint8_t clean_session = 1;
- uint16_t keep_alive = 50;
- mqtt_set_port(pMqttClient, port_buf);
- mqtt_set_user_name(pMqttClient, const_cast<char*>("zskkcc"));
- mqtt_set_password(pMqttClient, const_cast<char*>("zskk1234"));
- mqtt_set_client_id(pMqttClient, (char*)pszClientID.c_str());
- mqtt_set_clean_session(pMqttClient, 0);
- mqtt_set_keep_alive_interval(pMqttClient, keep_alive);
- mqtt_set_cmd_timeout(pMqttClient, 5000);
- mqtt_set_write_buf_size(pMqttClient, 4096+1);
- mqtt_set_read_buf_size(pMqttClient, 4096+1);
- // 设置重连回调(替代mqtt_set_resubscribe_handler)
- //mqtt_subscribe();
- mqtt_set_reconnect_handler(pMqttClient, reconnect_handler);
- int rc = 0;
- uint64_t start_time = GetTickCount();
- const uint64_t timeout_ms = 10000; // 10秒超时
- int attempt_count = 0;
- std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl;
- try {
- while (true) {
- attempt_count++;
- std::cout << "[ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
- rc = mqtt_connect(pMqttClient);
- if (rc != MQTT_SUCCESS_ERROR) {
- uint64_t elapsed = GetTickCount() - start_time;
- std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl;
- if (elapsed > timeout_ms) {
- // 超时处理
- std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl;
- delete[] host_buf;
- mqtt_release(pMqttClient);
- return nullptr;
- }
- // 计算剩余时间
- uint64_t remaining = timeout_ms - elapsed;
- uint64_t wait_time = (remaining > 2000) ? 2000 : remaining;
- std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." << std::endl;
- usleep(wait_time * 1000);
- }
- else {
- uint64_t total_time = GetTickCount() - start_time;
- std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl;
- break;
- }
- }
- }
- catch (const std::exception& e) {
- std::cerr << "[ERROR] " << e.what() << std::endl;
- delete[] host_buf;
- mqtt_release(pMqttClient);
- return nullptr;
- }
- return pMqttClient;
- }
- ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
- {
- ////mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
- std::cout << "TID " << GetCurrentThreadId()
- << " : " << pszClientID
- << " NewConnection " << pszServer
- << ":" << pszServerPort
- << " user: " << pszUserName
- << " password " << pszPassword
- << std::endl;
- //连接MQTT broker
- DWORD dwTick = GetTickCount();
- ccos_mqtt_connection* connection = new ccos_mqtt_connection;
-
- // 初始化元组字段
- std::get<MQTT_TIPIC_LIST_ID>(*connection) = new mqtt_topic_list;
- std::get<CLINET_ID_ID>(*connection) = pszClientID;
- std::get<USER_MSG_CAKBCK_ID>(*connection) = onmsg;
- CcosLock* pLock = new CcosLock();
- std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
- pLock->Thread_Lock();
- ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
- std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
- std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
- std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
- std::get<MSG_HOOK_ID>(*connection) = pfilter;
- std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
- mqtt_client* pMqttClient = InnerConnect(connection);
- if (pMqttClient == nullptr)
- {
- pLock->Thread_UnLock();
- delete connection;
- delete pfilter;
- return nullptr;
- }
- std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- //pMqttClient->mqtt_conn_context = connection;
- //设置MQTT连接参数
-
- //mqtt_set_port(pMqttClient, (char*)pszServerPort);
- //mqtt_set_host(pMqttClient, (char*)pszServer);
- //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
- //mqtt_set_client_id(pMqttClient, (char*)pszClientID);
- //mqtt_set_user_name(pMqttClient, (char*)pszUserName);
- //mqtt_set_password(pMqttClient, (char*)pszPassword);
- //mqtt_set_clean_session(pMqttClient, 1);
- //mqtt_set_version(pMqttClient, 5);
- //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
- //mqtt_set_write_buf_size(pMqttClient, 8192);
- //mqtt_set_read_buf_size(pMqttClient, 8192);
- //mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- //mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
- //if (pszUserName != nullptr && strlen(pszUserName) > 0)
- // conn_opts.username = pszUserName;
- //if (pszPassword != nullptr && strlen(pszPassword) > 0)
- // conn_opts.password = pszPassword;
- //设置回调函数
- /*
- int rc = 0;
- dwTick = GetTickCount();
- while (true)
- {
- int rc = mqtt_connect(pMqttClient);
- if (rc != MQTT_SUCCESS_ERROR)
- {
- //默认10s内尝试
- if (GetTickCount() - dwTick > 10000)
- {
- ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- pLock->Thread_UnLock();
- delete connection;
- delete pfilter;
- return nullptr;
- }
- else
- {
- ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl;
- Sleep(2000);
- }
- }
- else
- {
- break;
- }
- }
- */
- std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
- // 创建POSIX信号量
- sem_t* semaphore = new sem_t;
- sem_init(semaphore, 0, 0);
- std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
- // 创建发送线程
- pthread_t threadId;
- if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
- std::cerr << "Failed to create MQTT send thread" << std::endl;
- // 错误处理...
- sem_destroy(semaphore);
- delete semaphore;
- pLock->Thread_UnLock();
- delete connection;
- return nullptr;
- }
- std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
- std::get<THREAD_RUNNING_ID>(*connection) = true;
- std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
- pLock->Thread_UnLock();
- uint64_t dwWaitTick = GetTickCount() - dwTick;
- std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
- << dwWaitTick << " ms" << std::endl;
- return connection;
- }
- //创建额外连接,需要提供回调函数
- LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async)
- {
- DWORD dwTick = GetTickCount();
- ////mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
- //mqtt_client* pMqttClient = nullptr;
- //pMqttClient = mqtt_lease();
- ccos_mqtt_connection* connection = new ccos_mqtt_connection;
- //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
- //std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- mqtt_topic_list* pTopicList = new std::list<string>;
- std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
- CcosLock* pLock = new CcosLock();
- std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
- pLock->Thread_Lock();
- ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
- std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
- std::get<MSG_HOOK_ID>(*connection) = pfilter;
- std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
- std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
- std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
- std::get<CLINET_ID_ID>(*connection) = pszClientID;
- std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
- mqtt_client* pMqttClient = InnerConnect(connection);
- if (pMqttClient == nullptr)
- {
- pLock->Thread_UnLock();
- delete connection;
- delete pfilter;
- return nullptr;
- }
- std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
- //pMqttClient->mqtt_conn_context = connection;
- //设置回调函数
- //设置MQTT连接参数
- //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
- /*
- //连接Broker
- int rc = 0;
- dwTick = GetTickCount();
- while (true)
- {
- rc = mqtt_connect(pMqttClient);
- if (rc != MQTT_SUCCESS_ERROR)
- {
- if (GetTickCount() - dwTick > dwOpenTimeout)
- {
- ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- pLock->Thread_UnLock();
- delete connection;
- delete pfilter;
- return nullptr;
- }
- else
- {
- ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
- //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
- Sleep(20);
- }
- }
- else
- {
- break;
- }
- }*/
- DWORD dwWaitTick = GetTickCount() - dwTick;
- ////mLog::FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
- std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
- // 创建POSIX信号量
- sem_t* semaphore = new sem_t;
- sem_init(semaphore, 0, 0);
- std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
- // 创建发送线程
- pthread_t threadId;
- if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
- std::cerr << "Failed to create MQTT send thread" << std::endl;
- // 错误处理...
- sem_destroy(semaphore);
- delete semaphore;
- pLock->Thread_UnLock();
- delete connection;
- return nullptr;
- }
- std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
- std::get<THREAD_RUNNING_ID>(*connection) = true;
- std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
- pLock->Thread_UnLock();
- std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
- << dwWaitTick << " ms" << std::endl;
- return connection;
- }
- //重置连接
- LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection)
- {
- if (hConnection == nullptr)
- {
- return;
- }
- ////mLog::FWARN(" Reset Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
- mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- if (pconn != nullptr) {
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- //mqtt_do_reconnect(pconn);
- mqtt_disconnect(pconn);
- /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic);
- mqtt_connect(pconn);*/
- mqtt_release(pconn);
- pconn = InnerConnect(hConnection);
- int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
- if (pconn != nullptr)
- {
- //std::get<MQTT_CLT_ID>(*hConnection) = pconn;
- //resubscribe_topic(pconn, pconn->mqtt_conn_context);
- //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
- }
- pLock->Thread_UnLock();
- }
- }
- //关闭并释放连接
- LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
- {
- if (!hConnection) return;
- auto clientId = std::get<CLINET_ID_ID>(*hConnection);
- std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." << endl;
- std::get<THREAD_RUNNING_ID>(*hConnection) = false;
- sem_t* semaphore = std::get<SEMAPHORE_HANDLE_ID>(*hConnection);
- if (semaphore) {
- sem_post(semaphore); // 发送信号量唤醒线程
- }
- pthread_t sendThread = std::get<CONNECTED_HANDLE_ID>(*hConnection);
- if (sendThread != 0) {
- // 等待线程退出,超时时间设为1秒
- struct timespec timeout;
- clock_gettime(CLOCK_REALTIME, &timeout);
- timeout.tv_sec += 1;
- int joinResult = pthread_timedjoin_np(sendThread, nullptr, &timeout);
- if (joinResult != 0) {
- std::cerr << CurrentDateTime() << " " << clientId
- << " Warning: Send thread did not exit in time, force cancel" << std::endl;
- pthread_cancel(sendThread); // 超时后强制终止(最后的备选方案)
- pthread_join(sendThread, nullptr);
- }
- std::get<CONNECTED_HANDLE_ID>(*hConnection) = 0; // 重置线程ID
- }
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- if (nullptr != pLock)
- pLock->Thread_Lock();
- else
- return;
- mqtt_client* pMqttClient = static_cast<mqtt_client*>(std::get<MQTT_CLT_ID>(*hConnection));
- if (pMqttClient) {
- mqtt_disconnect(pMqttClient); // 断开连接
- mqtt_release(pMqttClient); // 释放客户端实例
- pMqttClient = NULL;
- std::get<MQTT_CLT_ID>(*hConnection) = nullptr;
- }
- ccos_mqtt_msg_filter* pFilter = static_cast<ccos_mqtt_msg_filter*>(std::get<MSG_HOOK_ID>(*hConnection));
- if (pFilter) {
- std::cout << "Free Connection filter [" << (UINT64)pFilter << "] .." << std::endl;
- // 释放filter内部的资源对象
- delete std::get<FILTER_RES_OBJ_ID>(*pFilter);
- delete std::get<FILTER_RESPONS_OBJ_ID>(*pFilter);
- delete pFilter;
- std::get<MSG_HOOK_ID>(*hConnection) = nullptr;
- }
- mqtt_msg_list* pMsgList = std::get<MSG_LIST_ID>(*hConnection);
- if (pMsgList) {
- // 清理剩余未发送的消息
- while (!pMsgList->empty()) {
- ResDataObject* pMsg = pMsgList->front();
- pMsgList->pop_front();
- delete pMsg; // 释放消息对象
- }
- delete pMsgList;
- std::get<MSG_LIST_ID>(*hConnection) = nullptr;
- }
- if (semaphore) {
- sem_destroy(semaphore); // 销毁信号量
- delete semaphore;
- std::get<SEMAPHORE_HANDLE_ID>(*hConnection) = nullptr;
- }
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
- if (pTopicList) {
- pTopicList->clear(); // 清空列表
- delete pTopicList;
- std::get<MQTT_TIPIC_LIST_ID>(*hConnection) = nullptr;
- }
- if (pLock) {
- pLock->Thread_UnLock(); // 确保锁被释放
- delete pLock;
- std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
- }
-
- delete hConnection;
- std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." << endl;
- }
- //主动订阅主题
- LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- pTopicList->push_back(std::string(pszTopic));
- //pMqttClient->subscribe(pszTopic, 1, opts);
- int rc = MQTT_SUCCESS_ERROR;
- DWORD dwTick = GetTickCount();
- int nTryTimes = 0;
- do
- {
- if (pTopicList->size() < 1)
- {
- const char* topicToSubscribe = pTopicList->back().c_str();
- int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
- //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
- ////mLog::FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
- std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe First " << pszTopic << endl;
- }
- else
- {
- const char* topicToSubscribe = pTopicList->back().c_str();
- int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
- //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
- ////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
- std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << endl;
- }
- if (rc != MQTT_SUCCESS_ERROR)
- {
- ////mLog::FWARN("try do mqtt reconnect ..");
- //mqtt_do_reconnect(pMqttClient);
- mqtt_disconnect(pMqttClient);
- //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
- mqtt_release(pMqttClient);
- pMqttClient = InnerConnect(hConnection);
- int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
- if (pMqttClient != nullptr)
- {
- //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
- //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
- //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
- }
-
- usleep(2000 * 1000);
- }
- } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
- pLock->Thread_UnLock();
- return 0;
- }
- //主题订阅取消
- LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
- {
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- //if (!pMqttClient->is_connected()) {
- // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return 0;
- //}
- //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
- //auto ret = pMqttClient->unsubscribe(pszTopic);
- mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- //MQTTAsync_responseOptions resp;
- pTopicList->remove(pszTopic);
- int rc = MQTT_SUCCESS_ERROR;
- DWORD dwTick = GetTickCount();
- int nTryTimes = 0;
- do
- {
- int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
- ////mLog::FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
- } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
- pLock->Thread_UnLock();
- return 2;
- }
- //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
- LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout)
- {
- char pszClientID[256];
- snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
- (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
- getpid(), GetCurrentThreadId());
- ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
- });
- mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
- PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
- //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
- string pLoad = pCmd->encode();
-
- int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName);
-
- std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
- ////mLog::FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc);
- CloseConnection(connObj);
- return rc;
- }
- //往指定主题发送CCOS协议包整包,使用已创建的连接发送
- LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout)
- {
- if (hConnection == nullptr)
- {
- ////mLog::FDEBUG("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode());
- std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
- return 0;
- }
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to ["<< pszTopic << "] Action Body: " << endl; //<< pAction->encode()
- string topic = pszTopic;
- if (topic.length() <= 0)
- {
- ////mLog::FWARN("ignore empty topic packet {$}", pAction->encode());
- return 2;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- //if (!pMqttClient->is_connected()) {
- // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return 0;
- //}
- std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- ////mLog::FDEBUG("{$} Publish Action to {$} Action Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction->encode());
- //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction);
- //if(org_publisher.length() <= 0)
- PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
-
- ResDataObject* pPacket = new ResDataObject();
- mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
- pPacket->add(pszTopic, pAction->encode());
- ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
- pList->push_back(pPacket);
- sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
- std::cout << "try publish " << pAction->encode() << endl;
- //pMqttClient->publish(pszTopic, pAction->encode());
- /*
- //pConn->publish(pszTopic, pCmd->encode());
- std::cout << "try publish " << pAction->encode() << endl;
- const char* pLoad = pAction->encode();
- int len = strlen(pLoad);
- //MQTTAsync_responseOptions resp;
- //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
- mqtt_message_t msg;
- memset(&msg, 0, sizeof(msg));
- msg.payload = (void*)pLoad;
- msg.qos = MQTT_QOS;
- msg.payloadlen = len;
- int rc = MQTT_SUCCESS_ERROR;
- DWORD dwTick = GetTickCount();
- int nTryTimes = 0;
- do
- {
- rc = mqtt_publish(pMqttClient, pszTopic, &msg);
- nTryTimes++;
- if (rc != MQTT_SUCCESS_ERROR)
- {
- ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id);
- //mqtt_do_reconnect(pMqttClient);
- mqtt_disconnect(pMqttClient);
- rc = mqtt_connect(pMqttClient);
- if (rc == MQTT_SUCCESS_ERROR)
- {
- resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
- ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
- }
- Sleep(2);
- }
- } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout);
- //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
- ////mLog::FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
- //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
- //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
- if (rc < 0)
- {
- ////mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
- //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
- pLock->Thread_UnLock();
- return rc;
- }*/
- //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
- //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
- ////mLog::FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size());
- pLock->Thread_UnLock();
- return 2;
- }
- //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
- int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos)
- {
- const int dwTimeout = 500;
- if (pMqttClient == nullptr)
- {
- ////mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, message);
- return 0;
- }
- ////mLog::FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message);
- const char* pLoad = message.c_str();
- int len = message.length();
- mqtt_message_t msg;
- memset(&msg, 0, sizeof(msg));
- msg.payload = (void*)pLoad;
- msg.payloadlen = len;
- msg.qos = qos;
- int rc = MQTT_SUCCESS_ERROR;
- DWORD dwTick = GetTickCount();
- int nTryTimes = 0;
- do
- {
- rc = mqtt_publish(pMqttClient, pszTopic, &msg);
- nTryTimes++;
- if (rc != MQTT_SUCCESS_ERROR)
- {
- ////mLog::FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id);
- usleep(10000 * 1000);
- //mqtt_do_reconnect(pMqttClient);
- ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context;
- mqtt_disconnect(pMqttClient);
- /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
- mqtt_connect(pMqttClient);*/
- mqtt_release(pMqttClient);
- pMqttClient = InnerConnect(hConnection);
- //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
- if (pMqttClient != nullptr)
- {
- //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
- //resubscribe_topic(pMqttClient, hConnection);
- //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
- }
- }
- } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
- if(nTryTimes > 1)
- ////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
- ////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
- if (rc < 0)
- {
- ////mLog::FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
- return rc;
- }
- return 2;
- }
- /*
- /// <summary>
- /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
- /// 超时没收到应答返回失败,
- /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
- /// </summary>
- /// <param name="pAction">要发送的命令Action名</param>
- /// <param name="pContext">命令携带的参数</param>
- /// <param name="pszTopic">要发送的目标topic</param>
- /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
- /// <param name="resObj">应答返回的参数结果</param>
- /// <param name="dwWaitTime">等待超时时间</param>
- /// <param name="hConnection">复用的MQTT链接句柄</param>
- /// <param name="onmsg">复用的链接的消息处理函数</param>
- /// <returns>成功返回2,其他返回错误码</returns>
- LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
- ResDataObject& resObj, DWORD dwWaitTime)
- {
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
- if (pszRespTopic != nullptr)
- PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
- if (hConnection == nullptr)
- {
- return 0;
- }
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- //if (!pMqttClient->is_connected()) {
- // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
- // return 0;
- //}
- HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- //SubscribeTopic()
- auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
- string topic = topicName;//msg->get_topic().c_str();
- if (strcmp(pszRespTopic, topic.c_str()) == 0) {
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
- //应答到了,处理
- ResDataObject req;
- req.decode((const char*)message->payload);
- PacketAnalizer::GetPacketContext(&req, resObj);
- SetEvent(hEvent);
- //处理结束 将函数指针换回去
- void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
- //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
- MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
- }
- else
- {
- //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
- (msgarrvd)(context, topicName, topicLen, message);
- }
- };
- //接管回调
- // pMqttClient->set_message_callback(action_func);
- MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
- //pMqttClient->subscribe(pszRespTopic, 1);
- MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
- MQTTProperties prop = MQTTProperties_initializer;
- std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
- //pMqttClient->subscribe(pszTopic, 1, opts);
- MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
- //pMqttClient->publish(pszTopic, req.encode());
- MQTTClient_message pubmsg = MQTTClient_message_initializer;
- MQTTClient_message* m = NULL;
- MQTTClient_deliveryToken dt;
- MQTTProperty property;
- property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
- property.value.data.data = "test user property";
- property.value.data.len = (int)strlen(property.value.data.data);
- property.value.value.data = "test user property value";
- property.value.value.len = (int)strlen(property.value.value.data);
- MQTTProperties properties = MQTTProperties_initializer;
- MQTTProperties_add(&properties, &property);
- //pConn->publish(pszTopic, pCmd->encode());
- const char* pLoad = req.encode();
- MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
- DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
- if (ret == WAIT_OBJECT_0) {
- //等到应答了
- return 2;
- }
- return 0;
- }
- */
- /// <summary>
- /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
- /// </summary>
- /// <param name="hConnection"></param>
- /// <param name="pAction"></param>
- /// <param name="pContext"></param>
- /// <param name="pszTopic"></param>
- /// <param name="resObj"></param>
- /// <param name="hEvent"></param>
- /// <param name="dwWaitTime"></param>
- /// <returns></returns>
- LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
- const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
- {
- usleep(1000 * 1000);
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
- if (hConnection == nullptr)
- {
- return 0;
- }
- DWORD dwTick = GetTickCount();
- mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
- std::shared_ptr<LinuxEvent> hEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
- ResDataObject resContext;
- string strResObject;
- char* pszPad = 0;
- char* pszPad2 = 0;
- auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool {
- ////mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
- std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
- if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
- strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
- {
- if (GetTickCount() - dwTick < dwWaitTime)
- {
- ////mLog::FDEBUG("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
- resObj = *rsp;
- return true;
- }
- else
- {
- ////mLog::FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
- return false;
- }
- std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
- }
- ////mLog::FDEBUG("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
- std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
- return false;
- };
- CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
- pLock->Thread_Lock();
- ////mLog::FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic, pContext->encode());
- ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
- ResDataObject *resActions = std::get<FILTER_RES_OBJ_ID>(*pfilter);
- resActions->add(pAction, pszTopic);
- std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
-
- std::get<FILTER_FUNC_ID>(*pfilter) = func;
- std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
- PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection).c_str());
- const char* pLoad = req.encode();
- ResDataObject* pPacket = new ResDataObject();
- mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
- pPacket->add(pszTopic, pLoad);
- ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
- pList->push_back(pPacket);
- sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
- /*
- mqtt_message_t msg;
- memset(&msg, 0, sizeof(msg));
- msg.payload = (void*)pLoad;
- msg.qos = MQTT_QOS;
- msg.payloadlen = strlen(pLoad);
- //int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
- int rc = MQTT_SUCCESS_ERROR;
- dwTick = GetTickCount();
- int nTryTimes = 0;
- do
- {
- rc = mqtt_publish(pMqttClient, pszTopic, &msg);
- nTryTimes++;
- if (rc != MQTT_SUCCESS_ERROR)
- {
- ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. {$}", rc, client_id);
- //mqtt_do_reconnect(pMqttClient);
- mqtt_disconnect(pMqttClient);
- rc = mqtt_connect(pMqttClient);
- if (rc == MQTT_SUCCESS_ERROR)
- {
- resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
- ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
- }
- Sleep(2);
- }
- } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3);
- if (nTryTimes >= 1)
- ////mLog::FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, nTryTimes, rc);
- */
- pLock->Thread_UnLock();
- dwTick = GetTickCount() - dwTick;
- //////mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
- dwTick = GetTickCount();
- DWORD ret = hEvent->Wait(dwWaitTime);
- if (ret) {
- //等到应答了
- dwTick = GetTickCount() - dwTick;
- pLock->Thread_Lock();
- ResDataObject* pResp = std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
- for (int x = 0; x < pResp->size(); x++)
- {
- ResDataObject r = (*pResp)[x];
- if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r))
- {
- resObj = r;
- pResp->eraseOneOf(pAction, x);
- break;
- }
- }
- pLock->Thread_UnLock();
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
- ////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
- //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
- return 2;
- }
- //std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
- //resObj.decode(strResObject.c_str());
- ////mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
- std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
- //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
- return 0;
- }
- /// <summary>
- /// 新建MQTT连接发送Ation并等待应答
- /// </summary>
- /// <param name="pAction"></param>
- /// <param name="pContext"></param>
- /// <param name="pszTopic"></param>
- /// <param name="pszRespTopic"></param>
- /// <param name="resObj"></param>
- /// <param name="dwWaitTime"></param>
- /// <param name="pszSenderName"></param>
- /// <returns></returns>
- LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
- DWORD dwWaitTime, const char* pszSenderName)
- {
- ResDataObject req;
- if (pszRespTopic != nullptr)
- PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
- //临时创建连接并发送和接收应答
- char pszClientID[256];
- snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
- (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
- getpid(), GetCurrentThreadId());
- sem_t sem;
- if (sem_init(&sem, 0, 0) != 0) {
- return 0; // 如果初始化失败,返回0
- }
- std::cout << " ActionAndResp->NewConnection " << endl;
- ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) {
- //应答到了,处理
- PacketAnalizer::GetPacketContext(req, resObj);
- sem_post(&sem);
- });
- ////发布消息,并等待应答
- PublishAction(&req, pszTopic, connObj);
- // 等待应答或超时
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += dwWaitTime / 1000; // 转换为秒
- ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒
- // 等待信号量或者超时
- int ret = 0;
- while (ret == 0) {
- ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量
- if (ret == -1 && errno == ETIMEDOUT) {
- // 超时
- sem_destroy(&sem);
- CloseConnection(connObj);
- return 0; // 超时返回0
- }
- }
- // 等到应答了
- sem_destroy(&sem); // 销毁信号量
- CloseConnection(connObj);
- return 2; // 返回2表示应答成功
- }
|