LogicDevice.cpp 115 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705
  1. // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include <cstring>
  4. #include <iostream>
  5. #include <chrono>
  6. #include <codecvt>
  7. #include "LogicDevice.h"
  8. #include <stddef.h> // 用于 offsetof
  9. #include "PacketAnalizer.h"
  10. #include "MessageInfo.h"
  11. #include "common_api.h"
  12. #include "LocalConfig.h"
  13. #include "Base64.h"
  14. #include "SystemLogger.hpp"
  15. #include "AsyncMsgHandler.h"
  16. #include "mqttclient.h"
  17. #include "LinuxEvent.h"
  18. #include "LogLocalHelper.h"
  19. #include "Log4CPP.h"
  20. //Log4CPP::Logger* gLogger = nullptr;
  21. void msgarrivd(void* client, message_data_t* message);
  22. std::string CurrentDateTime();
  23. //-------------Logic Device SysIF--------------------------
  24. LogicDeviceSysIF::LogicDeviceSysIF(void)
  25. {
  26. }
  27. LogicDeviceSysIF::~LogicDeviceSysIF(void)
  28. {
  29. }
  30. //init
  31. void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
  32. {
  33. m_pLogicDev = p;
  34. //p->SetSysLogicDevice(this);
  35. };
  36. LogicDevice* LogicDeviceSysIF::GetLogicDevice()
  37. {
  38. return m_pLogicDev;
  39. }
  40. //Command In and Out
  41. //notify from lower layer
  42. RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  43. {
  44. return RET_FAILED;
  45. };
  46. //notify to lower layer
  47. RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
  48. {
  49. if (m_pLogicDev)
  50. {
  51. return m_pLogicDev->CmdToLogicDev(pCmd);
  52. }
  53. return RET_NOSUPPORT;
  54. };
  55. const std::string SERVER_ADDRESS("127.0.0.1");
  56. /*
  57. string DEVICE_ID; //AS CLIENT_ID
  58. //const std::string CLIENT_ID("paho_cpp_async_subcribe");
  59. //const std::string TOPIC("hello");
  60. mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
  61. const int QOS = 1;
  62. const int N_RETRY_ATTEMPTS = 5;*/
  63. using namespace std;
  64. LogicDevice* g_pDPCDeviceObject = NULL;
  65. //string DEVICE_ID;
  66. //-------------Data Logic Device--------------------------
  67. LogicDevice::LogicDevice(void)
  68. {
  69. m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  70. uuid_t uuid;
  71. char uuid_str[37];
  72. uuid_generate_random(uuid);
  73. uuid_unparse(uuid, uuid_str);
  74. m_pDevInstance = new char[40];
  75. strncpy(m_pDevInstance, uuid_str, 40);
  76. m_pResErrorList = new ResDataObject();
  77. sem_init(&m_SemphRequest, 0, 0);
  78. sem_init(&m_SemphPublish, 0, 0);
  79. m_pDrvDPC = NULL;
  80. g_pDPCDeviceObject = this;
  81. m_pMqttConntion = nullptr;
  82. //m_strServer = "tcp://localhost:1883";
  83. m_strServer = SERVER_ADDRESS;// "192.168.2.225";
  84. m_strServerPort = "1883";
  85. m_bMqttUseSSL = false;
  86. m_strMqttUser = "";
  87. m_strMqttPassword = "";
  88. m_strEBusRoot = "";
  89. m_strCCOSDevicePath = "";
  90. m_pParent = nullptr;
  91. m_topicFilter = nullptr;
  92. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  93. m_pPacketSendingQue = new MsgQueue<ResDataObject>();
  94. int x = 0;
  95. for ( x = 0; x < sizeof(szPad); x++)
  96. szPad[x] = 'A' + x % 26;
  97. szPad[x-1] = 0;
  98. memset(szPad2, 0, sizeof(szPad2));
  99. m_dwLastPacket = GetTickCount();
  100. string strLogPath = GetProcessDirectory() + R"(/Conf/log_config.xml)";
  101. string LogHost = (string)getRootpath();
  102. std::string moduleName = "LogicDevice";
  103. bool ret = initLogModule(
  104. LogHost, // 主机名(用于日志路径中的{host}占位符)
  105. moduleName, // 唯一模块名
  106. strLogPath, // 配置文件路径
  107. true // 是否输出到控制台(可选)
  108. );
  109. if (!ret) {
  110. std::cerr << "Log init failed!" << std::endl;
  111. }
  112. LogicDevice_SetLocalModuleName(moduleName);
  113. }
  114. LogicDevice::~LogicDevice(void)
  115. {
  116. delete []m_pDevInstance;
  117. delete m_pResErrorList;
  118. sem_destroy(&m_SemphRequest);
  119. sem_destroy(&m_SemphPublish);
  120. //m_EvtNotify = NULL;
  121. delete m_pPacketReceivedQue;
  122. delete m_pPacketSendingQue;
  123. }
  124. void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
  125. if (m_strClientID.length() > 0)
  126. {
  127. FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID);
  128. return;
  129. }
  130. FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot);
  131. if (pszEBusRoot[0] == '/') {
  132. m_strEBusRoot = pszEBusRoot + 1;
  133. }
  134. else {
  135. m_strEBusRoot = pszEBusRoot;
  136. }
  137. char szKeys[256];
  138. strcpy(szKeys, pszEBusRoot);
  139. char* pt = szKeys;
  140. while (*pt != 0)
  141. {
  142. if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
  143. *pt = '_';
  144. pt++;
  145. }
  146. m_strClientID = CCOS_CLIENT_ID_PREFIX;
  147. if (szKeys[0] == '_')
  148. m_strClientID += (szKeys + 1);
  149. else
  150. m_strClientID += szKeys;
  151. if (pszCCOSRoot != nullptr)
  152. {
  153. // std::cout << "LogicDevice::SetClientRootID [ Set CCOS path is" << pszCCOSRoot << "]" << std::endl;
  154. FINFO("Set CCOS path: {$}", pszCCOSRoot);
  155. m_strCCOSDevicePath = pszCCOSRoot;
  156. int nPos = m_strCCOSDevicePath.find('/');
  157. if (nPos != string::npos)
  158. {
  159. //CCOS/
  160. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  161. //CCOS/DEVICE/
  162. if (nPos != string::npos)
  163. {
  164. m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos);
  165. //CCOS/DEVICE/Generator
  166. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  167. if (nPos != string::npos)
  168. {
  169. m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos);
  170. }
  171. }
  172. }
  173. FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath);
  174. }
  175. FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath);
  176. SetName(m_strClientID.c_str());
  177. OnSetClientID();
  178. }
  179. void LogicDevice::OnSetClientID() {
  180. }
  181. void LogicDevice::SubscribeSelf() {
  182. if (m_strDevicePath.length() > 0 && m_strDevicePath[0] != '/') {
  183. m_strDevicePath = "/" + m_strDevicePath;
  184. }
  185. FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
  186. // std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn " << (UINT64)m_pMqttConntion << endl;
  187. if (m_strEBusRoot.length() > 0) {
  188. SubscribeTopic(m_pMqttConntion, (m_strEBusRoot).c_str());
  189. }
  190. if (m_strCCOSDevicePath.length() > 0)
  191. {
  192. FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath);
  193. SubscribeTopic(m_pMqttConntion, (m_strCCOSDevicePath + m_strDevicePath).c_str());
  194. //订阅
  195. }
  196. FINFO("AbstractPath [{$}] ", m_strAbstractPath);
  197. if (m_strAbstractPath.length() > 0)
  198. {
  199. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath).c_str());
  200. if (m_strDevicePath.length() > 0)
  201. SubscribeTopic(m_pMqttConntion, (m_strAbstractPath + m_strDevicePath).c_str());
  202. }
  203. FINFO("CCOSRoot [{$}] ", m_strCCOSRoot);
  204. if (m_strCCOSRoot.length() > 0)
  205. {
  206. SubscribeTopic(m_pMqttConntion, (m_strCCOSRoot).c_str());
  207. //订阅
  208. }
  209. // 等待初始订阅完成(EBusRoot、CCOSDevice、Abstract等)
  210. FINFO("{$} Waiting for initial subscriptions to complete...", m_strClientID);
  211. const DWORD initialSubTimeoutMs = 5000;
  212. if (!WaitForSubscriptionComplete(m_pMqttConntion, initialSubTimeoutMs)) {
  213. FERROR("{$} Some initial subscriptions did not complete within timeout", m_strClientID);
  214. }
  215. // SUBACK收到后,等待Broker完成路由表更新
  216. // 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
  217. WaitForBrokerRouting(m_strClientID.c_str());
  218. // 订阅Actions(内部也会等待SUBACK + Broker延时)
  219. SubscribeActions();
  220. }
  221. void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  222. {
  223. cout << "SubScribeTopic IN" << endl;
  224. if (bSubscribe)
  225. SubscribeTopic(m_pMqttConntion, pszTopic);
  226. else
  227. UnSubscribe(m_pMqttConntion, pszTopic);
  228. }
  229. void LogicDevice::NotifyDrvThread()
  230. {
  231. m_EvtNotify->SetEvent();
  232. }
  233. std::shared_ptr<LinuxEvent> LogicDevice::GetEvtHandle()
  234. {
  235. return m_EvtNotify;
  236. }
  237. //1. init part
  238. //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
  239. //{
  240. // m_pSysLogic = pLogic;
  241. //}
  242. //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
  243. //{
  244. // m_pLogger = pLogger;
  245. //}
  246. //
  247. //Logger *LogicDevice::GetLogHandle()
  248. //{
  249. // return m_pLogger;
  250. //}
  251. void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
  252. {
  253. m_pDrvDPC = pDPC;
  254. }
  255. DriverDPC *LogicDevice::GetDrvDPC()
  256. {
  257. return m_pDrvDPC;
  258. }
  259. RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
  260. {
  261. if (szEbusDevPath == nullptr)
  262. return RET_FAILED;
  263. for (auto dev : m_subDevices) {
  264. if (dev->GetRootPath() == szEbusDevPath)
  265. return RET_SUCCEED;
  266. }
  267. m_subDevices.push_back(pChild);
  268. return RET_SUCCEED;
  269. }
  270. RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
  271. {
  272. if (szCcosDevPath == nullptr)
  273. return RET_FAILED;
  274. for (auto dev : m_subCcosDevices) {
  275. if (dev->GetCcosRootPath() == szCcosDevPath)
  276. return RET_SUCCEED;
  277. }
  278. m_subCcosDevices.push_back(pChild);
  279. return RET_SUCCEED;
  280. }
  281. LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
  282. {
  283. for (auto dev : m_subDevices) {
  284. if (dev->GetRootPath() == szEbusDevPath)
  285. return dev;
  286. }
  287. return nullptr;
  288. }
  289. LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
  290. {
  291. for (auto dev : m_subCcosDevices) {
  292. if (dev->GetCcosRootPath() == szCcosDevPath)
  293. return dev;
  294. }
  295. return nullptr;
  296. }
  297. void SYSTEM_CALL LogicDevice::CompleteInit()
  298. {
  299. FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
  300. FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
  301. if (m_strClientID.length() <= 0)
  302. {
  303. FWARN("No Client name ......" );
  304. // std::cout << "No Client name ......." << endl;
  305. return;
  306. }
  307. // std::cout << "CompleteInit->NewConnection ......." << endl;
  308. m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
  309. [this](ResDataObject* req, const char* topic, void* conn) {
  310. //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
  311. //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
  312. // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
  313. // 消息处理如果耗时较长,则需要开线程来处理
  314. if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
  315. strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0)
  316. {
  317. //主题以ebus或者ccos开头,给我发的消息,进行处理
  318. ProcessSubscribeRequest(req);
  319. }
  320. else
  321. {
  322. //我主动订阅的外部模块的消息
  323. ProcessSubscribeMsg(req);
  324. }
  325. //CmdToLogicDev(req);
  326. });
  327. if (m_pMqttConntion != nullptr)
  328. {
  329. // 使用异步启动避免阻塞:Sync=false表示不等待线程启动完成
  330. // 如果Sync=true,会等待300秒直到线程设置m_RunFlag,可能导致启动卡顿
  331. StartThread(false); //启动Request线程(异步)
  332. SubscribeSelf();
  333. //DWORD dwThreadID = 0;
  334. //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID);
  335. //FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID);
  336. }
  337. }
  338. RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
  339. PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
  340. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
  341. switch (type) {
  342. case PACKET_TYPE_REQ:
  343. PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction);
  344. ProcessRequest(req, cmd);//请求
  345. break;
  346. case PACKET_TYPE_RES:
  347. ProcessResponse(req, cmd);//应答
  348. break;
  349. case PACKET_TYPE_NOTIFY:
  350. ProcessNotify(req, cmd);
  351. //通知
  352. break;
  353. }
  354. return RET_FAILED;
  355. }
  356. RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
  357. {
  358. ProcessSubscribeRequest(pCmd);
  359. return RET_SUCCEED;
  360. }
  361. RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
  362. {
  363. //Open命令
  364. if (cmd == PACKET_CMD_OPEN) {
  365. /* {
  366. "IDX": "40",
  367. "TYPE" : "0",
  368. "CMD" : "0",
  369. "HANDLE" :
  370. {
  371. "ROUTE": "1",
  372. "FLAGS" : "63",
  373. "LANG" : "en-US",
  374. "HANDLEID" : "0",
  375. "OWNERID" :
  376. {
  377. "EBUSID": "ImageSave",
  378. "MACHINEID" : "DESKTOP-FVD53H8",
  379. "PROCID" : "35408",
  380. "ADDR" : "2631366957696"
  381. },
  382. "DEVID":
  383. {
  384. "EBUSID": "ccosChannel",
  385. "MACHINEID" : "",
  386. "PROCID" : "0",
  387. "ADDR" : "0"
  388. }
  389. },
  390. "KEY": "\/ccosChannel"
  391. ResDataObject resRes, resResponse;
  392. ResDataObject resTopic;
  393. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  394. if (cmd == PACKET_CMD_OPEN )
  395. {
  396. //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
  397. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  398. return RET_SUCCEED;
  399. }
  400. if (cmd == PACKET_CMD_CLOSE)
  401. {
  402. //CLOSE 客户端主动断开
  403. }
  404. } */
  405. PacketArrived(pCmd);
  406. return RET_SUCCEED;
  407. }
  408. else if (cmd == PACKET_CMD_CLOSE)
  409. {
  410. ResDataObject resp;
  411. InnerOpenClose(*pCmd, resp, false);
  412. }
  413. else {
  414. PacketArrived(pCmd);
  415. }
  416. return RET_SUCCEED;
  417. }
  418. RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
  419. {
  420. return RET_SUCCEED;
  421. }
  422. RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
  423. {
  424. PacketArrived(pCmd);
  425. return RET_SUCCEED;
  426. }
  427. void LogicDevice::PacketArrived(ResDataObject* pRequest)
  428. {
  429. //m_pPacketReceivedQue->Lock();
  430. FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  431. //if (Thread_Lock(5000) != WAIT_OBJECT_0)
  432. //{
  433. // FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  434. // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  435. // return ;
  436. //}
  437. m_pPacketReceivedQue->InQueue(*pRequest);
  438. FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  439. //SetEvent(m_EvtNotify);//notify to user
  440. long nLast = 0;
  441. int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程
  442. if (released <= 0)
  443. {
  444. FERROR("PacketArrived: sem_post failed, error code: {$}, description: {$}",
  445. errno, strerror(errno));
  446. }
  447. }
  448. bool LogicDevice::OnStartThread()
  449. {
  450. return true;
  451. }
  452. bool LogicDevice::OnEndThread()
  453. {
  454. return true;
  455. }
  456. //单一发送线程,暂时未用
  457. DWORD LogicDevice::Thread_Publish_Thread(void* pPara)
  458. {
  459. INT ret = 0;
  460. int waitevent = 0;
  461. LogicDevice* handle = (LogicDevice*)pPara;
  462. //prev work usleep(30000);
  463. FINFO("Thread Start [{$}]", handle->m_strClientID);
  464. while (!handle->m_ExitFlag->Wait(1))
  465. {
  466. //do work
  467. ResDataObject resSend;
  468. DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100);
  469. if (wait != WAIT_OBJECT_0)
  470. continue;
  471. bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend);
  472. if (bHasPacket)
  473. {
  474. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend);
  475. PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend);
  476. FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd);
  477. if (PACKET_CMD_NONE == cmd)
  478. {
  479. // 移除 encode() 调用避免内存问题
  480. FDEBUG(" [{$}] Publish what packet (packet details not logged)", handle->m_strClientID);
  481. }
  482. if (type == PACKET_TYPE_NOTIFY)
  483. {
  484. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  485. PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle);
  486. FINFO("Notify Transaction: {$}", handle->m_strCurTransaction);
  487. PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction);
  488. PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion);
  489. // 移除 encode() 调用避免内存问题
  490. FDEBUG("Notify key: {$}", PacketAnalizer::GetPacketKey(&resSend));
  491. //printf("--> %s \n", pCmd->encode());
  492. PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion);
  493. PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion);
  494. delete pHandle;
  495. }
  496. else
  497. {
  498. ResDataObject resTopic;
  499. PacketAnalizer::GetPacketTopic(&resSend, resTopic);
  500. //PacketAnalizer::GetPacketTopicGetPacketTopic
  501. //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic);
  502. FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const char*)resTopic);
  503. PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion);
  504. }
  505. }
  506. }
  507. return 0;
  508. }
  509. RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  510. {
  511. cout << "LogicDevice::DevOpen - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  512. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  513. ResDataObject req, reqParam;
  514. reqParam = pszGroup;
  515. cout << "LogicDevice::DevOpen - Creating OPEN request. Command: PACKET_CMD_OPEN" << endl;
  516. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam);
  517. RET_STATUS ret = InnerOpenClose(req, resRespons, true);
  518. cout << "LogicDevice::DevOpen - InnerOpenClose returned: " << ret << endl;
  519. return ret;
  520. }
  521. RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  522. {
  523. cout << "LogicDevice::DevClose - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  524. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  525. ResDataObject req, reqParam;
  526. reqParam = pszGroup;
  527. cout << "LogicDevice::DevClose - Creating CLOSE request. Command: PACKET_CMD_CLOSE" << endl;
  528. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam);
  529. RET_STATUS ret = InnerOpenClose(req, resRespons, false);
  530. cout << "LogicDevice::DevClose - InnerOpenClose returned: " << ret << endl;
  531. return ret;
  532. }
  533. /// <summary>
  534. /// Get device property
  535. /// </summary>
  536. RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons)
  537. {
  538. cout << "LogicDevice::DevGet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  539. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr") << endl;
  540. ResDataObject req, reqParam;
  541. cout << "LogicDevice::DevGet - Creating GET request. Command: PACKET_CMD_GET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  542. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam);
  543. RET_STATUS ret = Request(&req, &resRespons);
  544. cout << "LogicDevice::DevGet - Request returned: " << ret << endl;
  545. return ret;
  546. }
  547. /// <summary>
  548. /// Set device property
  549. /// </summary>
  550. RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons)
  551. {
  552. cout << "LogicDevice::DevSet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  553. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  554. << ", pszValueSet: " << (pszValueSet ? pszValueSet : "nullptr") << endl;
  555. ResDataObject req, reqParam;
  556. if (pszValueSet != nullptr && pszValueSet[0] != 0) {
  557. if (pszValueSet[0] == '{') {
  558. cout << "LogicDevice::DevSet - Decoding JSON value for property" << endl;
  559. reqParam.decode(pszValueSet);
  560. }
  561. else {
  562. cout << "LogicDevice::DevSet - Using raw value for property" << endl;
  563. reqParam = pszValueSet;
  564. }
  565. }
  566. else {
  567. cout << "LogicDevice::DevSet - No value provided for property" << endl;
  568. }
  569. cout << "LogicDevice::DevSet - Creating SET request. Command: PACKET_CMD_SET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  570. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam);
  571. RET_STATUS ret = Request(&req, &resRespons);
  572. cout << "LogicDevice::DevSet - Request returned: " << ret << endl;
  573. return ret;
  574. }
  575. /// <summary>
  576. /// Update device property
  577. /// </summary>
  578. RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons)
  579. {
  580. cout << "LogicDevice::DevUpdate - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  581. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  582. << ", pszValueUpdate: " << (pszValueUpdate ? pszValueUpdate : "nullptr") << endl;
  583. ResDataObject req, reqParam;
  584. if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) {
  585. if (pszValueUpdate[0] == '{') {
  586. cout << "LogicDevice::DevUpdate - Decoding JSON value for update" << endl;
  587. reqParam.decode(pszValueUpdate);
  588. }
  589. else {
  590. cout << "LogicDevice::DevUpdate - Using raw value for update" << endl;
  591. reqParam = pszValueUpdate;
  592. }
  593. }
  594. else {
  595. cout << "LogicDevice::DevUpdate - No update value provided" << endl;
  596. }
  597. cout << "LogicDevice::DevUpdate - Creating UPDATE request. Command: PACKET_CMD_UPDATE, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  598. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam);
  599. RET_STATUS ret = Request(&req, &resRespons);
  600. cout << "LogicDevice::DevUpdate - Request returned: " << ret << endl;
  601. return ret;
  602. }
  603. /// <summary>
  604. /// Add device property
  605. /// </summary>
  606. RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* pszValueAdd, ResDataObject& resRespons)
  607. {
  608. cout << "LogicDevice::DevAdd - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  609. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  610. << ", pszValueAdd: " << (pszValueAdd ? pszValueAdd : "nullptr") << endl;
  611. ResDataObject req, reqParam;
  612. if (pszValueAdd != nullptr && pszValueAdd[0] != 0) {
  613. if (pszValueAdd[0] == '{') {
  614. cout << "LogicDevice::DevAdd - Decoding JSON value for addition" << endl;
  615. reqParam.decode(pszValueAdd);
  616. }
  617. else {
  618. cout << "LogicDevice::DevAdd - Using raw value for addition" << endl;
  619. reqParam = pszValueAdd;
  620. }
  621. }
  622. else {
  623. cout << "LogicDevice::DevAdd - No value provided for addition" << endl;
  624. }
  625. cout << "LogicDevice::DevAdd - Creating ADD request. Command: PACKET_CMD_ADD, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  626. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam);
  627. RET_STATUS ret = Request(&req, &resRespons);
  628. cout << "LogicDevice::DevAdd - Request returned: " << ret << endl;
  629. return ret;
  630. }
  631. /// <summary>
  632. /// Delete device property
  633. /// </summary>
  634. RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons)
  635. {
  636. cout << "LogicDevice::DevDel - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  637. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  638. << ", pszValueDel: " << (pszValueDel ? pszValueDel : "nullptr") << endl;
  639. ResDataObject req, reqParam;
  640. if (pszValueDel != nullptr && pszValueDel[0] != 0) {
  641. if (pszValueDel[0] == '{') {
  642. cout << "LogicDevice::DevDel - Decoding JSON value for deletion" << endl;
  643. reqParam.decode(pszValueDel);
  644. }
  645. else {
  646. cout << "LogicDevice::DevDel - Using raw value for deletion" << endl;
  647. reqParam = pszValueDel;
  648. }
  649. }
  650. else {
  651. cout << "LogicDevice::DevDel - No value provided for deletion" << endl;
  652. }
  653. cout << "LogicDevice::DevDel - Creating DEL request. Command: PACKET_CMD_DEL, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  654. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam);
  655. RET_STATUS ret = Request(&req, &resRespons);
  656. cout << "LogicDevice::DevDel - Request returned: " << ret << endl;
  657. return ret;
  658. }
  659. /// <summary>
  660. /// Execute device action
  661. /// </summary>
  662. RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons)
  663. {
  664. cout << "LogicDevice::DevAction - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  665. << ", pszActionName: " << (pszActionName ? pszActionName : "nullptr")
  666. << ", pszParams: " << (pszParams ? pszParams : "nullptr") << endl;
  667. ResDataObject req, reqParam;
  668. if (pszParams != nullptr && pszParams[0] != 0) {
  669. if (pszParams[0] == '{') {
  670. cout << "LogicDevice::DevAction - Decoding JSON parameters for action" << endl;
  671. reqParam.decode(pszParams);
  672. }
  673. else {
  674. cout << "LogicDevice::DevAction - Using raw parameters for action" << endl;
  675. reqParam = pszParams;
  676. }
  677. }
  678. else {
  679. cout << "LogicDevice::DevAction - No parameters provided for action" << endl;
  680. }
  681. cout << "LogicDevice::DevAction - Creating EXE request. Command: PACKET_CMD_EXE, Action: " << (pszActionName ? pszActionName : "nullptr") << endl;
  682. PacketAnalizer::MakeRequest(req, pszActionName, PACKET_CMD_EXE, &reqParam);
  683. ResDataObject resResult;
  684. RET_STATUS ret = Request(&req, &resResult);
  685. cout << "LogicDevice::DevAction - Request returned: " << ret << endl;
  686. PacketAnalizer::GetPacketContext(&resResult, resRespons);
  687. cout << "LogicDevice::DevAction - Extracted packet context to response" << endl;
  688. return ret;
  689. }
  690. /// <summary>
  691. /// Send message to device
  692. /// </summary>
  693. RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons)
  694. {
  695. cout << "LogicDevice::DevMessage - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  696. << ", pszTopic: " << (pszTopic ? pszTopic : "nullptr")
  697. << ", pszMessageValue: " << (pszMessageValue ? pszMessageValue : "nullptr") << endl;
  698. ResDataObject req, reqParam;
  699. if (pszMessageValue != nullptr && pszMessageValue[0] != 0) {
  700. if (pszMessageValue[0] == '{') {
  701. cout << "LogicDevice::DevMessage - Decoding JSON message value" << endl;
  702. reqParam.decode(pszMessageValue);
  703. }
  704. else {
  705. cout << "LogicDevice::DevMessage - Using raw message value" << endl;
  706. reqParam = pszMessageValue;
  707. }
  708. }
  709. else {
  710. cout << "LogicDevice::DevMessage - No message value provided" << endl;
  711. }
  712. cout << "LogicDevice::DevMessage - Creating MSG request. Command: PACKET_CMD_MSG, Topic: " << (pszTopic ? pszTopic : "nullptr") << endl;
  713. PacketAnalizer::MakeRequest(req, pszTopic, PACKET_CMD_MSG, &reqParam);
  714. RET_STATUS ret = Request(&req, &resRespons);
  715. cout << "LogicDevice::DevMessage - Request returned: " << ret << endl;
  716. return ret;
  717. }
  718. RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose)
  719. {
  720. if (openOrClose)
  721. {
  722. ResDataObject resContext;
  723. GetDeviceResource(&resp);
  724. LogicDevice::GetDeviceResource(&resp);
  725. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  726. if (PacketAnalizer::GetPacketContext(&req, resContext))
  727. {
  728. //如果有上线参数
  729. if (resContext.GetFirstOf("Online") >= 0)
  730. {
  731. FINFO("Got Online Request {$}", resContext.encode());
  732. int idx = resContext.GetFirstOf("Online");
  733. //如果有上线参数
  734. if (idx >= 0)
  735. {
  736. do
  737. {
  738. string wsName = resContext[idx].encode();
  739. FINFO("SubscribeGroupActions [{$}] ", wsName);
  740. SubscribeGroupActions(wsName, true);
  741. idx = resContext.GetNextOf("Online", idx);
  742. } while (idx >= 0);
  743. }
  744. resp.update("Online", m_rsOnlineGroup);
  745. }
  746. }
  747. }
  748. else
  749. {
  750. ResDataObject context;
  751. if (PacketAnalizer::GetPacketContext(&req, context))
  752. {
  753. int idx = context.GetFirstOf("Offline");
  754. //如果有上线参数
  755. if (idx >= 0)
  756. {
  757. do
  758. {
  759. string wsName = context[idx].encode();
  760. SubscribeGroupActions(wsName, false);
  761. idx = context.GetNextOf("Offline", idx);
  762. } while (idx >= 0);
  763. }
  764. resp.update("Online", m_rsOnlineGroup);
  765. }
  766. }
  767. return RET_SUCCEED;
  768. }
  769. bool LogicDevice::Exec(void)
  770. {
  771. struct timespec ts;
  772. clock_gettime(CLOCK_REALTIME, &ts);
  773. ts.tv_sec += 3;
  774. // 等待退出信号(3秒超时)
  775. DWORD dwRet = 0;
  776. dwRet = m_ExitFlag->Wait(3);
  777. if (dwRet == WAIT_OBJECT_0)
  778. {
  779. return false;
  780. }
  781. // 等待请求信号(3秒超时)
  782. int ret = sem_timedwait(&m_SemphRequest, &ts);
  783. if (ret == 0) {
  784. FDEBUG("[{$}] Got Packet Event ", m_strClientID);
  785. ResDataObject req;
  786. bool got = true;
  787. do {
  788. got = m_pPacketReceivedQue->DeQueue(req);
  789. if (got) {
  790. m_dwLastPacket = GetTickCount();
  791. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
  792. std::string keystr = PacketAnalizer::GetPacketKey(&req);
  793. FINFO(" {$} Got Request key {$} transaction {$}",
  794. m_strClientID, keystr, m_strCurTransaction);
  795. if (cmd == PACKET_CMD_OPEN) {
  796. ResDataObject resRes, resResponse;
  797. InnerOpenClose(req, resRes, true);
  798. PacketAnalizer::MakeOpenResponse(req, resResponse, resRes);
  799. resResponse.update("Online", m_rsOnlineGroup);
  800. PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction);
  801. ResDataObject resTopic;
  802. PacketAnalizer::GetContextTopic(&req, resTopic);
  803. PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str());
  804. if (keystr.substr(0, 4) == "CCOS") {
  805. PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str());
  806. }
  807. else {
  808. PacketAnalizer::UpdateDeviceNotifyResponse(resResponse,
  809. getLocalMachineId(),
  810. getLocalEbusId(),
  811. (uint64_t)getpid(),
  812. (uint64_t)m_pMqttConntion);
  813. }
  814. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  815. // 发送连接状态通知
  816. ResDataObject NotifyData;
  817. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE,
  818. "ConnectionStatus", "1");
  819. PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction);
  820. CmdFromLogicDev(&NotifyData);
  821. }
  822. else {
  823. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  824. ResDataObject resPacket;
  825. RET_STATUS retStatus = RET_FAILED;
  826. if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") {
  827. ResDataObject devRes;
  828. if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) {
  829. LogicDevice::GetDeviceResource(&devRes);
  830. PacketAnalizer::UpdatePacketContext(resPacket, devRes);
  831. }
  832. }
  833. else {
  834. retStatus = Request(&req, &resPacket);
  835. }
  836. PacketAnalizer::MakeRetCode(retStatus, &resPacket);
  837. PacketAnalizer::CloneTransaction(&req, &resPacket);
  838. ResDataObject resTopic;
  839. PacketAnalizer::GetContextTopic(&req, resTopic);
  840. PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str());
  841. PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
  842. }
  843. }
  844. } while (got);
  845. // 检查心跳(10分钟无数据发送心跳)
  846. if (GetTickCount() - m_dwLastPacket > 600000) {
  847. m_dwLastPacket = GetTickCount();
  848. ResDataObject heartBeat;
  849. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  850. "HeartBeat", m_strClientID.c_str());
  851. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  852. }
  853. return true;
  854. }
  855. // 检查心跳(即使没有请求)
  856. if (GetTickCount() - m_dwLastPacket > 600000) {
  857. m_dwLastPacket = GetTickCount();
  858. ResDataObject heartBeat;
  859. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  860. "HeartBeat", m_strClientID.c_str());
  861. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  862. }
  863. return true;
  864. }
  865. void SYSTEM_CALL LogicDevice::CompleteUnInit()
  866. {
  867. StopThread();
  868. }
  869. int LogicDevice::GetDevice_Thread_Priority()
  870. {
  871. return THREAD_PRIORITY_NONE;
  872. }
  873. RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  874. {
  875. //pDeviceResource->update("ClientType", DPC_UnitClient);
  876. GUID guid;
  877. string name;
  878. GetDeviceType(guid);
  879. guid_2_string(guid, name);
  880. pDeviceResource->update("DeviceType", name.c_str());
  881. //Get Unit Type (Unit GUID)
  882. if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
  883. {
  884. pDeviceResource->add("LogicDevInstance", m_pDevInstance);
  885. }
  886. ////
  887. size_t idx = (*pDeviceResource)["Attribute"].size();
  888. if (idx > 0)
  889. {
  890. int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
  891. if (erroridx < 0)
  892. {
  893. (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
  894. }
  895. else
  896. {
  897. (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
  898. }
  899. }
  900. else
  901. {
  902. ResDataObject Attribute;
  903. Attribute.add("ErrorList", *m_pResErrorList);
  904. pDeviceResource->add("Attribute", Attribute);
  905. }
  906. // (*pDeviceResource)["Action"].size();
  907. if (pDeviceResource->GetFirstOf("Action") < 0)
  908. {
  909. pDeviceResource->add("Action", m_Actions);
  910. }
  911. return RET_SUCCEED;
  912. }
  913. //notify from lower layer
  914. RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
  915. {
  916. // 移除 encode() 调用以避免内存问题
  917. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
  918. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  919. FDEBUG("[{$}] CmdFromLogicDev: type={$} cmd={$}", m_strClientID, (int)type, (int)cmd);
  920. if (type == PACKET_TYPE_NOTIFY)
  921. {
  922. FDEBUG("[{$}] Processing NOTIFY", m_strClientID);
  923. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  924. PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle);
  925. FDEBUG("Notify Transaction: {$}", m_strCurTransaction);
  926. PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction);
  927. // 已删除导致内存错误的调试代码
  928. ;
  929. //if (m_strEBusRoot.length() <= 0)
  930. //{
  931. // //FWARN("EBusRoot is null");
  932. //}
  933. PacketAnalizer::UpdateDeviceNotifyResponse(
  934. *pCmd,
  935. getLocalMachineId(),
  936. getLocalEbusId(),
  937. static_cast<uint64_t>(getpid()), // Linux 上获取进程 ID
  938. reinterpret_cast<uint64_t>(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型
  939. );
  940. // 已移除 pCmd->encode() 调用,避免潜在的内存问题
  941. FDEBUG("[{$}] Notify: key={$}", m_strClientID, PacketAnalizer::GetPacketKey(pCmd));
  942. //printf("--> %s \n", pCmd->encode());
  943. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str());
  944. FDEBUG("[{$}] Notify updated topic", m_strClientID);
  945. PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  946. string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd);
  947. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str());
  948. PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion);
  949. if (m_strAbstractPath.length() > 0)
  950. {
  951. PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion);
  952. string realPath,devPath;
  953. devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length());
  954. for (int x = 0; x < m_rsOnlineGroup.size(); x++)
  955. {
  956. if ((int)m_rsOnlineGroup[x] == 1)
  957. {
  958. realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath;
  959. PublishAction(pCmd, realPath.c_str(), m_pMqttConntion);
  960. }
  961. }
  962. } //m_pPacketSendingQue->InQueue(*pCmd);
  963. delete pHandle;
  964. }
  965. return RET_SUCCEED;
  966. /*
  967. if (pCmd && m_pSysLogic)
  968. {
  969. return m_pSysLogic->CmdFromLogicDev(pCmd);
  970. }
  971. //put log here
  972. return RET_FAILED;*/
  973. }
  974. std::wstring mb2wc_a(const char* mbstr) {
  975. if (!mbstr) return L"";
  976. try {
  977. // 使用C++11的codecvt进行UTF-8到wstring的转换
  978. std::wstring_convert<std::codecvt_utf8<wchar_t>> converter;
  979. return converter.from_bytes(mbstr);
  980. }
  981. catch (...) {
  982. // 如果转换失败,返回空宽字符串
  983. return L"";
  984. }
  985. }
  986. RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
  987. {
  988. string ResBase64, DesBase64;
  989. wstring wResUTF = ResInfo;
  990. wstring wDesUTF = Description;
  991. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  992. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  993. return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
  994. }
  995. RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  996. {
  997. string ResBase64,DesBase64;
  998. cout << "ResInfo:" << ResInfo << endl;
  999. wstring wResUTF = mb2wc_a(ResInfo);
  1000. wstring wDesUTF = mb2wc_a(Description);
  1001. const unsigned char* resData = reinterpret_cast<const unsigned char*>(wResUTF.c_str());
  1002. unsigned long resDataLen = wResUTF.size() * sizeof(wchar_t);
  1003. const unsigned char* desData = reinterpret_cast<const unsigned char*>(wDesUTF.c_str());
  1004. unsigned long desDataLen = wDesUTF.size() * sizeof(wchar_t);
  1005. // 进行Base64编码
  1006. CBase64::Encode(resData, resDataLen, ResBase64);
  1007. CBase64::Encode(desData, desDataLen, DesBase64);
  1008. cout << "ResBase64:" << ResBase64 << endl;
  1009. return AddErrorMessageBase(DevInstance, Code, Level, ResInfo, Description, nMessageType, pAppId);
  1010. }
  1011. RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  1012. {
  1013. //int ret = 1;
  1014. if (Code == 0 || (string)ResInfo == "")
  1015. {
  1016. FERROR("Code or ResInfo is empty");
  1017. return RET_FAILED;
  1018. }
  1019. MessageInfo info;
  1020. info.CodeID = Code;
  1021. info.Type = nMessageType;
  1022. info.Level = Level;
  1023. info.Resouceinfo = ResInfo;
  1024. info.Description = Description;
  1025. string strDevInstanceCode = DevInstance;
  1026. strDevInstanceCode += Code;
  1027. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1028. {
  1029. string strInstancekey = m_pResErrorList->GetKey(i);
  1030. if (strInstancekey == strDevInstanceCode)
  1031. {
  1032. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1033. //{
  1034. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1035. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1036. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1037. // {
  1038. //ret = 0;
  1039. FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
  1040. return RET_SUCCEED;
  1041. // }
  1042. //}
  1043. //ret = 2;
  1044. //break;
  1045. }
  1046. }
  1047. //if (ret==1)
  1048. {
  1049. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1050. //info.GetResDataObject(tempInfo);
  1051. //ErrorInfo.update(Code, tempInfo);
  1052. info.GetResDataObject(ErrorInfo);
  1053. struct timeval tv;
  1054. struct tm* tm_info;
  1055. char TimeTag[128];
  1056. // 获取当前时间
  1057. gettimeofday(&tv, NULL);
  1058. // 转换为本地时间
  1059. tm_info = localtime(&tv.tv_sec);
  1060. // 格式化时间为字符串
  1061. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1062. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1063. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1064. ErrorInfo.add("CreationTime", TimeTag);
  1065. ErrorInfo.add("AppId", pAppId);
  1066. ErrorInfo.add("InstanceId", DevInstance);
  1067. if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1068. {
  1069. m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
  1070. }
  1071. ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
  1072. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1073. // 移除 encode() 调用避免内存问题
  1074. FWARN( "preposterror ErrorType:{$}", nMessageType);
  1075. CmdFromLogicDev(&NotifyData);
  1076. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1077. }
  1078. //else if (ret == 2)
  1079. //{
  1080. // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
  1081. // info.GetResDataObject(tempInfo);
  1082. // ErrorInfo.update(Code, tempInfo);
  1083. // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1084. // {
  1085. // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
  1086. // }
  1087. // ResNotify.update(DevInstance, ErrorInfo);
  1088. // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1089. // RES_FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
  1090. // CmdFromLogicDev(&NotifyData);
  1091. //}
  1092. //put log here
  1093. return RET_SUCCEED;
  1094. }
  1095. RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
  1096. {
  1097. AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
  1098. //put log here
  1099. return RET_FAILED;
  1100. }
  1101. RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
  1102. {
  1103. int index = -1;
  1104. bool m_bClearAll = false;
  1105. //trim empty code string
  1106. string CodeStr = Code;
  1107. if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
  1108. {
  1109. CodeStr = "";
  1110. Code = CodeStr.c_str();
  1111. m_bClearAll = true;
  1112. }
  1113. //if ((string)Code == "0" || (string)Code == "")
  1114. //{
  1115. // m_bClearAll = true;
  1116. //}
  1117. string strDevInstanceCode = DevInstance;
  1118. strDevInstanceCode += Code;
  1119. if (m_bClearAll)
  1120. {
  1121. m_pResErrorList->clear();
  1122. }
  1123. else
  1124. {
  1125. MessageInfo info;
  1126. info.CodeID = Code;
  1127. info.Type = nMessageType;
  1128. info.Level = Level;
  1129. info.Resouceinfo = ResInfo;
  1130. info.Description = Description;
  1131. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1132. {
  1133. string strInstancekey = m_pResErrorList->GetKey(i);
  1134. if (strInstancekey == strDevInstanceCode)
  1135. {
  1136. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1137. //{
  1138. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1139. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1140. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1141. // {
  1142. // index = (INT)j;
  1143. // break;
  1144. // }
  1145. //}
  1146. index = (int)i;
  1147. break;
  1148. }
  1149. }
  1150. }
  1151. MessageInfo info;
  1152. info.CodeID = Code;
  1153. info.Type = nMessageType;
  1154. info.Level = Level;
  1155. info.Resouceinfo = ResInfo;
  1156. info.Description = Description;
  1157. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1158. //info.GetResDataObject(tempInfo);
  1159. //ErrorInfo.add(Code, tempInfo);
  1160. info.GetResDataObject(ErrorInfo);
  1161. ErrorInfo.add("InstanceId", DevInstance);
  1162. bool result = false;
  1163. if (index>=0)
  1164. {
  1165. //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
  1166. result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
  1167. }
  1168. if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
  1169. {
  1170. ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
  1171. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
  1172. // 移除 encode() 调用避免内存问题
  1173. FWARN( "pre Del error ErrorCode:{$}", Code);
  1174. CmdFromLogicDev(&NotifyData);
  1175. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1176. }
  1177. return RET_SUCCEED;
  1178. }
  1179. RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
  1180. {
  1181. DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
  1182. //put log here
  1183. return RET_FAILED;
  1184. }
  1185. RET_STATUS LogicDevice::EvtProcedure()
  1186. {
  1187. return RET_FAILED;
  1188. }
  1189. bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
  1190. {
  1191. FERROR("NOT FINISHED YET");
  1192. return false;
  1193. }
  1194. RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
  1195. {
  1196. std::string strResult = "";
  1197. //组Context包
  1198. //if (m_pLogger)
  1199. {
  1200. wstring wContect = mb2wc_a(pContext);
  1201. CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
  1202. }
  1203. //Thread_Lock();
  1204. //if (NULL != fmt)
  1205. //{
  1206. // va_list marker = NULL;
  1207. // va_start(marker, fmt);
  1208. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1209. // std::vector<char> vBuffer(nLength, '\0');
  1210. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1211. // if (nWritten > 0)
  1212. // {
  1213. // strResult = &vBuffer[0];
  1214. // }
  1215. // va_end(marker);
  1216. //}
  1217. //Thread_UnLock();
  1218. ResDataObject SysLogNode;
  1219. string guidstr;
  1220. GUID DeviceGuid;
  1221. //组Log包
  1222. if (GetDeviceType(DeviceGuid))
  1223. {
  1224. guid_2_string(DeviceGuid, guidstr);
  1225. SysLogNode.add("Module", guidstr.c_str());
  1226. SysLogNode.add("AppId", pAppId);
  1227. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1228. if (pCode)
  1229. {
  1230. SysLogNode.add("BusinessKey", pCode);
  1231. }
  1232. else
  1233. {
  1234. SysLogNode.add("BusinessKey", "");
  1235. }
  1236. SysLogNode.add("IP", (const char*)getLocalIpAddress());
  1237. struct timeval tv;
  1238. struct tm* tm_info;
  1239. char TimeTag[128];
  1240. // 获取当前时间
  1241. gettimeofday(&tv, NULL);
  1242. // 转换为本地时间
  1243. tm_info = localtime(&tv.tv_sec);
  1244. // 格式化时间为字符串
  1245. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1246. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1247. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1248. SysLogNode.add("CreationTime", TimeTag);
  1249. string strLevel = SysLogLevel2str(Level);
  1250. SysLogNode.add("Level", strLevel.c_str());
  1251. SysLogNode.add("HostName", (const char*)getLocalMachineId());
  1252. SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
  1253. SysLogNode.add("FreeText", strResult.c_str());
  1254. SysLogNode.add("Context", pCode);
  1255. ResDataObject NotifyData;
  1256. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1257. CmdFromLogicDev(&NotifyData);
  1258. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1259. }
  1260. else
  1261. {
  1262. FWARN("no Guid??");
  1263. return RET_FAILED;
  1264. }
  1265. //打印LOG
  1266. switch (Level)
  1267. {
  1268. case Syslog_Debug:
  1269. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1270. // 移除 encode() 调用避免内存问题
  1271. FDEBUG("SysLog");
  1272. break;
  1273. case Syslog_Information:
  1274. ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1275. // 移除 encode() 调用避免内存问题
  1276. FINFO("SysLog");
  1277. break;
  1278. case Syslog_Warning:
  1279. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1280. // 移除 encode() 调用避免内存问题
  1281. FWARN("SysLog");
  1282. break;
  1283. case Syslog_Error:
  1284. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1285. // 移除 encode() 调用避免内存问题
  1286. FERROR("SysLog");
  1287. break;
  1288. case Syslog_Fatal:
  1289. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1290. // 移除 encode() 调用避免内存问题
  1291. FINFO("SysLog");
  1292. break;
  1293. default:
  1294. // 移除 encode() 调用避免内存问题
  1295. FINFO("SysLog");
  1296. break;
  1297. }
  1298. return RET_SUCCEED;
  1299. }
  1300. RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
  1301. {
  1302. std::string strResult = "";
  1303. //组Context包
  1304. //if (m_pLogger)
  1305. //{
  1306. // m_pLogger->Thread_Lock();
  1307. // if (NULL != fmt)
  1308. // {
  1309. // va_list marker = NULL;
  1310. // va_start(marker, fmt);
  1311. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1312. // std::vector<char> vBuffer(nLength, '\0');
  1313. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1314. // if (nWritten > 0)
  1315. // {
  1316. // strResult = &vBuffer[0];
  1317. // }
  1318. // va_end(marker);
  1319. // }
  1320. // m_pLogger->Thread_UnLock();
  1321. //}
  1322. //else
  1323. {
  1324. Thread_Lock();
  1325. if (fmt != NULL) {
  1326. va_list marker;
  1327. va_start(marker, fmt);
  1328. // 获取格式化字符串所需的大小(不包括终止符)
  1329. int nLength = vsnprintf(NULL, 0, fmt, marker);
  1330. if (nLength >= 0) {
  1331. std::vector<char> vBuffer(nLength + 1, '\0'); // +1 为终止符
  1332. vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker);
  1333. strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串
  1334. }
  1335. va_end(marker);
  1336. }
  1337. Thread_UnLock();
  1338. }
  1339. ResDataObject SysLogNode;
  1340. string guidstr;
  1341. GUID DeviceGuid;
  1342. //组Log包
  1343. if (GetDeviceType(DeviceGuid))
  1344. {
  1345. guid_2_string(DeviceGuid, guidstr);
  1346. SysLogNode.add("Module", guidstr.c_str());
  1347. SysLogNode.add("AppId", "");
  1348. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1349. if (pCode)
  1350. {
  1351. SysLogNode.add("BusinessKey", pCode);
  1352. }
  1353. else
  1354. {
  1355. SysLogNode.add("BusinessKey", "");
  1356. }
  1357. SysLogNode.add("IP", (const char *)getLocalIpAddress());
  1358. struct timeval tv;
  1359. struct tm* tm_info;
  1360. char TimeTag[128];
  1361. // 获取当前时间
  1362. gettimeofday(&tv, NULL);
  1363. // 转换为本地时间
  1364. tm_info = localtime(&tv.tv_sec);
  1365. // 格式化时间为字符串
  1366. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1367. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1368. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1369. SysLogNode.add("CreationTime", TimeTag);
  1370. SysLogNode.add("Level", Level);
  1371. SysLogNode.add("HostName", (const char *)getLocalMachineId());
  1372. SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
  1373. SysLogNode.add("FreeText", strResult.c_str());
  1374. ResDataObject NotifyData;
  1375. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1376. CmdFromLogicDev(&NotifyData);
  1377. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1378. if(m_pParent != nullptr)
  1379. NotifyParent(&NotifyData, "/Notify");
  1380. }
  1381. else
  1382. {
  1383. FERROR("no Guid??");
  1384. return RET_FAILED;
  1385. }
  1386. //打印LOG
  1387. switch (Level)
  1388. {
  1389. case Syslog_Debug:
  1390. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1391. // 移除 encode() 调用避免内存问题
  1392. FDEBUG("SysLog");
  1393. break;
  1394. case Syslog_Information:
  1395. //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1396. // 移除 encode() 调用避免内存问题
  1397. FINFO("SysLog");
  1398. break;
  1399. case Syslog_Warning:
  1400. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1401. // 移除 encode() 调用避免内存问题
  1402. FWARN("SysLog");
  1403. break;
  1404. case Syslog_Error:
  1405. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1406. // 移除 encode() 调用避免内存问题
  1407. FERROR("SysLog");
  1408. break;
  1409. case Syslog_Fatal:
  1410. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1411. // 移除 encode() 调用避免内存问题
  1412. FINFO("SysLog");
  1413. break;
  1414. default:
  1415. // 移除 encode() 调用避免内存问题
  1416. FINFO("SysLog");
  1417. break;
  1418. }
  1419. return RET_SUCCEED;
  1420. }
  1421. /////////////////////////////////////////////////////////////////////////////
  1422. ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  1423. {
  1424. return nullptr;
  1425. }
  1426. std::string CurrentDateTime()
  1427. {
  1428. auto now = std::chrono::system_clock::now();
  1429. auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
  1430. auto now_time_t = std::chrono::system_clock::to_time_t(now);
  1431. std::tm now_tm{};
  1432. localtime_r(&now_time_t, &now_tm); // 线程安全版本
  1433. char buf[64];
  1434. std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &now_tm);
  1435. std::string now_time_str = buf;
  1436. // 格式化微秒部分并追加
  1437. snprintf(buf, sizeof(buf), ".%06lld", static_cast<long long>(now_us.count() % 1000000));
  1438. now_time_str += buf;
  1439. return now_time_str;
  1440. }
  1441. static string CurrentDateTime2()
  1442. {
  1443. std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  1444. char buf[100] = { 0 };
  1445. std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
  1446. return buf;
  1447. }
  1448. bool LogicDevice::GetActions(ResDataObject& resAction)
  1449. {
  1450. for (int x = 0; x < m_Actions.size(); x++)
  1451. {
  1452. resAction.update(m_Actions.GetKey(x), "");
  1453. }
  1454. return true;
  1455. }
  1456. void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
  1457. {
  1458. if (m_pParent != nullptr)
  1459. {
  1460. if (hConnection == nullptr)
  1461. {
  1462. hConnection = m_pParent->m_pMqttConntion;
  1463. }
  1464. PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
  1465. }
  1466. }
  1467. const mqtt_qos_t MQTT_QOS = QOS2;
  1468. /// <summary>
  1469. /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector
  1470. /// CCOS/Demo/Detector
  1471. /// </summary>
  1472. /// <param name="bOnline"></param>
  1473. void LogicDevice::SubscribeGroupActions(string wsName, int bOnline)
  1474. {
  1475. if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0)
  1476. {
  1477. int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()];
  1478. if (bOnline == lastOnline)
  1479. {
  1480. FWARN("Group Abstract is already ? Online {$}", bOnline);
  1481. return;
  1482. }
  1483. }
  1484. if ( m_strAbstractPath.length() > 0)
  1485. {
  1486. std::string pszAction, gpPath;
  1487. int ret = 0;
  1488. int num = m_Actions.size();
  1489. FINFO("Group Device onLine or offLine {$}", bOnline);
  1490. gpPath = m_strAbstractPath;
  1491. gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName);
  1492. pszAction = gpPath + m_strDevicePath;
  1493. pszAction += "/Action/+";
  1494. SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1495. if (bOnline)
  1496. {
  1497. FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1498. }
  1499. else
  1500. {
  1501. FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1502. }
  1503. //for (size_t i = 0; i < num; i++)
  1504. //{
  1505. // pszAction = gpPath + m_strDevicePath;
  1506. // pszAction += "/Action/";
  1507. // pszAction += (m_Actions.GetKey(i));
  1508. // SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1509. // if (bOnline)
  1510. // {
  1511. // FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1512. // }
  1513. // else
  1514. // {
  1515. // FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1516. // }
  1517. //}
  1518. m_rsOnlineGroup.update(wsName.c_str(), bOnline);
  1519. }
  1520. else
  1521. {
  1522. FWARN("try Online but AbstractPath is none {$}", m_strClientID);
  1523. }
  1524. }
  1525. void SubscribeModule(ccos_mqtt_connection* pconn, string devuribBase)
  1526. {
  1527. string pszAction = devuribBase;
  1528. pszAction += "/Get/+";
  1529. SubscribeTopic(pconn, pszAction.c_str());
  1530. pszAction = devuribBase;
  1531. pszAction += "/Update/+";
  1532. SubscribeTopic(pconn, pszAction.c_str());
  1533. pszAction = devuribBase;
  1534. pszAction += "/Set/+";
  1535. SubscribeTopic(pconn, pszAction.c_str());
  1536. pszAction = devuribBase;
  1537. pszAction += "/Add/+";
  1538. SubscribeTopic(pconn, pszAction.c_str());
  1539. pszAction = devuribBase;
  1540. pszAction += "/Del/+";
  1541. SubscribeTopic(pconn, pszAction.c_str());
  1542. pszAction = devuribBase;
  1543. pszAction += "/Action/+";
  1544. SubscribeTopic(pconn, pszAction.c_str());
  1545. pszAction = devuribBase;
  1546. pszAction += "/Message/+";
  1547. SubscribeTopic(pconn, pszAction.c_str());
  1548. }
  1549. void LogicDevice::SubscribeActions()
  1550. {
  1551. FINFO("Begain");
  1552. if (nullptr == m_pMqttConntion)
  1553. return;
  1554. int num = m_Actions.size();
  1555. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
  1556. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
  1557. {
  1558. ResDataObject res,resAction;
  1559. GetDeviceResource(&res);
  1560. // std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
  1561. resAction = res["Action"];
  1562. FINFO("Actions from deviceresource [{$}]", resAction.size());
  1563. for (int x = 0; x < resAction.size(); x++)
  1564. {
  1565. string action = (string)resAction.GetKey(x);
  1566. if (action.length() > 0)
  1567. {
  1568. FINFO("Action from DeviceResource [{$}]", action);
  1569. m_Actions.update(action.c_str(), "");
  1570. }
  1571. }
  1572. num = m_Actions.size();
  1573. }
  1574. std::string pszAction;
  1575. int ret = 0;
  1576. SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str());
  1577. FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1578. if (m_strEBusRoot != m_strCCOSDevicePath)
  1579. {
  1580. pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1581. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1582. FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1583. }
  1584. if (m_strAbstractPath.length() > 0)
  1585. {
  1586. pszAction = m_strAbstractPath + m_strDevicePath;
  1587. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1588. FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1589. }
  1590. //for (size_t i = 0; i < num; i++)
  1591. //{
  1592. // //订阅ebus的路径
  1593. // pszAction = m_strEBusRoot;
  1594. // pszAction += "/Action/";
  1595. // pszAction += (m_Actions.GetKey(i));
  1596. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1597. // FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1598. //
  1599. // //订阅CCOS设备路径
  1600. // if (m_strEBusRoot != m_strCCOSDevicePath)
  1601. // {
  1602. // pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1603. // pszAction += "/Action/";
  1604. // pszAction += (m_Actions.GetKey(i));
  1605. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1606. // FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1607. // }
  1608. // //抽象设备路径要一直存在
  1609. // //订阅抽象设备路径
  1610. // if ( m_strAbstractPath.length() > 0)
  1611. // {
  1612. // pszAction = m_strAbstractPath + m_strDevicePath;
  1613. // pszAction += "/Action/";
  1614. // pszAction += (m_Actions.GetKey(i));
  1615. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1616. // FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1617. // }
  1618. //}
  1619. //pszAction = m_strEBusRoot;
  1620. //pszAction += "/Action/UpdateDeviceResource";
  1621. ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1622. //pTopicList->push_back(pszAction);
  1623. //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1624. //FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1625. //*/
  1626. // 等待所有订阅完成:轮询检查ack_list,确保所有SUBACK都已收到
  1627. // 这比固定延时更可靠,能确保订阅真正生效后才继续
  1628. FINFO("{$} Waiting for all subscriptions to complete...", m_strClientID);
  1629. const DWORD subscribeTimeoutMs = 10000; // 10秒超时(可能有多个订阅)
  1630. if (!WaitForSubscriptionComplete(m_pMqttConntion, subscribeTimeoutMs)) {
  1631. FERROR("{$} Some subscriptions did not complete within timeout", m_strClientID);
  1632. } else {
  1633. FINFO("{$} All subscriptions ready", m_strClientID);
  1634. }
  1635. // SUBACK收到后,等待Broker完成内部路由表更新
  1636. // 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
  1637. WaitForBrokerRouting(m_strClientID.c_str());
  1638. }
  1639. /*
  1640. void connlost(void* context, char* cause)
  1641. {
  1642. //连接中断
  1643. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1644. // std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
  1645. printf("\nConnection lost\n");
  1646. printf(" cause: %s\n", cause);
  1647. // std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
  1648. }
  1649. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  1650. {
  1651. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1652. // std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
  1653. }
  1654. */
  1655. //发送成功
  1656. //void delivered(void* context, MQTTAsync_token dt)
  1657. //{
  1658. // printf("Message with token value %d delivery confirmed\n", dt);
  1659. // //deliveredtoken = dt;
  1660. //}
  1661. //void connlost(void* context, char* cause)
  1662. //{
  1663. // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
  1664. // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
  1665. //
  1666. // //连接中断
  1667. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1668. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1669. // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
  1670. // return;
  1671. //
  1672. // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1673. // int rc;
  1674. //
  1675. // printf("Reconnecting\n");
  1676. // conn_opts.keepAliveInterval = 20;
  1677. // conn_opts.cleansession = 1;
  1678. // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  1679. // {
  1680. // printf("Failed to start connect, return code %d\n", rc);
  1681. // //finished = 1;
  1682. // }
  1683. //}
  1684. //重连成功
  1685. //void onReconnected(void* context, char* cause)
  1686. //{
  1687. // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
  1688. // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
  1689. //
  1690. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1691. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1692. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1693. // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1694. // const char* client_id = std::get<CLINET_ID_ID>(*connection);
  1695. // int rc;
  1696. //
  1697. // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
  1698. // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
  1699. // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
  1700. // // 重新订阅,暂不重新订阅,看看
  1701. //
  1702. // ///*
  1703. // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1704. // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1705. // //opts.onSuccess = onSubscribe;
  1706. // //opts.onFailure = onSubscribeFailure;
  1707. // //opts.context = client;
  1708. // auto it = pTopicList->begin();
  1709. // while(it != pTopicList->end())
  1710. // {
  1711. // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
  1712. // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
  1713. // it++;
  1714. // }
  1715. // //*/
  1716. //
  1717. //}
  1718. //
  1719. ////MQTT 连接主动断开连接失败
  1720. //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1721. //{
  1722. // printf("Disconnect failed, rc %d\n", response->code);
  1723. // //disc_finished = 1;
  1724. //}
  1725. //
  1726. ////MQTT 连接主动断开连接成功
  1727. //void onDisconnect(void* context, MQTTAsync_successData* response)
  1728. //{
  1729. // //printf("Successful disconnection\n");
  1730. // //disc_finished = 1;
  1731. //}
  1732. //
  1733. //void onSubscribe(void* context, MQTTAsync_successData* response)
  1734. //{
  1735. // printf("Subscribe succeeded\n");
  1736. // //subscribed = 1;
  1737. //}
  1738. //
  1739. //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1740. //{
  1741. // printf("Subscribe failed, rc %d\n", response->code);
  1742. // //finished = 1;
  1743. //}
  1744. //
  1745. //
  1746. //void onConnectFailure(void* context, MQTTAsync_failureData* response)
  1747. //{
  1748. // printf("Connect failed, rc %d\n", response->code);
  1749. // //finished = 1;
  1750. //}
  1751. //void onConnect(void* context, MQTTAsync_successData* response)
  1752. //{
  1753. //
  1754. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1755. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1756. //
  1757. // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
  1758. //
  1759. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1760. // int rc;
  1761. //
  1762. // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
  1763. // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
  1764. // if (hConnected != NULL)
  1765. // SetEvent(hConnected);
  1766. //
  1767. // /*
  1768. // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1769. // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1770. // opts.onSuccess = onSubscribe;
  1771. // opts.onFailure = onSubscribeFailure;
  1772. // opts.context = client;
  1773. // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  1774. // {
  1775. // printf("Failed to start subscribe, return code %d\n", rc);
  1776. // finished = 1;
  1777. // }*/
  1778. //}
  1779. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  1780. int PublishActionWithoutLock(string &message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos = MQTT_QOS);
  1781. void resubscribe_topic(void* client, void* reconnect_date)
  1782. {
  1783. mqtt_client* pClient = (mqtt_client*)client;
  1784. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1785. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1786. string client_id = std::get<CLINET_ID_ID>(*connection);
  1787. FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
  1788. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1789. pLock->Thread_Lock();
  1790. for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
  1791. FWARN("Resubscribe {$}", str);
  1792. mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
  1793. });
  1794. pLock->Thread_UnLock();
  1795. }
  1796. //MQTT消息抵达
  1797. //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1798. void msgarrivd(void* client, message_data_t* message)
  1799. {
  1800. // 日志前缀:时间+线程ID
  1801. const auto logPrefix = CurrentDateTime() + " msgarrivd:: TID [" + std::to_string(GetCurrentThreadId()) + "] ";
  1802. // std::cout << logPrefix << std::endl;
  1803. FINFO(logPrefix);
  1804. // 基础参数校验
  1805. if (!client || !message || !message->topic_name) return;
  1806. // 转换并校验客户端对象
  1807. auto* pClient = reinterpret_cast<mqtt_client*>(client);
  1808. if (!pClient || !pClient->mqtt_conn_context) {
  1809. std::cout << logPrefix << "Invalid client or connection context" << std::endl;
  1810. return;
  1811. }
  1812. // 转换并校验连接对象
  1813. ccos_mqtt_connection* connection = nullptr;
  1814. try {
  1815. connection = reinterpret_cast<ccos_mqtt_connection*>(pClient->mqtt_conn_context);
  1816. if (!connection) {
  1817. // std::cout << logPrefix << "Connection is null" << std::endl;
  1818. return;
  1819. }
  1820. }
  1821. catch (...) {
  1822. std::cout << logPrefix << "Failed to cast connection context" << std::endl;
  1823. return;
  1824. }
  1825. // 获取并校验锁对象
  1826. CcosLock* pLock = nullptr;
  1827. try {
  1828. pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1829. if (!pLock) {
  1830. // std::cout << logPrefix << "Lock is null" << std::endl;
  1831. return;
  1832. }
  1833. }
  1834. catch (...) {
  1835. std::cout << logPrefix << "Failed to get lock from connection" << std::endl;
  1836. return;
  1837. }
  1838. // 加锁并二次校验
  1839. pLock->Thread_Lock();
  1840. auto* pLockTemp = std::get<CONN_SEND_LOCK_ID>(*connection);
  1841. if (!pLockTemp) {
  1842. pLock->Thread_UnLock();
  1843. return;
  1844. }
  1845. if (!pClient->mqtt_conn_context) {
  1846. pLock->Thread_UnLock();
  1847. // std::cout << logPrefix << "Connection context became null during locking" << std::endl;
  1848. return;
  1849. }
  1850. // 获取客户端ID
  1851. std::string clientId;
  1852. try {
  1853. clientId = std::get<CLINET_ID_ID>(*connection);
  1854. }
  1855. catch (...) {
  1856. pLock->Thread_UnLock();
  1857. std::cout << logPrefix << "Failed to get client ID" << std::endl;
  1858. return;
  1859. }
  1860. // 获取MQTT客户端对象
  1861. void* mqttClient = nullptr;
  1862. try {
  1863. mqttClient = std::get<MQTT_CLT_ID>(*connection);
  1864. if (!mqttClient) {
  1865. pLock->Thread_UnLock();
  1866. // std::cout << logPrefix << "MQTT client is null" << std::endl;
  1867. return;
  1868. }
  1869. }
  1870. catch (...) {
  1871. pLock->Thread_UnLock();
  1872. std::cout << logPrefix << "Failed to get MQTT client" << std::endl;
  1873. return;
  1874. }
  1875. // 处理主题名
  1876. std::string topic;
  1877. try {
  1878. const size_t topicLen = strlen(message->topic_name);
  1879. const size_t maxTopicLen = sizeof(message->topic_name) - 1;
  1880. topic = (topicLen > maxTopicLen) ?
  1881. std::string(message->topic_name, maxTopicLen) :
  1882. std::string(message->topic_name);
  1883. }
  1884. catch (...) {
  1885. pLock->Thread_UnLock();
  1886. std::cout << logPrefix << "Failed to process topic name" << std::endl;
  1887. return;
  1888. }
  1889. // 输出消息接收日志
  1890. // std::cout << logPrefix << clientId << " Get Msg from " << topic << std::endl;
  1891. FINFO("[msgarrivd] {$} received message from topic: {$}, payload_len: {$}", clientId, topic, message->message->payloadlen);
  1892. // 校验消息数据
  1893. if (!message->message || !message->message->payload) {
  1894. pLock->Thread_UnLock();
  1895. std::cout << logPrefix << "Invalid message data" << std::endl;
  1896. return;
  1897. }
  1898. // 校验消息大小
  1899. const auto payloadLen = message->message->payloadlen;
  1900. if ((topic.length() >= MQTT_TOPIC_LEN_MAX && payloadLen > 16380) || payloadLen > 512 * 1024) {
  1901. pLock->Thread_UnLock();
  1902. // std::cout << logPrefix << "Message too large: topic_len=" << topic.length()
  1903. // << ", payload_len=" << payloadLen << std::endl;
  1904. return;
  1905. }
  1906. // 大消息提示
  1907. if (payloadLen > 16380) {
  1908. // std::cout << logPrefix << clientId << " Get big Msg from " << topic
  1909. // << " msg Len " << payloadLen << std::endl;
  1910. }
  1911. // 解码消息载荷
  1912. ResDataObject req;
  1913. try {
  1914. std::string payload(static_cast<const char*>(message->message->payload), payloadLen);
  1915. req.decode(payload.c_str());
  1916. FINFO("[msgarrivd] {$} decoded message successfully, KEY: {$}, TYPE: {$}, CMD: {$}",
  1917. clientId,
  1918. PacketAnalizer::GetPacketKey(&req),
  1919. (int)PacketAnalizer::GetPacketType(&req),
  1920. (int)PacketAnalizer::GetPacketCmd(&req));
  1921. }
  1922. catch (...) {
  1923. pLock->Thread_UnLock();
  1924. FERROR("[msgarrivd] {$} failed to decode message from topic: {$}", clientId, topic);
  1925. return;
  1926. }
  1927. // 处理消息钩子
  1928. void* pHook = nullptr;
  1929. try {
  1930. pHook = std::get<MSG_HOOK_ID>(*connection);
  1931. }
  1932. catch (...) {
  1933. pLock->Thread_UnLock();
  1934. return;
  1935. }
  1936. if (pHook) {
  1937. try {
  1938. auto* filter = reinterpret_cast<ccos_mqtt_msg_filter*>(pHook);
  1939. if (!filter) {
  1940. pLock->Thread_UnLock();
  1941. return;
  1942. }
  1943. auto func = std::get<FILTER_FUNC_ID>(*filter);
  1944. // std::cout << logPrefix << clientId << " Got Hook Process " << topic << std::endl;
  1945. FINFO("[msgarrivd] {$} processing hook for topic: {$}", clientId, topic);
  1946. auto* pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
  1947. if (!pRequests) {
  1948. pLock->Thread_UnLock();
  1949. FWARN("[msgarrivd] {$} pRequests is null", clientId);
  1950. return;
  1951. }
  1952. FINFO("[msgarrivd] {$} checking {$} pending requests in hook", clientId, pRequests->size());
  1953. // 遍历请求处理钩子
  1954. for (int x = 0; x < pRequests->size(); x++) {
  1955. // std::cout << logPrefix << clientId << " Hook Process Function " << topic << std::endl;
  1956. const std::string keyAction = pRequests->GetKey(x);
  1957. ResDataObject resObj = (*pRequests)[x];
  1958. auto hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1959. FINFO("[msgarrivd] {$} comparing: pending_key=[{$}] vs received_key=[{$}], msg_type={$}",
  1960. clientId, keyAction, PacketAnalizer::GetPacketKey(&req), (int)PacketAnalizer::GetPacketType(&req));
  1961. // 匹配钩子消息
  1962. if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES &&
  1963. keyAction == PacketAnalizer::GetPacketKey(&req)) {
  1964. // std::cout << logPrefix << clientId << " Got Hook Function Hooked.. " << topic << std::endl;
  1965. FINFO("[msgarrivd] {$} MATCHED! Hook triggered for key: {$}", clientId, keyAction);
  1966. // 打开事件(优先使用resObj,失败则用默认)
  1967. auto event = LinuxEvent::OpenEvent(std::string(resObj).c_str());
  1968. if (!event) event = hEvent;
  1969. // 设置响应对象
  1970. auto* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
  1971. if (pResponse) {
  1972. pResponse->add(keyAction.c_str(), req);
  1973. FINFO("[msgarrivd] {$} added response to pResponse for key: {$}", clientId, keyAction);
  1974. } else {
  1975. FWARN("[msgarrivd] {$} pResponse is null, cannot store response", clientId);
  1976. }
  1977. // 触发事件
  1978. if (event) {
  1979. try {
  1980. event->SetEvent();
  1981. FINFO("[msgarrivd] {$} event signaled successfully for key: {$}", clientId, keyAction);
  1982. }
  1983. catch (...) {
  1984. FERROR("[msgarrivd] {$} failed to signal event for key: {$}", clientId, keyAction);
  1985. }
  1986. } else {
  1987. FWARN("[msgarrivd] {$} event is null, cannot signal", clientId);
  1988. }
  1989. // 清理并退出
  1990. pRequests->eraseOneOf(keyAction.c_str(), 0);
  1991. FINFO("[msgarrivd] {$} erased request from pending list, key: {$}", clientId, keyAction);
  1992. pLock->Thread_UnLock();
  1993. return;
  1994. }
  1995. }
  1996. }
  1997. catch (...) {
  1998. FERROR("[msgarrivd] {$} exception in hook processing for topic: {$}", clientId, topic);
  1999. }
  2000. }
  2001. // 未命中钩子,继续处理
  2002. // std::cout << logPrefix << clientId << " Go on Process " << topic << std::endl;
  2003. FINFO("[msgarrivd] {$} hook not matched, calling user callback for topic: {$}", clientId, topic);
  2004. pLock->Thread_UnLock();
  2005. // 调用用户回调
  2006. ccos_mqtt_callback onmsg = nullptr;
  2007. try {
  2008. onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
  2009. }
  2010. catch (...) {
  2011. return;
  2012. }
  2013. if (onmsg) {
  2014. // 使用异步处理避免阻塞MQTT接收流程
  2015. try {
  2016. AsyncMsgHandler::getInstance().handleMessageAsync(onmsg, req, topic, connection);
  2017. }
  2018. catch (const std::exception& e) {
  2019. }
  2020. catch (...) {
  2021. }
  2022. }
  2023. else {
  2024. // std::cout << logPrefix << "**** ----- **** USER_MSG_CAKBCK_ID is null "
  2025. // << clientId << " When processing " << topic << std::endl;
  2026. }
  2027. }
  2028. void* MqttSendThreadFunc(void* pPara)
  2029. {
  2030. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
  2031. if (!connection) {
  2032. return NULL;
  2033. }
  2034. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  2035. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*connection);
  2036. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  2037. std::string client_id = std::get<CLINET_ID_ID>(*connection);
  2038. sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
  2039. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  2040. // 添加延时控制相关变量(每个连接独立,不使用static)
  2041. DWORD lastSendTime = 0;
  2042. const DWORD MIN_SEND_INTERVAL = 50; // 最小发送间隔50ms
  2043. // 线程启动日志
  2044. //std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
  2045. while (std::get<THREAD_RUNNING_ID>(*connection)) {
  2046. struct timespec ts;
  2047. clock_gettime(CLOCK_REALTIME, &ts);
  2048. ts.tv_sec += 1;
  2049. // 等待信号量前的日志
  2050. //std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
  2051. if (sem_timedwait(hSemaphore, &ts) == 0) {
  2052. pLock->Thread_Lock();
  2053. if (pList->empty()) {
  2054. pLock->Thread_UnLock();
  2055. continue;
  2056. }
  2057. ResDataObject* pMsg = pList->front();
  2058. pList->pop_front();
  2059. pLock->Thread_UnLock();
  2060. if (pMsg->size() > 0) {
  2061. const char* pTopic = pMsg->GetKey(0);
  2062. std::string m_strSendMessage = (std::string)(*pMsg)[pTopic];
  2063. // ========== 简化延时控制逻辑 ==========
  2064. DWORD now = GetTickCount();
  2065. DWORD elapsed = (lastSendTime == 0) ? MIN_SEND_INTERVAL + 1 : (now - lastSendTime);
  2066. // 如果上次发送在最小间隔内,需要延时
  2067. if (elapsed < MIN_SEND_INTERVAL) {
  2068. DWORD sleepMs = MIN_SEND_INTERVAL - elapsed;
  2069. FINFO("[MqttSendThreadFunc] {$} - Sleeping {$}ms to avoid QoS handshake conflict, elapsed: {$}ms",
  2070. client_id, sleepMs, elapsed);
  2071. if (sleepMs > 0) {
  2072. // 使用usleep进行毫秒级延时
  2073. usleep(sleepMs * 1000);
  2074. // 更新当前时间(因为经过了延时)
  2075. now = GetTickCount();
  2076. }
  2077. }
  2078. // 更新最后发送时间
  2079. lastSendTime = now;
  2080. // ========== 延时控制逻辑结束 ==========
  2081. // 发送消息
  2082. PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id,
  2083. pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
  2084. }
  2085. delete pMsg;
  2086. }
  2087. else if (errno == ETIMEDOUT) {
  2088. //std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
  2089. continue;
  2090. }
  2091. else {
  2092. // 其他错误情况
  2093. //std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
  2094. continue;
  2095. }
  2096. }
  2097. return NULL;
  2098. }
  2099. // 安全遍历MQTT消息处理程序的宏
  2100. #define mqtt_list_for_each_entry_safe(pos, n, head, member) \
  2101. for (pos = container_of((head)->next, decltype(*pos), member), \
  2102. n = container_of(pos->member.next, decltype(*pos), member); \
  2103. &pos->member != (head); \
  2104. pos = n, n = container_of(n->member.next, decltype(*n), member))
  2105. // 通过成员指针获取包含结构的指针
  2106. #define container_of(ptr, type, member) ({ \
  2107. const typeof(((type*)0)->member)* __mptr = (ptr); \
  2108. (type*)((char*)__mptr - offsetof(type, member)); })
  2109. // 重连回调函数 - 实现自动重订阅
  2110. static void reconnect_handler(void* client, void* reconnect_data) {
  2111. mqtt_client_t* c = (mqtt_client_t*)client;
  2112. message_handlers_t* pos, * n;
  2113. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)c->mqtt_conn_context;
  2114. // 遍历所有已订阅的主题并重新订阅
  2115. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  2116. pLock->Thread_Lock();
  2117. /*mqtt_list_for_each_entry_safe(pos, n, &c->mqtt_msg_handler_list, list) {
  2118. mqtt_subscribe(c, pos->topic_filter, pos->qos, pos->handler);
  2119. }*/
  2120. pLock->Thread_UnLock();
  2121. }
  2122. mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
  2123. if (!connection) {
  2124. std::cerr << "[ERROR] Invalid connection context (nullptr)" << std::endl;
  2125. return nullptr;
  2126. }
  2127. //mqtt_log_init();
  2128. mqtt_client_t* client = NULL;
  2129. client = mqtt_lease();
  2130. mqtt_log_init();
  2131. if (!client) { // 修复:检查客户端指针是否有效
  2132. return nullptr;
  2133. }
  2134. std::get<MQTT_CLT_ID>(*connection) = client;
  2135. client->mqtt_conn_context = connection;
  2136. std::string& pszClientID = std::get<CLINET_ID_ID>(*connection);
  2137. // 使用本地副本存储连接参数
  2138. std::string host = SERVER_ADDRESS;
  2139. std::string port = "1883";
  2140. std::string username = "zskkcc";
  2141. std::string password = "zskk1234";
  2142. // std::cout << "[CONFIG] Setting connection parameters:"
  2143. // << "\n\tClient ID: " << pszClientID
  2144. // << "\n\tHost: " << host
  2145. // << "\n\tPort: " << port
  2146. // << std::endl;
  2147. // 设置MQTT连接参数
  2148. mqtt_set_host(client, const_cast<char*>(host.c_str()));
  2149. mqtt_set_port(client, const_cast<char*>(port.c_str()));
  2150. mqtt_set_user_name(client, const_cast<char*>(username.c_str()));
  2151. mqtt_set_password(client, const_cast<char*>(password.c_str()));
  2152. mqtt_set_client_id(client, const_cast<char*>(pszClientID.c_str()));
  2153. mqtt_set_clean_session(client, 0);
  2154. mqtt_set_write_buf_size(client, 8192);
  2155. mqtt_set_read_buf_size(client, 8192);
  2156. mqtt_set_cmd_timeout(client, 2000);
  2157. mqtt_set_keep_alive_interval(client, 30);
  2158. //mqtt_subscribe();
  2159. //mqtt_set_reconnect_handler(pMqttClient, reconnect_handler);
  2160. int rc = 0;
  2161. const uint64_t timeout_ms = 10000; // 10秒超时
  2162. uint64_t start_time = GetTickCount();
  2163. int attempt_count = 0;
  2164. bool connected = false;
  2165. // std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl;
  2166. try {
  2167. while (true) {
  2168. attempt_count++;
  2169. uint64_t current_time = GetTickCount();
  2170. uint64_t elapsed = current_time - start_time;
  2171. // 检查超时
  2172. if (elapsed > timeout_ms) {
  2173. std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl;
  2174. return nullptr;
  2175. }
  2176. // std::cout << CurrentDateTime() << " [ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
  2177. rc = mqtt_connect(client);
  2178. if (rc == MQTT_SUCCESS_ERROR) {
  2179. connected = true;
  2180. break;
  2181. }
  2182. std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl;
  2183. // 计算等待时间
  2184. uint64_t remaining = timeout_ms - elapsed;
  2185. uint64_t wait_time = (remaining > 2000) ? 2000 : remaining;
  2186. if (wait_time > 0) {
  2187. // std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." << std::endl;
  2188. usleep(wait_time * 1000);
  2189. }
  2190. }
  2191. if (connected) {
  2192. uint64_t total_time = GetTickCount() - start_time;
  2193. FINFO("[SUCCESS] Connected in {$}ms after {$} attempts", total_time, attempt_count);
  2194. // 等待CONNACK处理完成:mqtt_connect()是异步的,返回0只表示CONNECT包已发送
  2195. // 需要给MQTT内部线程一点时间处理CONNACK(通常几十毫秒内完成)
  2196. // 实际的订阅完成由WaitForSubscriptionComplete()保证,所以这里只需要短暂等待
  2197. const int connackDelayMs = 100;
  2198. FINFO("[MQTT] Waiting {$}ms for CONNACK processing...", connackDelayMs);
  2199. usleep(connackDelayMs * 1000);
  2200. FINFO("[MQTT] CONNACK wait complete, connection ready for subscriptions");
  2201. // 安全地设置连接上下文
  2202. try {
  2203. return client;
  2204. }
  2205. catch (const std::exception& e) {
  2206. FERROR("[ERROR] Failed to set connection context: {$}", e.what());
  2207. return nullptr;
  2208. }
  2209. }
  2210. }
  2211. catch (const std::exception& e) {
  2212. std::cerr << "[ERROR] Exception during connection: " << e.what() << std::endl;
  2213. return nullptr;
  2214. }
  2215. catch (...) {
  2216. std::cerr << "[ERROR] Unknown exception during connection" << std::endl;
  2217. return nullptr;
  2218. }
  2219. return nullptr;
  2220. }
  2221. ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
  2222. {
  2223. FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
  2224. // std::cout << "TID " << GetCurrentThreadId()
  2225. // << " : " << pszClientID
  2226. // << " NewConnection " << pszServer
  2227. // << ":" << pszServerPort
  2228. // << " user: " << pszUserName
  2229. // << " password " << pszPassword
  2230. // << std::endl;
  2231. //连接MQTT broker
  2232. DWORD dwTick = GetTickCount();
  2233. std::string clientIdStr(pszClientID ? pszClientID : "");
  2234. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2235. try {
  2236. // 初始化元组字段
  2237. std::get<MQTT_TIPIC_LIST_ID>(*connection) = new mqtt_topic_list;
  2238. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2239. std::get<USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2240. CcosLock* pLock = new CcosLock();
  2241. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2242. pLock->Thread_Lock();
  2243. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2244. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2245. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2246. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2247. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2248. // std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
  2249. mqtt_client* pMqttClient = InnerConnect(connection);
  2250. if (pMqttClient == nullptr)
  2251. {
  2252. pLock->Thread_UnLock();
  2253. throw std::runtime_error("InnerConnect failed");
  2254. }
  2255. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2256. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2257. // 创建POSIX信号量
  2258. sem_t* semaphore = new sem_t;
  2259. if (sem_init(semaphore, 0, 0) != 0) {
  2260. pLock->Thread_UnLock();
  2261. delete semaphore;
  2262. throw std::runtime_error("Failed to initialize semaphore");
  2263. }
  2264. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2265. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2266. // 创建发送线程
  2267. pthread_t threadId;
  2268. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2269. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2270. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2271. // 错误处理...
  2272. sem_destroy(semaphore);
  2273. delete semaphore;
  2274. pLock->Thread_UnLock();
  2275. throw std::runtime_error("Failed to initialize semaphore");
  2276. }
  2277. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2278. // std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2279. pLock->Thread_UnLock();
  2280. uint64_t dwWaitTick = GetTickCount() - dwTick;
  2281. // std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2282. // << dwWaitTick << " ms" << std::endl;
  2283. return connection;
  2284. }
  2285. catch (const std::exception& e) {
  2286. std::cerr << "Exception in NewConnection: " << e.what() << std::endl;
  2287. // 清理资源
  2288. if (std::get<SEMAPHORE_HANDLE_ID>(*connection)) {
  2289. sem_destroy(std::get<SEMAPHORE_HANDLE_ID>(*connection));
  2290. delete std::get<SEMAPHORE_HANDLE_ID>(*connection);
  2291. }
  2292. delete std::get<MSG_LIST_ID>(*connection);
  2293. delete std::get<CONN_SEND_LOCK_ID>(*connection);
  2294. delete std::get<MQTT_TIPIC_LIST_ID>(*connection);
  2295. delete connection;
  2296. return nullptr;
  2297. }
  2298. }
  2299. //创建额外连接,需要提供回调函数
  2300. LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async)
  2301. {
  2302. DWORD dwTick = GetTickCount();
  2303. FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
  2304. //mqtt_client* pMqttClient = nullptr;
  2305. //pMqttClient = mqtt_lease();
  2306. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2307. //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  2308. //std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2309. mqtt_topic_list* pTopicList = new std::list<string>;
  2310. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  2311. CcosLock* pLock = new CcosLock();
  2312. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2313. pLock->Thread_Lock();
  2314. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2315. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2316. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2317. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2318. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2319. // std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
  2320. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2321. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2322. mqtt_client* pMqttClient = InnerConnect(connection);
  2323. if (pMqttClient == nullptr)
  2324. {
  2325. pLock->Thread_UnLock();
  2326. delete std::get<MQTT_TIPIC_LIST_ID>(*connection); // 删除 pTopicList
  2327. delete std::get<CONN_SEND_LOCK_ID>(*connection); // 删除 pLock
  2328. delete connection;
  2329. delete pfilter;
  2330. return nullptr;
  2331. }
  2332. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2333. //pMqttClient->mqtt_conn_context = connection;
  2334. //设置回调函数
  2335. //设置MQTT连接参数
  2336. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2337. /*
  2338. //连接Broker
  2339. int rc = 0;
  2340. dwTick = GetTickCount();
  2341. while (true)
  2342. {
  2343. rc = mqtt_connect(pMqttClient);
  2344. if (rc != MQTT_SUCCESS_ERROR)
  2345. {
  2346. if (GetTickCount() - dwTick > dwOpenTimeout)
  2347. {
  2348. FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2349. pLock->Thread_UnLock();
  2350. delete connection;
  2351. delete pfilter;
  2352. return nullptr;
  2353. }
  2354. else
  2355. {
  2356. FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2357. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
  2358. Sleep(20);
  2359. }
  2360. }
  2361. else
  2362. {
  2363. break;
  2364. }
  2365. }*/
  2366. DWORD dwWaitTick = GetTickCount() - dwTick;
  2367. FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  2368. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2369. // 创建POSIX信号量
  2370. sem_t* semaphore = new sem_t;
  2371. sem_init(semaphore, 0, 0);
  2372. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2373. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2374. // 创建发送线程
  2375. // std::cout << "Preapre create MqttSendThreadFunc" << std::endl;
  2376. pthread_t threadId;
  2377. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2378. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2379. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2380. // 错误处理...
  2381. sem_destroy(semaphore);
  2382. delete semaphore;
  2383. pLock->Thread_UnLock();
  2384. delete connection;
  2385. return nullptr;
  2386. }
  2387. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2388. // std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2389. pLock->Thread_UnLock();
  2390. // std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2391. // << dwWaitTick << " ms" << std::endl;
  2392. return connection;
  2393. }
  2394. //重置连接
  2395. LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection)
  2396. {
  2397. if (hConnection == nullptr)
  2398. {
  2399. return;
  2400. }
  2401. FWARN(" Reset Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  2402. // std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
  2403. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2404. if (pconn != nullptr) {
  2405. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2406. pLock->Thread_Lock();
  2407. //mqtt_do_reconnect(pconn);
  2408. mqtt_disconnect(pconn);
  2409. /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic);
  2410. mqtt_connect(pconn);*/
  2411. mqtt_release(pconn);
  2412. pconn = InnerConnect(hConnection);
  2413. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2414. if (pconn != nullptr)
  2415. {
  2416. //std::get<MQTT_CLT_ID>(*hConnection) = pconn;
  2417. //resubscribe_topic(pconn, pconn->mqtt_conn_context);
  2418. //FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2419. }
  2420. pLock->Thread_UnLock();
  2421. }
  2422. }
  2423. //关闭并释放连接
  2424. LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
  2425. {
  2426. if (!hConnection) return;
  2427. auto clientId = std::get<CLINET_ID_ID>(*hConnection);
  2428. // std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." << endl;
  2429. std::get<THREAD_RUNNING_ID>(*hConnection) = false;
  2430. sem_t* semaphore = std::get<SEMAPHORE_HANDLE_ID>(*hConnection);
  2431. if (semaphore) {
  2432. sem_post(semaphore); // 发送信号量唤醒线程
  2433. }
  2434. pthread_t sendThread = std::get<CONNECTED_HANDLE_ID>(*hConnection);
  2435. if (sendThread != 0) {
  2436. // 等待线程退出,超时时间设为1秒
  2437. struct timespec timeout;
  2438. clock_gettime(CLOCK_REALTIME, &timeout);
  2439. timeout.tv_sec += 1;
  2440. int joinResult = pthread_timedjoin_np(sendThread, nullptr, &timeout);
  2441. if (joinResult != 0) {
  2442. std::cerr << CurrentDateTime() << " " << clientId
  2443. << " Warning: Send thread did not exit in time, force cancel" << std::endl;
  2444. pthread_cancel(sendThread); // 超时后强制终止(最后的备选方案)
  2445. pthread_join(sendThread, nullptr);
  2446. }
  2447. std::get<CONNECTED_HANDLE_ID>(*hConnection) = 0; // 重置线程ID
  2448. }
  2449. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2450. if (nullptr != pLock)
  2451. pLock->Thread_Lock();
  2452. else
  2453. return;
  2454. mqtt_client* pMqttClient = static_cast<mqtt_client*>(std::get<MQTT_CLT_ID>(*hConnection));
  2455. if (pMqttClient) {
  2456. mqtt_disconnect(pMqttClient); // 断开连接
  2457. mqtt_release(pMqttClient); // 释放客户端实例
  2458. pMqttClient = NULL;
  2459. std::get<MQTT_CLT_ID>(*hConnection) = nullptr;
  2460. }
  2461. ccos_mqtt_msg_filter* pFilter = static_cast<ccos_mqtt_msg_filter*>(std::get<MSG_HOOK_ID>(*hConnection));
  2462. if (pFilter) {
  2463. // std::cout << "Free Connection filter [" << (UINT64)pFilter << "] .." << std::endl;
  2464. // 释放filter内部的资源对象
  2465. delete std::get<FILTER_RES_OBJ_ID>(*pFilter);
  2466. delete std::get<FILTER_RESPONS_OBJ_ID>(*pFilter);
  2467. delete pFilter;
  2468. std::get<MSG_HOOK_ID>(*hConnection) = nullptr;
  2469. }
  2470. mqtt_msg_list* pMsgList = std::get<MSG_LIST_ID>(*hConnection);
  2471. if (pMsgList) {
  2472. // 清理剩余未发送的消息
  2473. while (!pMsgList->empty()) {
  2474. ResDataObject* pMsg = pMsgList->front();
  2475. pMsgList->pop_front();
  2476. delete pMsg; // 释放消息对象
  2477. }
  2478. delete pMsgList;
  2479. std::get<MSG_LIST_ID>(*hConnection) = nullptr;
  2480. }
  2481. if (semaphore) {
  2482. sem_destroy(semaphore); // 销毁信号量
  2483. delete semaphore;
  2484. std::get<SEMAPHORE_HANDLE_ID>(*hConnection) = nullptr;
  2485. }
  2486. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2487. if (pTopicList) {
  2488. pTopicList->clear(); // 清空列表
  2489. delete pTopicList;
  2490. std::get<MQTT_TIPIC_LIST_ID>(*hConnection) = nullptr;
  2491. }
  2492. if (pLock) {
  2493. pLock->Thread_UnLock(); // 确保锁被释放
  2494. delete pLock;
  2495. std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
  2496. }
  2497. delete hConnection;
  2498. // std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." << endl;
  2499. }
  2500. //主动订阅主题
  2501. LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
  2502. {
  2503. // std::cout << "SubscribeTopic called. Topic: " << (pszTopic ? pszTopic : "null")
  2504. // << ", isShare: " << (isShare ? "true" : "false") << std::endl;
  2505. if (hConnection == nullptr)
  2506. {
  2507. std::cout << "SubscribeTopic error: hConnection is nullptr" << std::endl;
  2508. return 0;
  2509. }
  2510. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2511. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2512. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2513. pLock->Thread_Lock();
  2514. pTopicList->push_back(std::string(pszTopic));
  2515. //pMqttClient->subscribe(pszTopic, 1, opts);
  2516. int rc = MQTT_SUCCESS_ERROR;
  2517. DWORD dwTick = GetTickCount();
  2518. int nTryTimes = 0;
  2519. do
  2520. {
  2521. if (pTopicList->size() < 1)
  2522. {
  2523. const char* topicToSubscribe = pTopicList->back().c_str();
  2524. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2525. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2526. FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2527. // std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe First " << pszTopic << endl;
  2528. }
  2529. else
  2530. {
  2531. const char* topicToSubscribe = pTopicList->back().c_str();
  2532. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2533. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2534. FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2535. // std::cout << CurrentDateTime() << " mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << " ret: "<<ret <<endl;
  2536. }
  2537. if (rc != MQTT_SUCCESS_ERROR)
  2538. {
  2539. FWARN("try do mqtt reconnect ..");
  2540. //mqtt_do_reconnect(pMqttClient);
  2541. mqtt_disconnect(pMqttClient);
  2542. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2543. mqtt_release(pMqttClient);
  2544. pMqttClient = InnerConnect(hConnection);
  2545. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2546. if (pMqttClient != nullptr)
  2547. {
  2548. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2549. //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2550. FINFO("Reconnected successfully, connection ready for subscriptions");
  2551. }
  2552. // 移除2秒延时:InnerConnect已等待CONNACK,WaitForSubscriptionComplete会确保订阅完成
  2553. }
  2554. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2555. pLock->Thread_UnLock();
  2556. return 0;
  2557. }
  2558. // 等待订阅完成(轮询检查ack_list,确保SUBACK已收到)
  2559. LOGICDEVICE_API bool WaitForSubscriptionComplete(ccos_mqtt_connection* hConnection, DWORD timeoutMs)
  2560. {
  2561. if (hConnection == nullptr) {
  2562. FERROR("WaitForSubscriptionComplete: hConnection is nullptr");
  2563. return false;
  2564. }
  2565. mqtt_client_t* pMqttClient = (mqtt_client_t*)std::get<MQTT_CLT_ID>(*hConnection);
  2566. if (pMqttClient == nullptr) {
  2567. FERROR("WaitForSubscriptionComplete: pMqttClient is nullptr");
  2568. return false;
  2569. }
  2570. const std::string clientId = std::get<CLINET_ID_ID>(*hConnection);
  2571. FINFO("[WaitForSubscriptionComplete] {$} - Waiting for all subscriptions to complete (timeout: {$}ms)",
  2572. clientId, timeoutMs);
  2573. DWORD startTime = GetTickCount();
  2574. const DWORD pollIntervalMs = 10; // 每10ms轮询一次
  2575. int lastAckCount = -1;
  2576. while (GetTickCount() - startTime < timeoutMs) {
  2577. // 检查ack_handler_list中是否还有待处理的SUBACK
  2578. // mqtt_ack_handler_number记录了ack_list中的条目数量
  2579. int currentAckCount = pMqttClient->mqtt_ack_handler_number;
  2580. // 如果ack数量变化,输出日志
  2581. if (currentAckCount != lastAckCount) {
  2582. FINFO("[WaitForSubscriptionComplete] {$} - Pending ack count: {$}", clientId, currentAckCount);
  2583. lastAckCount = currentAckCount;
  2584. }
  2585. // 如果没有待处理的ack,说明所有订阅都已完成
  2586. if (currentAckCount == 0) {
  2587. DWORD elapsed = GetTickCount() - startTime;
  2588. FINFO("[WaitForSubscriptionComplete] {$} - All subscriptions completed in {$}ms",
  2589. clientId, elapsed);
  2590. return true;
  2591. }
  2592. // 短暂休眠后继续轮询
  2593. usleep(pollIntervalMs * 1000);
  2594. }
  2595. // 超时
  2596. int finalAckCount = pMqttClient->mqtt_ack_handler_number;
  2597. FERROR("[WaitForSubscriptionComplete] {$} - Timeout after {$}ms, still have {$} pending acks",
  2598. clientId, timeoutMs, finalAckCount);
  2599. return false;
  2600. }
  2601. // 等待Broker完成路由表更新 - 可配置延时
  2602. // 通过环境变量 MQTT_BROKER_ROUTING_DELAY_MS 配置,默认200ms
  2603. LOGICDEVICE_API DWORD WaitForBrokerRouting(const char* clientId)
  2604. {
  2605. static int brokerDelayMs = -1; // 静态变量,只读取一次环境变量
  2606. if (brokerDelayMs == -1) {
  2607. const char* envDelay = getenv("MQTT_BROKER_ROUTING_DELAY_MS");
  2608. if (envDelay != nullptr && atoi(envDelay) > 0) {
  2609. brokerDelayMs = atoi(envDelay);
  2610. FINFO("[WaitForBrokerRouting] Using custom delay from environment: {$}ms", brokerDelayMs);
  2611. } else {
  2612. brokerDelayMs = 100; // 默认100ms,足够Broker处理路由表更新
  2613. FINFO("[WaitForBrokerRouting] Using default delay: {$}ms (set MQTT_BROKER_ROUTING_DELAY_MS to customize)", brokerDelayMs);
  2614. }
  2615. }
  2616. if (clientId != nullptr) {
  2617. FINFO("[WaitForBrokerRouting] {$} - Waiting {$}ms for Broker to process subscription routing",
  2618. clientId, brokerDelayMs);
  2619. }
  2620. usleep(brokerDelayMs * 1000);
  2621. return brokerDelayMs;
  2622. }
  2623. //主题订阅取消
  2624. LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  2625. {
  2626. if (hConnection == nullptr)
  2627. {
  2628. return 0;
  2629. }
  2630. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2631. //if (!pMqttClient->is_connected()) {
  2632. // //FERROR( " MQTT connection lost at subscribe %s ", topic);
  2633. // return 0;
  2634. //}
  2635. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2636. //auto ret = pMqttClient->unsubscribe(pszTopic);
  2637. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2638. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2639. pLock->Thread_Lock();
  2640. //MQTTAsync_responseOptions resp;
  2641. pTopicList->remove(pszTopic);
  2642. int rc = MQTT_SUCCESS_ERROR;
  2643. DWORD dwTick = GetTickCount();
  2644. int nTryTimes = 0;
  2645. do
  2646. {
  2647. int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
  2648. FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  2649. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2650. pLock->Thread_UnLock();
  2651. return 2;
  2652. }
  2653. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  2654. LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout)
  2655. {
  2656. char pszClientID[256];
  2657. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2658. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2659. getpid(), GetCurrentThreadId());
  2660. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
  2661. });
  2662. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
  2663. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  2664. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2665. string pLoad = pCmd->encode();
  2666. int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName);
  2667. // std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2668. FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc);
  2669. CloseConnection(connObj);
  2670. return rc;
  2671. }
  2672. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  2673. LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout)
  2674. {
  2675. if (hConnection == nullptr)
  2676. {
  2677. // 移除 encode() 调用避免内存问题
  2678. FDEBUG("Who ????? Publish to {$} Action Body: (not logged to avoid memory issues)", pszTopic);
  2679. return 0;
  2680. }
  2681. // std::cout << CurrentDateTime()<<" " << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  2682. string topic = pszTopic;
  2683. if (topic.length() <= 0)
  2684. {
  2685. // 移除 encode() 调用避免内存问题
  2686. FWARN("ignore empty topic packet");
  2687. return 2;
  2688. }
  2689. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2690. //if (!pMqttClient->is_connected()) {
  2691. // //FERROR( " MQTT connection lost at subscribe %s ", topic);
  2692. // return 0;
  2693. //}
  2694. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2695. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2696. pLock->Thread_Lock();
  2697. // 移除 encode() 调用避免内存问题
  2698. FDEBUG("{$} Publish Action to {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic);
  2699. //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction);
  2700. //if(org_publisher.length() <= 0)
  2701. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
  2702. ResDataObject* pPacket = new ResDataObject();
  2703. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2704. pPacket->add(pszTopic, pAction->encode());
  2705. //FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2706. pList->push_back(pPacket);
  2707. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2708. // std::cout << "try publish " << pAction->encode() << endl;
  2709. //pMqttClient->publish(pszTopic, pAction->encode());
  2710. /*
  2711. //pConn->publish(pszTopic, pCmd->encode());
  2712. // std::cout << "try publish " << pAction->encode() << endl;
  2713. const char* pLoad = pAction->encode();
  2714. int len = strlen(pLoad);
  2715. //MQTTAsync_responseOptions resp;
  2716. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2717. mqtt_message_t msg;
  2718. memset(&msg, 0, sizeof(msg));
  2719. msg.payload = (void*)pLoad;
  2720. msg.qos = MQTT_QOS;
  2721. msg.payloadlen = len;
  2722. int rc = MQTT_SUCCESS_ERROR;
  2723. DWORD dwTick = GetTickCount();
  2724. int nTryTimes = 0;
  2725. do
  2726. {
  2727. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2728. nTryTimes++;
  2729. if (rc != MQTT_SUCCESS_ERROR)
  2730. {
  2731. FINFO("try mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id);
  2732. //mqtt_do_reconnect(pMqttClient);
  2733. mqtt_disconnect(pMqttClient);
  2734. rc = mqtt_connect(pMqttClient);
  2735. if (rc == MQTT_SUCCESS_ERROR)
  2736. {
  2737. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2738. FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2739. }
  2740. Sleep(2);
  2741. }
  2742. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout);
  2743. //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2744. FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2745. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  2746. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
  2747. if (rc < 0)
  2748. {
  2749. FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2750. //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
  2751. pLock->Thread_UnLock();
  2752. return rc;
  2753. }*/
  2754. //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
  2755. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2756. FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size());
  2757. pLock->Thread_UnLock();
  2758. return 2;
  2759. }
  2760. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  2761. int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos)
  2762. {
  2763. const int dwTimeout = 500;
  2764. if (pMqttClient == nullptr)
  2765. {
  2766. FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, message);
  2767. return 0;
  2768. }
  2769. FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message);
  2770. std::string messageCopy = message;
  2771. const char* pLoad = messageCopy.c_str();
  2772. int len = messageCopy.length();
  2773. mqtt_message_t msg;
  2774. memset(&msg, 0, sizeof(msg));
  2775. msg.payload = (void*)pLoad;
  2776. msg.payloadlen = len;
  2777. msg.qos = qos;
  2778. msg.retained = 0;
  2779. int rc = MQTT_SUCCESS_ERROR;
  2780. DWORD dwTick = GetTickCount();
  2781. int nTryTimes = 0;
  2782. do
  2783. {
  2784. // 防止空指针崩溃:检查 pMqttClient 是否为空
  2785. if (pMqttClient == nullptr)
  2786. {
  2787. FERROR("pMqttClient is NULL before mqtt_publish, cannot publish to {$}", pszTopic);
  2788. return MQTT_NOT_CONNECT_ERROR;
  2789. }
  2790. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2791. nTryTimes++;
  2792. if (rc != MQTT_SUCCESS_ERROR)
  2793. {
  2794. FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id);
  2795. //mqtt_do_reconnect(pMqttClient);
  2796. ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context;
  2797. mqtt_disconnect(pMqttClient);
  2798. /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2799. mqtt_connect(pMqttClient);*/
  2800. mqtt_release(pMqttClient);
  2801. pMqttClient = nullptr;
  2802. usleep(10000 * 1000);
  2803. pMqttClient = InnerConnect(hConnection);
  2804. //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2805. if (pMqttClient != nullptr)
  2806. {
  2807. break;
  2808. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2809. //resubscribe_topic(pMqttClient, hConnection);
  2810. //FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2811. }
  2812. else
  2813. {
  2814. // 重连失败,记录错误并退出循环
  2815. FERROR("InnerConnect failed for {$}, giving up publish to {$}", client_id, pszTopic);
  2816. return MQTT_NOT_CONNECT_ERROR;
  2817. }
  2818. }
  2819. } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
  2820. cout << CurrentDateTime() << " CLT " << client_id.c_str() << " PublishAction to " << pszTopic << " send Times " << nTryTimes << " result " << rc << endl;
  2821. FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2822. if (rc < 0)
  2823. {
  2824. FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2825. return rc;
  2826. }
  2827. return 2;
  2828. }
  2829. /*
  2830. /// <summary>
  2831. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  2832. /// 超时没收到应答返回失败,
  2833. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  2834. /// </summary>
  2835. /// <param name="pAction">要发送的命令Action名</param>
  2836. /// <param name="pContext">命令携带的参数</param>
  2837. /// <param name="pszTopic">要发送的目标topic</param>
  2838. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  2839. /// <param name="resObj">应答返回的参数结果</param>
  2840. /// <param name="dwWaitTime">等待超时时间</param>
  2841. /// <param name="hConnection">复用的MQTT链接句柄</param>
  2842. /// <param name="onmsg">复用的链接的消息处理函数</param>
  2843. /// <returns>成功返回2,其他返回错误码</returns>
  2844. LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  2845. ResDataObject& resObj, DWORD dwWaitTime)
  2846. {
  2847. // std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2848. if (pszRespTopic != nullptr)
  2849. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2850. if (hConnection == nullptr)
  2851. {
  2852. return 0;
  2853. }
  2854. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2855. //if (!pMqttClient->is_connected()) {
  2856. // //FERROR( " MQTT connection lost at subscribe %s ", topic);
  2857. // return 0;
  2858. //}
  2859. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  2860. //SubscribeTopic()
  2861. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
  2862. string topic = topicName;//msg->get_topic().c_str();
  2863. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  2864. // std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
  2865. //应答到了,处理
  2866. ResDataObject req;
  2867. req.decode((const char*)message->payload);
  2868. PacketAnalizer::GetPacketContext(&req, resObj);
  2869. SetEvent(hEvent);
  2870. //处理结束 将函数指针换回去
  2871. void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2872. //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  2873. MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
  2874. }
  2875. else
  2876. {
  2877. //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2878. (msgarrvd)(context, topicName, topicLen, message);
  2879. }
  2880. };
  2881. //接管回调
  2882. // pMqttClient->set_message_callback(action_func);
  2883. MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
  2884. //pMqttClient->subscribe(pszRespTopic, 1);
  2885. MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2886. MQTTProperties prop = MQTTProperties_initializer;
  2887. // std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
  2888. //pMqttClient->subscribe(pszTopic, 1, opts);
  2889. MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
  2890. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  2891. //pMqttClient->publish(pszTopic, req.encode());
  2892. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  2893. MQTTClient_message* m = NULL;
  2894. MQTTClient_deliveryToken dt;
  2895. MQTTProperty property;
  2896. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  2897. property.value.data.data = "test user property";
  2898. property.value.data.len = (int)strlen(property.value.data.data);
  2899. property.value.value.data = "test user property value";
  2900. property.value.value.len = (int)strlen(property.value.value.data);
  2901. MQTTProperties properties = MQTTProperties_initializer;
  2902. MQTTProperties_add(&properties, &property);
  2903. //pConn->publish(pszTopic, pCmd->encode());
  2904. const char* pLoad = req.encode();
  2905. MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
  2906. // std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2907. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  2908. if (ret == WAIT_OBJECT_0) {
  2909. //等到应答了
  2910. return 2;
  2911. }
  2912. return 0;
  2913. }
  2914. */
  2915. /// <summary>
  2916. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  2917. /// </summary>
  2918. /// <param name="hConnection"></param>
  2919. /// <param name="pAction"></param>
  2920. /// <param name="pContext"></param>
  2921. /// <param name="pszTopic"></param>
  2922. /// <param name="resObj"></param>
  2923. /// <param name="hEvent"></param>
  2924. /// <param name="dwWaitTime"></param>
  2925. /// <returns></returns>
  2926. LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
  2927. const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
  2928. {
  2929. // QoS=2消息握手需要时间(PUBLISH→PUBREC→PUBREL→PUBCOMP)
  2930. // 连续发送时增加50ms延时,避免握手状态冲突
  2931. const std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2932. static std::map<std::string, DWORD> lastSendTime;
  2933. DWORD now = GetTickCount();
  2934. DWORD elapsed = (lastSendTime.count(client_id) > 0) ? (now - lastSendTime[client_id]) : 1000;
  2935. if (elapsed < 50) {
  2936. DWORD sleepMs = 50 - elapsed;
  2937. FINFO("[ActionAndRespWithConnDefalt] {$} - Sleeping {$}ms to avoid QoS handshake conflict", client_id, sleepMs);
  2938. usleep(sleepMs * 1000);
  2939. }
  2940. lastSendTime[client_id] = GetTickCount();
  2941. // std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2942. if (hConnection == nullptr)
  2943. {
  2944. return 0;
  2945. }
  2946. DWORD dwTick = GetTickCount();
  2947. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2948. std::shared_ptr<LinuxEvent> hEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  2949. ResDataObject resContext;
  2950. string strResObject;
  2951. char* pszPad = 0;
  2952. char* pszPad2 = 0;
  2953. auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool {
  2954. FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
  2955. // std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
  2956. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  2957. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  2958. {
  2959. if (GetTickCount() - dwTick < dwWaitTime)
  2960. {
  2961. // 移除 encode() 调用避免内存问题
  2962. FDEBUG("ActionAndRespWithConnDefalt Packet Content {$}", PacketAnalizer::GetPacketKey(rsp));
  2963. resObj = *rsp;
  2964. return true;
  2965. }
  2966. else
  2967. {
  2968. // 移除 encode() 调用避免内存问题
  2969. FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout", PacketAnalizer::GetPacketKey(rsp));
  2970. return false;
  2971. }
  2972. // std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
  2973. }
  2974. // 移除 encode() 调用避免内存问题
  2975. FDEBUG("ActionAndRespWithConnDefalt what ? {$}", PacketAnalizer::GetPacketKey(rsp));
  2976. // std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
  2977. return false;
  2978. };
  2979. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2980. pLock->Thread_Lock();
  2981. // 移除 encode() 调用避免内存问题
  2982. FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic);
  2983. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  2984. ResDataObject *resActions = std::get<FILTER_RES_OBJ_ID>(*pfilter);
  2985. resActions->add(pAction, pszTopic);
  2986. std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
  2987. std::get<FILTER_FUNC_ID>(*pfilter) = func;
  2988. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection).c_str());
  2989. const char* pLoad = req.encode();
  2990. ResDataObject* pPacket = new ResDataObject();
  2991. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2992. pPacket->add(pszTopic, pLoad);
  2993. // 移除 encode() 调用避免内存问题
  2994. FDEBUG(" {$} Try push packet to Send list", client_id);
  2995. pList->push_back(pPacket);
  2996. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2997. /*
  2998. mqtt_message_t msg;
  2999. memset(&msg, 0, sizeof(msg));
  3000. msg.payload = (void*)pLoad;
  3001. msg.qos = MQTT_QOS;
  3002. msg.payloadlen = strlen(pLoad);
  3003. //int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  3004. int rc = MQTT_SUCCESS_ERROR;
  3005. dwTick = GetTickCount();
  3006. int nTryTimes = 0;
  3007. do
  3008. {
  3009. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  3010. nTryTimes++;
  3011. if (rc != MQTT_SUCCESS_ERROR)
  3012. {
  3013. FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. {$}", rc, client_id);
  3014. //mqtt_do_reconnect(pMqttClient);
  3015. mqtt_disconnect(pMqttClient);
  3016. rc = mqtt_connect(pMqttClient);
  3017. if (rc == MQTT_SUCCESS_ERROR)
  3018. {
  3019. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  3020. FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  3021. }
  3022. Sleep(2);
  3023. }
  3024. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3);
  3025. if (nTryTimes >= 1)
  3026. FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, nTryTimes, rc);
  3027. */
  3028. pLock->Thread_UnLock();
  3029. dwTick = GetTickCount() - dwTick;
  3030. //FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
  3031. // std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
  3032. dwTick = GetTickCount();
  3033. DWORD ret = hEvent->Wait(dwWaitTime);
  3034. if (ret) {
  3035. //等到应答了
  3036. dwTick = GetTickCount() - dwTick;
  3037. pLock->Thread_Lock();
  3038. ResDataObject* pResp = std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
  3039. for (int x = 0; x < pResp->size(); x++)
  3040. {
  3041. ResDataObject r = (*pResp)[x];
  3042. if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r))
  3043. {
  3044. resObj = r;
  3045. pResp->eraseOneOf(pAction, x);
  3046. break;
  3047. }
  3048. }
  3049. pLock->Thread_UnLock();
  3050. // std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
  3051. FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
  3052. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  3053. return 2;
  3054. }
  3055. //std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  3056. //resObj.decode(strResObject.c_str());
  3057. FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
  3058. // std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
  3059. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  3060. return 0;
  3061. }
  3062. /// <summary>
  3063. /// 新建MQTT连接发送Ation并等待应答
  3064. /// </summary>
  3065. /// <param name="pAction"></param>
  3066. /// <param name="pContext"></param>
  3067. /// <param name="pszTopic"></param>
  3068. /// <param name="pszRespTopic"></param>
  3069. /// <param name="resObj"></param>
  3070. /// <param name="dwWaitTime"></param>
  3071. /// <param name="pszSenderName"></param>
  3072. /// <returns></returns>
  3073. LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  3074. DWORD dwWaitTime, const char* pszSenderName)
  3075. {
  3076. ResDataObject req;
  3077. if (pszRespTopic != nullptr)
  3078. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  3079. //临时创建连接并发送和接收应答
  3080. char pszClientID[256];
  3081. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  3082. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  3083. getpid(), GetCurrentThreadId());
  3084. sem_t sem;
  3085. if (sem_init(&sem, 0, 0) != 0) {
  3086. return 0; // 如果初始化失败,返回0
  3087. }
  3088. // std::cout << " ActionAndResp->NewConnection " << endl;
  3089. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) {
  3090. //应答到了,处理
  3091. PacketAnalizer::GetPacketContext(req, resObj);
  3092. sem_post(&sem);
  3093. });
  3094. ////发布消息,并等待应答
  3095. PublishAction(&req, pszTopic, connObj);
  3096. // 等待应答或超时
  3097. struct timespec ts;
  3098. clock_gettime(CLOCK_REALTIME, &ts);
  3099. ts.tv_sec += dwWaitTime / 1000; // 转换为秒
  3100. ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒
  3101. // 等待信号量或者超时
  3102. int ret = 0;
  3103. while (ret == 0) {
  3104. ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量
  3105. if (ret == -1 && errno == ETIMEDOUT) {
  3106. // 超时
  3107. sem_destroy(&sem);
  3108. CloseConnection(connObj);
  3109. return 0; // 超时返回0
  3110. }
  3111. }
  3112. // 等到应答了
  3113. sem_destroy(&sem); // 销毁信号量
  3114. CloseConnection(connObj);
  3115. return 2; // 返回2表示应答成功
  3116. }