LogicDevice.cpp 109 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508
  1. // LogicDevice.cpp : 定义 DLL 应用程序的导出函数。
  2. //
  3. #include <cstring>
  4. #include <iostream>
  5. #include <chrono>
  6. #include "LogicDevice.h"
  7. #include <stddef.h> // 用于 offsetof
  8. #include "PacketAnalizer.h"
  9. #include "MessageInfo.h"
  10. #include "common_api.h"
  11. #include "LocalConfig.h"
  12. #include "Base64.h"
  13. #include "SystemLogger.hpp"
  14. #include "mqttclient.h"
  15. #include "LinuxEvent.h"
  16. //Log4CPP::Logger* ////mLog::gLogger = nullptr;
  17. void msgarrivd(void* client, message_data_t* message);
  18. std::string CurrentDateTime();
  19. //-------------Logic Device SysIF--------------------------
  20. LogicDeviceSysIF::LogicDeviceSysIF(void)
  21. {
  22. }
  23. LogicDeviceSysIF::~LogicDeviceSysIF(void)
  24. {
  25. }
  26. //init
  27. void LogicDeviceSysIF::SetLogicDevice(LogicDevice *p)
  28. {
  29. m_pLogicDev = p;
  30. //p->SetSysLogicDevice(this);
  31. };
  32. LogicDevice* LogicDeviceSysIF::GetLogicDevice()
  33. {
  34. return m_pLogicDev;
  35. }
  36. //Command In and Out
  37. //notify from lower layer
  38. RET_STATUS HW_ACTION LogicDeviceSysIF::CmdFromLogicDev(ResDataObject PARAM_IN *pCmd)
  39. {
  40. return RET_FAILED;
  41. };
  42. //notify to lower layer
  43. RET_STATUS SYSTEM_CALL LogicDeviceSysIF::CmdToLogicDev(ResDataObject PARAM_IN *pCmd)
  44. {
  45. if (m_pLogicDev)
  46. {
  47. return m_pLogicDev->CmdToLogicDev(pCmd);
  48. }
  49. return RET_NOSUPPORT;
  50. };
  51. const std::string SERVER_ADDRESS("127.0.0.1");
  52. /*
  53. string DEVICE_ID; //AS CLIENT_ID
  54. //const std::string CLIENT_ID("paho_cpp_async_subcribe");
  55. //const std::string TOPIC("hello");
  56. mqtt::async_client* m_pMqttClient = NULL; //MQTT 对象
  57. const int QOS = 1;
  58. const int N_RETRY_ATTEMPTS = 5;*/
  59. using namespace std;
  60. LogicDevice* g_pDPCDeviceObject = NULL;
  61. string LogHost = "";
  62. //string DEVICE_ID;
  63. //-------------Data Logic Device--------------------------
  64. LogicDevice::LogicDevice(void)
  65. {
  66. m_EvtNotify = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  67. uuid_t uuid;
  68. char uuid_str[37];
  69. uuid_generate_random(uuid);
  70. uuid_unparse(uuid, uuid_str);
  71. m_pDevInstance = new char[40];
  72. strncpy(m_pDevInstance, uuid_str, 40);
  73. m_pResErrorList = new ResDataObject();
  74. sem_init(&m_SemphRequest, 0, 0);
  75. sem_init(&m_SemphPublish, 0, 0);
  76. m_pDrvDPC = NULL;
  77. g_pDPCDeviceObject = this;
  78. m_pMqttConntion = nullptr;
  79. //m_strServer = "tcp://localhost:1883";
  80. m_strServer = SERVER_ADDRESS;// "192.168.2.225";
  81. m_strServerPort = "1883";
  82. m_bMqttUseSSL = false;
  83. m_strMqttUser = "";
  84. m_strMqttPassword = "";
  85. m_strEBusRoot = "";
  86. m_strCCOSDevicePath = "";
  87. m_pParent = nullptr;
  88. m_topicFilter = nullptr;
  89. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  90. m_pPacketSendingQue = new MsgQueue<ResDataObject>();
  91. int x = 0;
  92. for ( x = 0; x < sizeof(szPad); x++)
  93. szPad[x] = 'A' + x % 26;
  94. szPad[x-1] = 0;
  95. memset(szPad2, 0, sizeof(szPad2));
  96. m_dwLastPacket = GetTickCount();
  97. }
  98. LogicDevice::~LogicDevice(void)
  99. {
  100. delete []m_pDevInstance;
  101. delete m_pResErrorList;
  102. sem_destroy(&m_SemphRequest);
  103. sem_destroy(&m_SemphPublish);
  104. //m_EvtNotify = NULL;
  105. delete m_pPacketReceivedQue;
  106. delete m_pPacketSendingQue;
  107. }
  108. void LogicDevice::SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot) {
  109. if (m_strClientID.length() > 0)
  110. {
  111. ////mLog::FINFO("Aready set RootID EBUS: [{$}] CCOS : [{$}] result m_strClientID [{$}]", pszEBusRoot, pszCCOSRoot, m_strClientID);
  112. return;
  113. }
  114. ////mLog::FINFO("Set RootID EBUS: [{$}] CCOS : [{$}]", pszEBusRoot, pszCCOSRoot);
  115. if (pszEBusRoot[0] == '/') {
  116. m_strEBusRoot = pszEBusRoot + 1;
  117. }
  118. else {
  119. m_strEBusRoot = pszEBusRoot;
  120. }
  121. char szKeys[256];
  122. strcpy(szKeys, pszEBusRoot);
  123. char* pt = szKeys;
  124. while (*pt != 0)
  125. {
  126. if (*pt == '/' || *pt == '{' || *pt == '}'|| *pt == '-')
  127. *pt = '_';
  128. pt++;
  129. }
  130. m_strClientID = CCOS_CLIENT_ID_PREFIX;
  131. if (szKeys[0] == '_')
  132. m_strClientID += (szKeys + 1);
  133. else
  134. m_strClientID += szKeys;
  135. if (pszCCOSRoot != nullptr)
  136. {
  137. std::cout << "LogicDevice::SetClientRootID [ Set CCOS path is" << pszCCOSRoot << "]" << std::endl;
  138. ////mLog::FINFO("Set CCOS path: {$}", pszCCOSRoot);
  139. m_strCCOSDevicePath = pszCCOSRoot;
  140. int nPos = m_strCCOSDevicePath.find('/');
  141. if (nPos != string::npos)
  142. {
  143. //CCOS/
  144. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  145. //CCOS/DEVICE/
  146. if (nPos != string::npos)
  147. {
  148. m_strCCOSRoot = m_strCCOSDevicePath.substr(0, nPos);
  149. //CCOS/DEVICE/Generator
  150. nPos = m_strCCOSDevicePath.find('/', nPos + 1);
  151. if (nPos != string::npos)
  152. {
  153. m_strAbstractPath = m_strCCOSDevicePath.substr(0, nPos);
  154. }
  155. }
  156. }
  157. ////mLog::FINFO("Set CCOS path: CCOSROOT {$} AbstractPath {$} ", m_strCCOSRoot, m_strAbstractPath);
  158. }
  159. ////mLog::FINFO("Set MQTT ClientName {$} @CCOSRoot {$}", m_strClientID, m_strCCOSDevicePath);
  160. SetName(m_strClientID.c_str());
  161. OnSetClientID();
  162. }
  163. void LogicDevice::OnSetClientID() {
  164. }
  165. void LogicDevice::SubscribeSelf() {
  166. size_t topicCount = 0;
  167. if (m_strEBusRoot.length() > 0) topicCount++;
  168. if (m_strCCOSDevicePath.length() > 0) topicCount++;
  169. if (m_strAbstractPath.length() > 0) {
  170. topicCount++;
  171. if (m_strDevicePath.length() > 0) topicCount++;
  172. }
  173. if (m_strCCOSRoot.length() > 0) topicCount++;
  174. topicCount += 7 * 3;
  175. m_subscribedTopics.reserve(topicCount);
  176. if (m_strDevicePath.length() > 0 && m_strDevicePath[0] != '/') {
  177. m_strDevicePath = "/" + m_strDevicePath;
  178. }
  179. ////mLog::FINFO("{$} try Subscribe {$} use conn {$} ", m_strClientID, m_strEBusRoot, (UINT64)m_pMqttConntion);
  180. std::cout << "----***** " << m_strClientID << " [" << m_strEBusRoot << "] use conn" << (UINT64)m_pMqttConntion << endl;
  181. if (m_strEBusRoot.length() > 0) {
  182. m_subscribedTopics.push_back(m_strEBusRoot);
  183. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  184. }
  185. if (m_strCCOSDevicePath.length() > 0)
  186. {
  187. ////mLog::FINFO("CCOSDevice [{$}] ", m_strCCOSDevicePath + m_strDevicePath);
  188. std::string fullTopic = m_strCCOSDevicePath + m_strDevicePath;
  189. m_subscribedTopics.push_back(fullTopic);
  190. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  191. //订阅
  192. }
  193. ////mLog::FINFO("AbstractPath [{$}] ", m_strAbstractPath);
  194. if (m_strAbstractPath.length() > 0)
  195. {
  196. m_subscribedTopics.push_back(m_strAbstractPath);
  197. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  198. if (m_strDevicePath.length() > 0)
  199. {
  200. std::string fullAbstractTopic = m_strAbstractPath + m_strDevicePath;
  201. m_subscribedTopics.push_back(fullAbstractTopic);
  202. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  203. }
  204. }
  205. ////mLog::FINFO("CCOSRoot [{$}] ", m_strCCOSRoot);
  206. if (m_strCCOSRoot.length() > 0)
  207. {
  208. m_subscribedTopics.push_back(m_strCCOSRoot);
  209. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  210. //订阅
  211. }
  212. SubscribeActions();
  213. }
  214. void LogicDevice::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  215. {
  216. if (bSubscribe)
  217. {
  218. m_subscribedTopics.push_back(pszTopic);
  219. SubscribeTopic(m_pMqttConntion, m_subscribedTopics.back().c_str());
  220. }
  221. else
  222. UnSubscribe(m_pMqttConntion, pszTopic);
  223. }
  224. void LogicDevice::NotifyDrvThread()
  225. {
  226. m_EvtNotify->SetEvent();
  227. }
  228. std::shared_ptr<LinuxEvent> LogicDevice::GetEvtHandle()
  229. {
  230. return m_EvtNotify;
  231. }
  232. //1. init part
  233. //void LogicDevice::SetSysLogicDevice(LogicDeviceSysIF *pLogic)
  234. //{
  235. // m_pSysLogic = pLogic;
  236. //}
  237. //void LogicDevice::SetLogHandle(Logger PARAM_IN *pLogger)
  238. //{
  239. // m_pLogger = pLogger;
  240. //}
  241. //
  242. //Logger *LogicDevice::GetLogHandle()
  243. //{
  244. // return m_pLogger;
  245. //}
  246. void LogicDevice::SetDrvDPC(DriverDPC *pDPC)
  247. {
  248. m_pDrvDPC = pDPC;
  249. }
  250. DriverDPC *LogicDevice::GetDrvDPC()
  251. {
  252. return m_pDrvDPC;
  253. }
  254. RET_STATUS LogicDevice::AddEbusChildren(LogicDevice* pChild, const char* szEbusDevPath)
  255. {
  256. if (szEbusDevPath == nullptr)
  257. return RET_FAILED;
  258. for (auto dev : m_subDevices) {
  259. if (dev->GetRootPath() == szEbusDevPath)
  260. return RET_SUCCEED;
  261. }
  262. m_subDevices.push_back(pChild);
  263. return RET_SUCCEED;
  264. }
  265. RET_STATUS LogicDevice::AddCcosChildren(LogicDevice* pChild, const char* szCcosDevPath)
  266. {
  267. if (szCcosDevPath == nullptr)
  268. return RET_FAILED;
  269. for (auto dev : m_subCcosDevices) {
  270. if (dev->GetCcosRootPath() == szCcosDevPath)
  271. return RET_SUCCEED;
  272. }
  273. m_subCcosDevices.push_back(pChild);
  274. return RET_SUCCEED;
  275. }
  276. LogicDevice* LogicDevice::GetEbusChild(const char* szEbusDevPath)
  277. {
  278. for (auto dev : m_subDevices) {
  279. if (dev->GetRootPath() == szEbusDevPath)
  280. return dev;
  281. }
  282. return nullptr;
  283. }
  284. LogicDevice* LogicDevice::GetCcosChild(const char* szCcosDevPath)
  285. {
  286. for (auto dev : m_subCcosDevices) {
  287. if (dev->GetCcosRootPath() == szCcosDevPath)
  288. return dev;
  289. }
  290. return nullptr;
  291. }
  292. void SYSTEM_CALL LogicDevice::CompleteInit()
  293. {
  294. //if (//mLog::gLogger == nullptr)
  295. //{
  296. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  297. // LogHost = ((string)getLogRootpath()).c_str();
  298. // if (LogHost.length() <= 1)
  299. // {
  300. // char szName[256];
  301. // sprintf(szName, "/LogicDevice_%08d", GetCurrentProcessId());
  302. // LogHost = szName;
  303. // }
  304. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  305. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  306. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  307. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  308. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  309. // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
  310. //}
  311. //else
  312. //{
  313. // string strRoot = ((string)getLogRootpath()).c_str();
  314. // if (strRoot.length() > 1 && strRoot != LogHost)
  315. // {
  316. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  317. // LogHost = strRoot;
  318. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicDevice");
  319. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  320. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  321. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  322. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicDevice");
  323. // }
  324. //}
  325. //string version;
  326. //if (GetVersion(version, hMyModule))
  327. // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
  328. //else
  329. ////mLog::FINFO("\n===============log begin : version:1.0.0.0 ===================\n");
  330. ////mLog::FINFO("{$}Connect MQTT Server {$}:{$}", m_strServer, m_strServerPort, m_strClientID);
  331. if (m_strClientID.length() <= 0)
  332. {
  333. ////mLog::FWARN("No Client name ......" );
  334. std::cout << "No Client name ......." << endl;
  335. return;
  336. }
  337. std::cout << "CompleteInit->NewConnection ......." << endl;
  338. m_pMqttConntion = NewConnection(m_strServer.c_str(), m_strServerPort.c_str(), m_strMqttUser.c_str(), m_strMqttPassword.c_str(), m_strClientID.c_str(),
  339. [this](ResDataObject* req, const char* topic, void* conn) {
  340. //这里只处理当前层次设备的请求,下一层的直接靠URI来路由
  341. //首先根据topic判断是请求我的,还是我请求的,请求我的topic 是 以 ROOT开头的URI
  342. // 发送给我的,如果需要应答,则需要调用Request返回Resp,否则直接调用,不返回应答
  343. // 消息处理如果耗时较长,则需要开线程来处理
  344. if (strncmp(topic, m_strEBusRoot.c_str(), m_strEBusRoot.length()) == 0 ||
  345. strncmp(topic, m_strCCOSDevicePath.c_str(), m_strCCOSDevicePath.length()) == 0)
  346. {
  347. //主题以ebus或者ccos开头,给我发的消息,进行处理
  348. ProcessSubscribeRequest(req);
  349. }
  350. else
  351. {
  352. //我主动订阅的外部模块的消息
  353. ProcessSubscribeMsg(req);
  354. }
  355. //CmdToLogicDev(req);
  356. });
  357. if (m_pMqttConntion != nullptr)
  358. {
  359. StartThread(); //启动Request线程
  360. SubscribeSelf();
  361. //DWORD dwThreadID = 0;
  362. //CreateThread(0, 0, Thread_Publish_Thread, this, 0, &dwThreadID);
  363. //////mLog::FINFO("[{$}] Publish Thread id [{$}]", m_strClientID, dwThreadID);
  364. }
  365. }
  366. RET_STATUS LogicDevice::ProcessSubscribeRequest(ResDataObject* req) {
  367. PACKET_TYPE type = PacketAnalizer::GetPacketType(req);
  368. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(req);
  369. switch (type) {
  370. case PACKET_TYPE_REQ:
  371. PacketAnalizer::GetPacketTransaction(req, m_strCurTransaction);
  372. ProcessRequest(req, cmd);//请求
  373. break;
  374. case PACKET_TYPE_RES:
  375. ProcessResponse(req, cmd);//应答
  376. break;
  377. case PACKET_TYPE_NOTIFY:
  378. ProcessNotify(req, cmd);
  379. //通知
  380. break;
  381. }
  382. return RET_FAILED;
  383. }
  384. RET_STATUS LogicDevice::ProcessSubscribeMsg(ResDataObject* pCmd)
  385. {
  386. ProcessSubscribeRequest(pCmd);
  387. return RET_SUCCEED;
  388. }
  389. RET_STATUS LogicDevice::ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd)
  390. {
  391. //Open命令
  392. if (cmd == PACKET_CMD_OPEN) {
  393. /* {
  394. "IDX": "40",
  395. "TYPE" : "0",
  396. "CMD" : "0",
  397. "HANDLE" :
  398. {
  399. "ROUTE": "1",
  400. "FLAGS" : "63",
  401. "LANG" : "en-US",
  402. "HANDLEID" : "0",
  403. "OWNERID" :
  404. {
  405. "EBUSID": "ImageSave",
  406. "MACHINEID" : "DESKTOP-FVD53H8",
  407. "PROCID" : "35408",
  408. "ADDR" : "2631366957696"
  409. },
  410. "DEVID":
  411. {
  412. "EBUSID": "ccosChannel",
  413. "MACHINEID" : "",
  414. "PROCID" : "0",
  415. "ADDR" : "0"
  416. }
  417. },
  418. "KEY": "\/ccosChannel"
  419. ResDataObject resRes, resResponse;
  420. ResDataObject resTopic;
  421. PacketAnalizer::GetContextTopic(pCmd, resTopic);
  422. if (cmd == PACKET_CMD_OPEN )
  423. {
  424. //std::cout << "publis " << (const char*)resTopic << "msg body" << resResponse.encode() << endl;
  425. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  426. return RET_SUCCEED;
  427. }
  428. if (cmd == PACKET_CMD_CLOSE)
  429. {
  430. //CLOSE 客户端主动断开
  431. }
  432. } */
  433. PacketArrived(pCmd);
  434. return RET_SUCCEED;
  435. }
  436. else if (cmd == PACKET_CMD_CLOSE)
  437. {
  438. ResDataObject resp;
  439. InnerOpenClose(*pCmd, resp, false);
  440. }
  441. else {
  442. PacketArrived(pCmd);
  443. }
  444. return RET_SUCCEED;
  445. }
  446. RET_STATUS LogicDevice::ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd)
  447. {
  448. return RET_SUCCEED;
  449. }
  450. RET_STATUS LogicDevice::ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd)
  451. {
  452. PacketArrived(pCmd);
  453. return RET_SUCCEED;
  454. }
  455. void LogicDevice::PacketArrived(ResDataObject* pRequest)
  456. {
  457. //m_pPacketReceivedQue->Lock();
  458. ////mLog::FINFO("PacketArrived msg [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  459. //if (Thread_Lock(5000) != WAIT_OBJECT_0)
  460. //{
  461. // ////mLog::FERROR("PacketArrived Lock Timeout for msg [id: {$} ]: cmd {$} key: ", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  462. // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  463. // return ;
  464. //}
  465. m_pPacketReceivedQue->InQueue(*pRequest);
  466. ////mLog::FINFO("PacketArrived msg INTO QUEUE [id: {$} ]: cmd {$} key: [{$}]", m_strClientID, (int)PacketAnalizer::GetPacketCmd(pRequest), PacketAnalizer::GetPacketKey(pRequest));
  467. //SetEvent(m_EvtNotify);//notify to user
  468. long nLast = 0;
  469. int released = sem_post(&m_SemphRequest); // 释放信号量,通知等待的线程
  470. if (released <= 0)
  471. {
  472. ////mLog::FERROR("PacketArrived ReleaseSemaphore failed {$} ", GetLastError());
  473. }
  474. //Thread_UnLock();
  475. //m_pPacketReceivedQue->UnLock();
  476. }
  477. bool LogicDevice::OnStartThread()
  478. {
  479. return true;
  480. }
  481. bool LogicDevice::OnEndThread()
  482. {
  483. return true;
  484. }
  485. //单一发送线程,暂时未用
  486. DWORD LogicDevice::Thread_Publish_Thread(void* pPara)
  487. {
  488. INT ret = 0;
  489. int waitevent = 0;
  490. LogicDevice* handle = (LogicDevice*)pPara;
  491. //prev work usleep(30000);
  492. ////mLog::FINFO("Thread Start [{$}]", handle->m_strClientID);
  493. while (!handle->m_ExitFlag->Wait(1))
  494. {
  495. //do work
  496. ResDataObject resSend;
  497. DWORD wait = handle->m_pPacketSendingQue->WaitForInQue(100);
  498. if (wait != WAIT_OBJECT_0)
  499. continue;
  500. bool bHasPacket = handle->m_pPacketSendingQue->DeQueue(resSend);
  501. if (bHasPacket)
  502. {
  503. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&resSend);
  504. PACKET_TYPE type = PacketAnalizer::GetPacketType(&resSend);
  505. ////mLog::FINFO(" [{$}] Publish key [{$}] type [{$}] cmd [{$}] ", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd);
  506. if (PACKET_CMD_NONE == cmd)
  507. {
  508. ////mLog::FDEBUG(" [{$}] Publish what packet {$} ", handle->m_strClientID, resSend.encode());
  509. }
  510. if (type == PACKET_TYPE_NOTIFY)
  511. {
  512. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  513. PacketAnalizer::UpdateNotifyHandle(resSend, *pHandle);
  514. ////mLog::FINFO("Notify Transaction: {$}", handle->m_strCurTransaction);
  515. PacketAnalizer::UpdatePacketTransaction(resSend, handle->m_strCurTransaction);
  516. PacketAnalizer::UpdateDeviceNotifyResponse(resSend, getLocalMachineId(), getLocalEbusId(), (UINT64)pthread_self(), (UINT64)handle->m_pMqttConntion);
  517. ////mLog::FDEBUG("Notify: {$}", resSend.encode());
  518. //printf("--> %s \n", pCmd->encode());
  519. PublishAction(&resSend, (handle->m_strEBusRoot + "/Notify").c_str(), handle->m_pMqttConntion);
  520. PublishAction(&resSend, (handle->m_strCCOSRoot + "/Notify/" + PacketAnalizer::GetPacketKey(&resSend)).c_str(), handle->m_pMqttConntion);
  521. delete pHandle;
  522. }
  523. else
  524. {
  525. ResDataObject resTopic;
  526. PacketAnalizer::GetPacketTopic(&resSend, resTopic);
  527. //PacketAnalizer::GetPacketTopicGetPacketTopic
  528. //PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic);
  529. ////mLog::FINFO(" [{$}] Publish [{$}] type [{$}] cmd [{$}] to [{$}]", handle->m_strClientID, PacketAnalizer::GetPacketKey(&resSend), (int)type, (int)cmd, (const char*)resTopic);
  530. PublishAction(&resSend, (const char*)resTopic, handle->m_pMqttConntion);
  531. }
  532. }
  533. }
  534. return 0;
  535. }
  536. RET_STATUS LogicDevice::DevOpen(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  537. {
  538. cout << "LogicDevice::DevOpen - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  539. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  540. ResDataObject req, reqParam;
  541. reqParam = pszGroup;
  542. cout << "LogicDevice::DevOpen - Creating OPEN request. Command: PACKET_CMD_OPEN" << endl;
  543. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_OPEN, &reqParam);
  544. RET_STATUS ret = InnerOpenClose(req, resRespons, true);
  545. cout << "LogicDevice::DevOpen - InnerOpenClose returned: " << ret << endl;
  546. return ret;
  547. }
  548. RET_STATUS LogicDevice::DevClose(const char* pszDevUri, const char* pszGroup, ResDataObject& resRespons)
  549. {
  550. cout << "LogicDevice::DevClose - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  551. << ", pszGroup: " << (pszGroup ? pszGroup : "nullptr") << endl;
  552. ResDataObject req, reqParam;
  553. reqParam = pszGroup;
  554. cout << "LogicDevice::DevClose - Creating CLOSE request. Command: PACKET_CMD_CLOSE" << endl;
  555. PacketAnalizer::MakeRequest(req, pszDevUri, PACKET_CMD_CLOSE, &reqParam);
  556. RET_STATUS ret = InnerOpenClose(req, resRespons, false);
  557. cout << "LogicDevice::DevClose - InnerOpenClose returned: " << ret << endl;
  558. return ret;
  559. }
  560. /// <summary>
  561. /// Get device property
  562. /// </summary>
  563. RET_STATUS LogicDevice::DevGet(const char* pszDevUri, const char* pszProperty, ResDataObject& resRespons)
  564. {
  565. cout << "LogicDevice::DevGet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  566. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr") << endl;
  567. ResDataObject req, reqParam;
  568. cout << "LogicDevice::DevGet - Creating GET request. Command: PACKET_CMD_GET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  569. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_GET, &reqParam);
  570. RET_STATUS ret = Request(&req, &resRespons);
  571. cout << "LogicDevice::DevGet - Request returned: " << ret << endl;
  572. return ret;
  573. }
  574. /// <summary>
  575. /// Set device property
  576. /// </summary>
  577. RET_STATUS LogicDevice::DevSet(const char* pszDevUri, const char* pszProperty, const char* pszValueSet, ResDataObject& resRespons)
  578. {
  579. cout << "LogicDevice::DevSet - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  580. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  581. << ", pszValueSet: " << (pszValueSet ? pszValueSet : "nullptr") << endl;
  582. ResDataObject req, reqParam;
  583. if (pszValueSet != nullptr && pszValueSet[0] != 0) {
  584. if (pszValueSet[0] == '{') {
  585. cout << "LogicDevice::DevSet - Decoding JSON value for property" << endl;
  586. reqParam.decode(pszValueSet);
  587. }
  588. else {
  589. cout << "LogicDevice::DevSet - Using raw value for property" << endl;
  590. reqParam = pszValueSet;
  591. }
  592. }
  593. else {
  594. cout << "LogicDevice::DevSet - No value provided for property" << endl;
  595. }
  596. cout << "LogicDevice::DevSet - Creating SET request. Command: PACKET_CMD_SET, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  597. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_SET, &reqParam);
  598. RET_STATUS ret = Request(&req, &resRespons);
  599. cout << "LogicDevice::DevSet - Request returned: " << ret << endl;
  600. return ret;
  601. }
  602. /// <summary>
  603. /// Update device property
  604. /// </summary>
  605. RET_STATUS LogicDevice::DevUpdate(const char* pszDevUri, const char* pszProperty, const char* pszValueUpdate, ResDataObject& resRespons)
  606. {
  607. cout << "LogicDevice::DevUpdate - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  608. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  609. << ", pszValueUpdate: " << (pszValueUpdate ? pszValueUpdate : "nullptr") << endl;
  610. ResDataObject req, reqParam;
  611. if (pszValueUpdate != nullptr && pszValueUpdate[0] != 0) {
  612. if (pszValueUpdate[0] == '{') {
  613. cout << "LogicDevice::DevUpdate - Decoding JSON value for update" << endl;
  614. reqParam.decode(pszValueUpdate);
  615. }
  616. else {
  617. cout << "LogicDevice::DevUpdate - Using raw value for update" << endl;
  618. reqParam = pszValueUpdate;
  619. }
  620. }
  621. else {
  622. cout << "LogicDevice::DevUpdate - No update value provided" << endl;
  623. }
  624. cout << "LogicDevice::DevUpdate - Creating UPDATE request. Command: PACKET_CMD_UPDATE, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  625. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_UPDATE, &reqParam);
  626. RET_STATUS ret = Request(&req, &resRespons);
  627. cout << "LogicDevice::DevUpdate - Request returned: " << ret << endl;
  628. return ret;
  629. }
  630. /// <summary>
  631. /// Add device property
  632. /// </summary>
  633. RET_STATUS LogicDevice::DevAdd(const char* pszDevUri, const char* pszProperty, const char* pszValueAdd, ResDataObject& resRespons)
  634. {
  635. cout << "LogicDevice::DevAdd - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  636. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  637. << ", pszValueAdd: " << (pszValueAdd ? pszValueAdd : "nullptr") << endl;
  638. ResDataObject req, reqParam;
  639. if (pszValueAdd != nullptr && pszValueAdd[0] != 0) {
  640. if (pszValueAdd[0] == '{') {
  641. cout << "LogicDevice::DevAdd - Decoding JSON value for addition" << endl;
  642. reqParam.decode(pszValueAdd);
  643. }
  644. else {
  645. cout << "LogicDevice::DevAdd - Using raw value for addition" << endl;
  646. reqParam = pszValueAdd;
  647. }
  648. }
  649. else {
  650. cout << "LogicDevice::DevAdd - No value provided for addition" << endl;
  651. }
  652. cout << "LogicDevice::DevAdd - Creating ADD request. Command: PACKET_CMD_ADD, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  653. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_ADD, &reqParam);
  654. RET_STATUS ret = Request(&req, &resRespons);
  655. cout << "LogicDevice::DevAdd - Request returned: " << ret << endl;
  656. return ret;
  657. }
  658. /// <summary>
  659. /// Delete device property
  660. /// </summary>
  661. RET_STATUS LogicDevice::DevDel(const char* pszDevUri, const char* pszProperty, const char* pszValueDel, ResDataObject& resRespons)
  662. {
  663. cout << "LogicDevice::DevDel - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  664. << ", pszProperty: " << (pszProperty ? pszProperty : "nullptr")
  665. << ", pszValueDel: " << (pszValueDel ? pszValueDel : "nullptr") << endl;
  666. ResDataObject req, reqParam;
  667. if (pszValueDel != nullptr && pszValueDel[0] != 0) {
  668. if (pszValueDel[0] == '{') {
  669. cout << "LogicDevice::DevDel - Decoding JSON value for deletion" << endl;
  670. reqParam.decode(pszValueDel);
  671. }
  672. else {
  673. cout << "LogicDevice::DevDel - Using raw value for deletion" << endl;
  674. reqParam = pszValueDel;
  675. }
  676. }
  677. else {
  678. cout << "LogicDevice::DevDel - No value provided for deletion" << endl;
  679. }
  680. cout << "LogicDevice::DevDel - Creating DEL request. Command: PACKET_CMD_DEL, Property: " << (pszProperty ? pszProperty : "nullptr") << endl;
  681. PacketAnalizer::MakeRequest(req, pszProperty, PACKET_CMD_DEL, &reqParam);
  682. RET_STATUS ret = Request(&req, &resRespons);
  683. cout << "LogicDevice::DevDel - Request returned: " << ret << endl;
  684. return ret;
  685. }
  686. /// <summary>
  687. /// Execute device action
  688. /// </summary>
  689. RET_STATUS LogicDevice::DevAction(const char* pszDevUri, const char* pszActionName, const char* pszParams, ResDataObject& resRespons)
  690. {
  691. cout << "LogicDevice::DevAction - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  692. << ", pszActionName: " << (pszActionName ? pszActionName : "nullptr")
  693. << ", pszParams: " << (pszParams ? pszParams : "nullptr") << endl;
  694. ResDataObject req, reqParam;
  695. if (pszParams != nullptr && pszParams[0] != 0) {
  696. if (pszParams[0] == '{') {
  697. cout << "LogicDevice::DevAction - Decoding JSON parameters for action" << endl;
  698. reqParam.decode(pszParams);
  699. }
  700. else {
  701. cout << "LogicDevice::DevAction - Using raw parameters for action" << endl;
  702. reqParam = pszParams;
  703. }
  704. }
  705. else {
  706. cout << "LogicDevice::DevAction - No parameters provided for action" << endl;
  707. }
  708. cout << "LogicDevice::DevAction - Creating EXE request. Command: PACKET_CMD_EXE, Action: " << (pszActionName ? pszActionName : "nullptr") << endl;
  709. PacketAnalizer::MakeRequest(req, pszActionName, PACKET_CMD_EXE, &reqParam);
  710. ResDataObject resResult;
  711. RET_STATUS ret = Request(&req, &resResult);
  712. cout << "LogicDevice::DevAction - Request returned: " << ret << endl;
  713. PacketAnalizer::GetPacketContext(&resResult, resRespons);
  714. cout << "LogicDevice::DevAction - Extracted packet context to response" << endl;
  715. return ret;
  716. }
  717. /// <summary>
  718. /// Send message to device
  719. /// </summary>
  720. RET_STATUS LogicDevice::DevMessage(const char* pszDevUri, const char* pszTopic, const char* pszMessageValue, ResDataObject& resRespons)
  721. {
  722. cout << "LogicDevice::DevMessage - Entering. pszDevUri: " << (pszDevUri ? pszDevUri : "nullptr")
  723. << ", pszTopic: " << (pszTopic ? pszTopic : "nullptr")
  724. << ", pszMessageValue: " << (pszMessageValue ? pszMessageValue : "nullptr") << endl;
  725. ResDataObject req, reqParam;
  726. if (pszMessageValue != nullptr && pszMessageValue[0] != 0) {
  727. if (pszMessageValue[0] == '{') {
  728. cout << "LogicDevice::DevMessage - Decoding JSON message value" << endl;
  729. reqParam.decode(pszMessageValue);
  730. }
  731. else {
  732. cout << "LogicDevice::DevMessage - Using raw message value" << endl;
  733. reqParam = pszMessageValue;
  734. }
  735. }
  736. else {
  737. cout << "LogicDevice::DevMessage - No message value provided" << endl;
  738. }
  739. cout << "LogicDevice::DevMessage - Creating MSG request. Command: PACKET_CMD_MSG, Topic: " << (pszTopic ? pszTopic : "nullptr") << endl;
  740. PacketAnalizer::MakeRequest(req, pszTopic, PACKET_CMD_MSG, &reqParam);
  741. RET_STATUS ret = Request(&req, &resRespons);
  742. cout << "LogicDevice::DevMessage - Request returned: " << ret << endl;
  743. return ret;
  744. }
  745. RET_STATUS LogicDevice::InnerOpenClose(ResDataObject& req, ResDataObject& resp, bool openOrClose)
  746. {
  747. if (openOrClose)
  748. {
  749. ResDataObject resContext;
  750. GetDeviceResource(&resp);
  751. LogicDevice::GetDeviceResource(&resp);
  752. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  753. if (PacketAnalizer::GetPacketContext(&req, resContext))
  754. {
  755. //如果有上线参数
  756. if (resContext.GetFirstOf("Online") >= 0)
  757. {
  758. ////mLog::FINFO("Got Online Request {$}", resContext.encode());
  759. int idx = resContext.GetFirstOf("Online");
  760. //如果有上线参数
  761. if (idx >= 0)
  762. {
  763. do
  764. {
  765. string wsName = resContext[idx].encode();
  766. ////mLog::FINFO("SubscribeGroupActions [{$}] ", wsName);
  767. SubscribeGroupActions(wsName, true);
  768. idx = resContext.GetNextOf("Online", idx);
  769. } while (idx >= 0);
  770. }
  771. resp.update("Online", m_rsOnlineGroup);
  772. }
  773. }
  774. }
  775. else
  776. {
  777. ResDataObject context;
  778. if (PacketAnalizer::GetPacketContext(&req, context))
  779. {
  780. int idx = context.GetFirstOf("Offline");
  781. //如果有上线参数
  782. if (idx >= 0)
  783. {
  784. do
  785. {
  786. string wsName = context[idx].encode();
  787. SubscribeGroupActions(wsName, false);
  788. idx = context.GetNextOf("Offline", idx);
  789. } while (idx >= 0);
  790. }
  791. resp.update("Online", m_rsOnlineGroup);
  792. }
  793. }
  794. return RET_SUCCEED;
  795. }
  796. bool LogicDevice::Exec(void)
  797. {
  798. struct timespec ts;
  799. clock_gettime(CLOCK_REALTIME, &ts);
  800. ts.tv_sec += 3;
  801. // 等待退出信号(3秒超时)
  802. DWORD dwRet = 0;
  803. dwRet = m_ExitFlag->Wait(3);
  804. if (dwRet == WAIT_OBJECT_0)
  805. {
  806. return false;
  807. }
  808. // 等待请求信号(3秒超时)
  809. int ret = sem_timedwait(&m_SemphRequest, &ts);
  810. if (ret == 0) {
  811. ////mLog::FDEBUG("[{$}] Got Packet Event ", m_strClientID);
  812. ResDataObject req;
  813. bool got = true;
  814. do {
  815. got = m_pPacketReceivedQue->DeQueue(req);
  816. if (got) {
  817. m_dwLastPacket = GetTickCount();
  818. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(&req);
  819. std::string keystr = PacketAnalizer::GetPacketKey(&req);
  820. ////mLog::FINFO(" {$} Got Request key {$} transaction {$}",
  821. // m_strClientID, keystr, m_strCurTransaction);
  822. if (cmd == PACKET_CMD_OPEN) {
  823. ResDataObject resRes, resResponse;
  824. InnerOpenClose(req, resRes, true);
  825. PacketAnalizer::MakeOpenResponse(req, resResponse, resRes);
  826. resResponse.update("Online", m_rsOnlineGroup);
  827. PacketAnalizer::UpdatePacketTransaction(resResponse, m_strCurTransaction);
  828. ResDataObject resTopic;
  829. PacketAnalizer::GetContextTopic(&req, resTopic);
  830. PacketAnalizer::UpdatePacketTopic(&resResponse, resTopic, m_strClientID.c_str());
  831. if (keystr.substr(0, 4) == "CCOS") {
  832. PacketAnalizer::UpdatePacketKey(&resResponse, m_strCCOSDevicePath.c_str());
  833. }
  834. else {
  835. PacketAnalizer::UpdateDeviceNotifyResponse(resResponse,
  836. getLocalMachineId(),
  837. getLocalEbusId(),
  838. (uint64_t)getpid(),
  839. (uint64_t)m_pMqttConntion);
  840. }
  841. PublishAction(&resResponse, (const char*)resTopic, m_pMqttConntion);
  842. // 发送连接状态通知
  843. ResDataObject NotifyData;
  844. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_UPDATE,
  845. "ConnectionStatus", "1");
  846. PacketAnalizer::UpdatePacketTransaction(NotifyData, m_strCurTransaction);
  847. CmdFromLogicDev(&NotifyData);
  848. }
  849. else {
  850. PacketAnalizer::GetPacketTransaction(&req, m_strCurTransaction);
  851. ResDataObject resPacket;
  852. RET_STATUS retStatus = RET_FAILED;
  853. if (cmd == PACKET_CMD_EXE && keystr == "UpdateDeviceResource") {
  854. ResDataObject devRes;
  855. if ((retStatus = GetDeviceResource(&devRes)) == RET_SUCCEED) {
  856. LogicDevice::GetDeviceResource(&devRes);
  857. PacketAnalizer::UpdatePacketContext(resPacket, devRes);
  858. }
  859. }
  860. else {
  861. retStatus = Request(&req, &resPacket);
  862. }
  863. PacketAnalizer::MakeRetCode(retStatus, &resPacket);
  864. PacketAnalizer::CloneTransaction(&req, &resPacket);
  865. ResDataObject resTopic;
  866. PacketAnalizer::GetContextTopic(&req, resTopic);
  867. PacketAnalizer::UpdatePacketTopic(&resPacket, resTopic, m_strClientID.c_str());
  868. PublishAction(&resPacket, (const char*)resTopic, m_pMqttConntion);
  869. }
  870. }
  871. } while (got);
  872. // 检查心跳(10分钟无数据发送心跳)
  873. if (GetTickCount() - m_dwLastPacket > 600000) {
  874. m_dwLastPacket = GetTickCount();
  875. ResDataObject heartBeat;
  876. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  877. "HeartBeat", m_strClientID.c_str());
  878. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  879. }
  880. return true;
  881. }
  882. // 检查心跳(即使没有请求)
  883. if (GetTickCount() - m_dwLastPacket > 600000) {
  884. m_dwLastPacket = GetTickCount();
  885. ResDataObject heartBeat;
  886. PacketAnalizer::MakeNotify(heartBeat, PACKET_CMD_ONLINE,
  887. "HeartBeat", m_strClientID.c_str());
  888. PublishAction(&heartBeat, "CCOS/DEVICE/HeartBeat", m_pMqttConntion);
  889. }
  890. return true;
  891. }
  892. void SYSTEM_CALL LogicDevice::CompleteUnInit()
  893. {
  894. StopThread();
  895. }
  896. int LogicDevice::GetDevice_Thread_Priority()
  897. {
  898. return THREAD_PRIORITY_NONE;
  899. }
  900. RET_STATUS LogicDevice::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  901. {
  902. //pDeviceResource->update("ClientType", DPC_UnitClient);
  903. GUID guid;
  904. string name;
  905. GetDeviceType(guid);
  906. guid_2_string(guid, name);
  907. pDeviceResource->update("DeviceType", name.c_str());
  908. //Get Unit Type (Unit GUID)
  909. if (pDeviceResource->GetFirstOf("LogicDevInstance")<0)
  910. {
  911. pDeviceResource->add("LogicDevInstance", m_pDevInstance);
  912. }
  913. ////
  914. size_t idx = (*pDeviceResource)["Attribute"].size();
  915. if (idx > 0)
  916. {
  917. int erroridx = (*pDeviceResource)["Attribute"].GetFirstOf("ErrorList");
  918. if (erroridx < 0)
  919. {
  920. (*pDeviceResource)["Attribute"].add("ErrorList", *m_pResErrorList);
  921. }
  922. else
  923. {
  924. (*pDeviceResource)["Attribute"]["ErrorList"] = *m_pResErrorList;
  925. }
  926. }
  927. else
  928. {
  929. ResDataObject Attribute;
  930. Attribute.add("ErrorList", *m_pResErrorList);
  931. pDeviceResource->add("Attribute", Attribute);
  932. }
  933. // (*pDeviceResource)["Action"].size();
  934. if (pDeviceResource->GetFirstOf("Action") < 0)
  935. {
  936. pDeviceResource->add("Action", m_Actions);
  937. }
  938. return RET_SUCCEED;
  939. }
  940. //notify from lower layer
  941. RET_STATUS LogicDevice::CmdFromLogicDev(ResDataObject *pCmd)
  942. {
  943. PACKET_CMD cmd = PacketAnalizer::GetPacketCmd(pCmd);
  944. PACKET_TYPE type = PacketAnalizer::GetPacketType(pCmd);
  945. if (type == PACKET_TYPE_NOTIFY)
  946. {
  947. CcosDevFileHandle* pHandle = new CcosDevFileHandle;
  948. PacketAnalizer::UpdateNotifyHandle(*pCmd, *pHandle);
  949. ////mLog::FDEBUG("Notify Transaction: {$}", m_strCurTransaction);
  950. PacketAnalizer::UpdatePacketTransaction(*pCmd, m_strCurTransaction);
  951. ;
  952. //if (m_strEBusRoot.length() <= 0)
  953. //{
  954. // //////mLog::FWARN("EBusRoot is null");
  955. //}
  956. PacketAnalizer::UpdateDeviceNotifyResponse(
  957. *pCmd,
  958. getLocalMachineId(),
  959. getLocalEbusId(),
  960. static_cast<uint64_t>(getpid()), // Linux 上获取进程 ID
  961. reinterpret_cast<uint64_t>(m_pMqttConntion) // 假设 m_pMqttConntion 在 Linux 上是指针类型
  962. );
  963. ////mLog::FDEBUG("[{$}] Notify: {$}", m_strClientID, pCmd->encode());
  964. //printf("--> %s \n", pCmd->encode());
  965. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_strClientID.c_str());
  966. PublishAction(pCmd, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  967. string strNotifyPath = "/Notify/" + PacketAnalizer::GetPacketKey(pCmd);
  968. PacketAnalizer::UpdatePacketTopic(pCmd, (m_strCCOSRoot + strNotifyPath).c_str(), m_strClientID.c_str());
  969. PublishAction(pCmd, (m_strCCOSDevicePath + strNotifyPath).c_str(), m_pMqttConntion);
  970. if (m_strAbstractPath.length() > 0)
  971. {
  972. PublishAction(pCmd, (m_strAbstractPath + strNotifyPath).c_str(), m_pMqttConntion);
  973. string realPath,devPath;
  974. devPath = m_strAbstractPath.substr(((string)"CCOS/DEVICE").length());
  975. for (int x = 0; x < m_rsOnlineGroup.size(); x++)
  976. {
  977. if ((int)m_rsOnlineGroup[x] == 1)
  978. {
  979. realPath = "CCOS/" +(string)m_rsOnlineGroup.GetKey(x) + devPath + strNotifyPath;
  980. PublishAction(pCmd, realPath.c_str(), m_pMqttConntion);
  981. }
  982. }
  983. } //m_pPacketSendingQue->InQueue(*pCmd);
  984. delete pHandle;
  985. }
  986. return RET_SUCCEED;
  987. /*
  988. if (pCmd && m_pSysLogic)
  989. {
  990. return m_pSysLogic->CmdFromLogicDev(pCmd);
  991. }
  992. //put log here
  993. return RET_FAILED;*/
  994. }
  995. std::wstring mb2wc_a(const char* mbstr)
  996. {
  997. std::wstring strVal;
  998. // 获取输入字符串的长度
  999. size_t mbstr_len = strlen(mbstr);
  1000. // 计算转换后宽字符字符串的长度
  1001. size_t size = mbstr_len + 1; // 需要多一个字符来存储结尾的 '\0'
  1002. wchar_t* wcstr = new wchar_t[size];
  1003. if (wcstr)
  1004. {
  1005. memset(wcstr, 0, size * sizeof(wchar_t));
  1006. // mbsrtowcs 返回转换后的字符数
  1007. size_t ret = mbsrtowcs(wcstr, &mbstr, size, nullptr);
  1008. if (ret != (size_t)-1) // mbsrtowcs 返回 (size_t)-1 表示错误
  1009. {
  1010. strVal = wcstr;
  1011. }
  1012. delete[] wcstr;
  1013. }
  1014. return strVal;
  1015. }
  1016. RET_STATUS HW_ACTION LogicDevice::AddErrorMessageUnicode(const char* DevInstance, const char* Code, int &Level, const wchar_t* ResInfo, const wchar_t* Description, int nMessageType)
  1017. {
  1018. string ResBase64, DesBase64;
  1019. wstring wResUTF = ResInfo;
  1020. wstring wDesUTF = Description;
  1021. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  1022. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  1023. return AddErrorMessage(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType);
  1024. }
  1025. RET_STATUS HW_ACTION LogicDevice::AddErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  1026. {
  1027. string ResBase64,DesBase64;
  1028. wstring wResUTF = mb2wc_a(ResInfo);
  1029. wstring wDesUTF = mb2wc_a(Description);
  1030. CBase64::Encode((const unsigned char *)wResUTF.c_str(), (unsigned long)wResUTF.size() * sizeof(wchar_t), ResBase64);
  1031. CBase64::Encode((const unsigned char *)wDesUTF.c_str(), (unsigned long)wDesUTF.size() * sizeof(wchar_t), DesBase64);
  1032. return AddErrorMessageBase(DevInstance, Code, Level, ResBase64.c_str(), DesBase64.c_str(), nMessageType, pAppId);
  1033. }
  1034. RET_STATUS LogicDevice::AddErrorMessageBase(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType, const char* pAppId)
  1035. {
  1036. //int ret = 1;
  1037. if (Code == 0 || (string)ResInfo == "")
  1038. {
  1039. ////mLog::FERROR("Code or ResInfo is empty");
  1040. return RET_FAILED;
  1041. }
  1042. MessageInfo info;
  1043. info.CodeID = Code;
  1044. info.Type = nMessageType;
  1045. info.Level = Level;
  1046. info.Resouceinfo = ResInfo;
  1047. info.Description = Description;
  1048. string strDevInstanceCode = DevInstance;
  1049. strDevInstanceCode += Code;
  1050. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1051. {
  1052. string strInstancekey = m_pResErrorList->GetKey(i);
  1053. if (strInstancekey == strDevInstanceCode)
  1054. {
  1055. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1056. //{
  1057. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1058. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1059. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1060. // {
  1061. //ret = 0;
  1062. ////mLog::FWARN("Same Code:%s with MessageType:%d already Exist", Code,nMessageType);
  1063. return RET_SUCCEED;
  1064. // }
  1065. //}
  1066. //ret = 2;
  1067. //break;
  1068. }
  1069. }
  1070. //if (ret==1)
  1071. {
  1072. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1073. //info.GetResDataObject(tempInfo);
  1074. //ErrorInfo.update(Code, tempInfo);
  1075. info.GetResDataObject(ErrorInfo);
  1076. struct timeval tv;
  1077. struct tm* tm_info;
  1078. char TimeTag[30];
  1079. // 获取当前时间
  1080. gettimeofday(&tv, NULL);
  1081. // 转换为本地时间
  1082. tm_info = localtime(&tv.tv_sec);
  1083. // 格式化时间为字符串
  1084. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1085. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1086. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1087. ErrorInfo.add("CreationTime", TimeTag);
  1088. ErrorInfo.add("AppId", pAppId);
  1089. ErrorInfo.add("InstanceId", DevInstance);
  1090. if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1091. {
  1092. m_pResErrorList->update(strDevInstanceCode.c_str(), ErrorInfo);
  1093. }
  1094. ResNotify.update(strDevInstanceCode.c_str(), ErrorInfo);
  1095. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1096. ////mLog::FWARN( "preposterror ErrorType:{$} {$}", nMessageType,NotifyData.encode());
  1097. CmdFromLogicDev(&NotifyData);
  1098. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1099. }
  1100. //else if (ret == 2)
  1101. //{
  1102. // ResDataObject NotifyData, ResNotify, ErrorInfo, tempInfo;
  1103. // info.GetResDataObject(tempInfo);
  1104. // ErrorInfo.update(Code, tempInfo);
  1105. // if (nMessageType == ERRORTYPE)//只有错误会增加到错误列表中,警告通知上层即可
  1106. // {
  1107. // (*m_pResErrorList)[DevInstance].update(Code, tempInfo);
  1108. // }
  1109. // ResNotify.update(DevInstance, ErrorInfo);
  1110. // PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_ADD, "ErrorList", ResNotify);
  1111. // RES_////mLog::FDEBUG(NotifyData, "preposterror RET:%d,ErrorType:%u", ret, nMessageType);
  1112. // CmdFromLogicDev(&NotifyData);
  1113. //}
  1114. //put log here
  1115. return RET_SUCCEED;
  1116. }
  1117. RET_STATUS LogicDevice::AddErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType, const char* pAppId)
  1118. {
  1119. AddErrorMessage(m_pDevInstance, Code, Level, ResInfo, "",nMessageType, pAppId);
  1120. //put log here
  1121. return RET_FAILED;
  1122. }
  1123. RET_STATUS LogicDevice::DelErrorMessage(const char* DevInstance, const char* Code, int &Level, const char* ResInfo, const char* Description, int nMessageType)
  1124. {
  1125. int index = -1;
  1126. bool m_bClearAll = false;
  1127. //trim empty code string
  1128. string CodeStr = Code;
  1129. if (CodeStr.size() == 0 || CodeStr == "0" || CodeStr == "")
  1130. {
  1131. CodeStr = "";
  1132. Code = CodeStr.c_str();
  1133. m_bClearAll = true;
  1134. }
  1135. //if ((string)Code == "0" || (string)Code == "")
  1136. //{
  1137. // m_bClearAll = true;
  1138. //}
  1139. string strDevInstanceCode = DevInstance;
  1140. strDevInstanceCode += Code;
  1141. if (m_bClearAll)
  1142. {
  1143. m_pResErrorList->clear();
  1144. }
  1145. else
  1146. {
  1147. MessageInfo info;
  1148. info.CodeID = Code;
  1149. info.Type = nMessageType;
  1150. info.Level = Level;
  1151. info.Resouceinfo = ResInfo;
  1152. info.Description = Description;
  1153. for (size_t i = 0; i < m_pResErrorList->size(); i++)
  1154. {
  1155. string strInstancekey = m_pResErrorList->GetKey(i);
  1156. if (strInstancekey == strDevInstanceCode)
  1157. {
  1158. //for (size_t j = 0; j < (*m_pResErrorList)[DevInstance].size(); j++)
  1159. //{
  1160. // string strCodekey = (*m_pResErrorList)[DevInstance].GetKey(j);
  1161. // string strtype = (*m_pResErrorList)[DevInstance][strCodekey.c_str()]["Type"];
  1162. // if (strCodekey == (string)Code && atoi(strtype.c_str()) == nMessageType)
  1163. // {
  1164. // index = (INT)j;
  1165. // break;
  1166. // }
  1167. //}
  1168. index = (int)i;
  1169. break;
  1170. }
  1171. }
  1172. }
  1173. MessageInfo info;
  1174. info.CodeID = Code;
  1175. info.Type = nMessageType;
  1176. info.Level = Level;
  1177. info.Resouceinfo = ResInfo;
  1178. info.Description = Description;
  1179. ResDataObject NotifyData, ResNotify, ErrorInfo/*, tempInfo*/;
  1180. //info.GetResDataObject(tempInfo);
  1181. //ErrorInfo.add(Code, tempInfo);
  1182. info.GetResDataObject(ErrorInfo);
  1183. ErrorInfo.add("InstanceId", DevInstance);
  1184. bool result = false;
  1185. if (index>=0)
  1186. {
  1187. //result = (*m_pResErrorList)[DevInstance].eraseOneOf(Code, index);
  1188. result = (*m_pResErrorList).eraseAllOf(strDevInstanceCode.c_str());
  1189. }
  1190. if (index >= 0 || m_bClearAll || nMessageType == WARNTYPE)
  1191. {
  1192. ResNotify.add(strDevInstanceCode.c_str(), ErrorInfo);
  1193. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_DEL, "ErrorList", ResNotify);
  1194. ////mLog::FWARN( "pre Del error ErrorCode:{$} {$}", Code ,NotifyData.encode());
  1195. CmdFromLogicDev(&NotifyData);
  1196. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1197. }
  1198. return RET_SUCCEED;
  1199. }
  1200. RET_STATUS LogicDevice::DelErrorMessage(const char* Code, int &Level, const char* ResInfo, int nMessageType)
  1201. {
  1202. DelErrorMessage(m_pDevInstance, Code, Level, ResInfo, "", nMessageType);
  1203. //put log here
  1204. return RET_FAILED;
  1205. }
  1206. RET_STATUS LogicDevice::EvtProcedure()
  1207. {
  1208. return RET_FAILED;
  1209. }
  1210. bool LogicDevice::CheckFeatureLicense(const char *pszFeatureId)
  1211. {
  1212. ////mLog::FERROR("NOT FINISHED YET");
  1213. return false;
  1214. }
  1215. RET_STATUS LogicDevice::IoSystemLog(int Level, const char* pCode, const char* pContext, size_t ContextSize, const char* pAppId)
  1216. {
  1217. std::string strResult = "";
  1218. //组Context包
  1219. //if (m_pLogger)
  1220. {
  1221. wstring wContect = mb2wc_a(pContext);
  1222. CBase64::Encode((const unsigned char*)wContect.c_str(), (unsigned long)wContect.size() * sizeof(wchar_t), strResult);
  1223. }
  1224. //Thread_Lock();
  1225. //if (NULL != fmt)
  1226. //{
  1227. // va_list marker = NULL;
  1228. // va_start(marker, fmt);
  1229. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1230. // std::vector<char> vBuffer(nLength, '\0');
  1231. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1232. // if (nWritten > 0)
  1233. // {
  1234. // strResult = &vBuffer[0];
  1235. // }
  1236. // va_end(marker);
  1237. //}
  1238. //Thread_UnLock();
  1239. ResDataObject SysLogNode;
  1240. string guidstr;
  1241. GUID DeviceGuid;
  1242. //组Log包
  1243. if (GetDeviceType(DeviceGuid))
  1244. {
  1245. guid_2_string(DeviceGuid, guidstr);
  1246. SysLogNode.add("Module", guidstr.c_str());
  1247. SysLogNode.add("AppId", pAppId);
  1248. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1249. if (pCode)
  1250. {
  1251. SysLogNode.add("BusinessKey", pCode);
  1252. }
  1253. else
  1254. {
  1255. SysLogNode.add("BusinessKey", "");
  1256. }
  1257. SysLogNode.add("IP", (const char*)getLocalIpAddress());
  1258. struct timeval tv;
  1259. struct tm* tm_info;
  1260. char TimeTag[30];
  1261. // 获取当前时间
  1262. gettimeofday(&tv, NULL);
  1263. // 转换为本地时间
  1264. tm_info = localtime(&tv.tv_sec);
  1265. // 格式化时间为字符串
  1266. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1267. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1268. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1269. SysLogNode.add("CreationTime", TimeTag);
  1270. string strLevel = SysLogLevel2str(Level);
  1271. SysLogNode.add("Level", strLevel.c_str());
  1272. SysLogNode.add("HostName", (const char*)getLocalMachineId());
  1273. SysLogNode.add("ProcessName", (const char*)GetModuleTitle());
  1274. SysLogNode.add("FreeText", strResult.c_str());
  1275. SysLogNode.add("Context", pCode);
  1276. ResDataObject NotifyData;
  1277. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1278. CmdFromLogicDev(&NotifyData);
  1279. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1280. }
  1281. else
  1282. {
  1283. ////mLog::FWARN("no Guid??");
  1284. return RET_FAILED;
  1285. }
  1286. //打印LOG
  1287. switch (Level)
  1288. {
  1289. case Syslog_Debug:
  1290. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1291. ////mLog::FDEBUG("SysLog {$} ", SysLogNode.encode());
  1292. break;
  1293. case Syslog_Information:
  1294. ///RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1295. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1296. break;
  1297. case Syslog_Warning:
  1298. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1299. ////mLog::Warn("SysLog {$} ", SysLogNode.encode());
  1300. break;
  1301. case Syslog_Error:
  1302. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1303. ////mLog::FERROR("SysLog {$} ", SysLogNode.encode());
  1304. break;
  1305. case Syslog_Fatal:
  1306. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1307. ////mLog::FINFO("SysLog {$} ", SysLogNode.encode());
  1308. break;
  1309. default:
  1310. ////mLog::FINFO("SysLog {$} " ,SysLogNode.encode() );
  1311. break;
  1312. }
  1313. return RET_SUCCEED;
  1314. }
  1315. RET_STATUS LogicDevice::SystemLog(SYSLOGLEVEL Level,const char *pCode, const char* fmt, ...)
  1316. {
  1317. std::string strResult = "";
  1318. //组Context包
  1319. //if (m_pLogger)
  1320. //{
  1321. // m_pLogger->Thread_Lock();
  1322. // if (NULL != fmt)
  1323. // {
  1324. // va_list marker = NULL;
  1325. // va_start(marker, fmt);
  1326. // size_t nLength = _vscprintf(fmt, marker) + 1;
  1327. // std::vector<char> vBuffer(nLength, '\0');
  1328. // int nWritten = vsnprintf_s(&vBuffer[0], vBuffer.size(), nLength, fmt, marker);
  1329. // if (nWritten > 0)
  1330. // {
  1331. // strResult = &vBuffer[0];
  1332. // }
  1333. // va_end(marker);
  1334. // }
  1335. // m_pLogger->Thread_UnLock();
  1336. //}
  1337. //else
  1338. {
  1339. Thread_Lock();
  1340. if (fmt != NULL) {
  1341. va_list marker;
  1342. va_start(marker, fmt);
  1343. // 获取格式化字符串所需的大小(不包括终止符)
  1344. int nLength = vsnprintf(NULL, 0, fmt, marker);
  1345. if (nLength >= 0) {
  1346. std::vector<char> vBuffer(nLength + 1, '\0'); // +1 为终止符
  1347. vsnprintf(&vBuffer[0], vBuffer.size(), fmt, marker);
  1348. strResult = &vBuffer[0]; // 将缓冲区内容赋值给结果字符串
  1349. }
  1350. va_end(marker);
  1351. }
  1352. Thread_UnLock();
  1353. }
  1354. ResDataObject SysLogNode;
  1355. string guidstr;
  1356. GUID DeviceGuid;
  1357. //组Log包
  1358. if (GetDeviceType(DeviceGuid))
  1359. {
  1360. guid_2_string(DeviceGuid, guidstr);
  1361. SysLogNode.add("Module", guidstr.c_str());
  1362. SysLogNode.add("AppId", "");
  1363. SysLogNode.add("ThreadId", GetCurrentThreadId());
  1364. if (pCode)
  1365. {
  1366. SysLogNode.add("BusinessKey", pCode);
  1367. }
  1368. else
  1369. {
  1370. SysLogNode.add("BusinessKey", "");
  1371. }
  1372. SysLogNode.add("IP", (const char *)getLocalIpAddress());
  1373. struct timeval tv;
  1374. struct tm* tm_info;
  1375. char TimeTag[30];
  1376. // 获取当前时间
  1377. gettimeofday(&tv, NULL);
  1378. // 转换为本地时间
  1379. tm_info = localtime(&tv.tv_sec);
  1380. // 格式化时间为字符串
  1381. snprintf(TimeTag, sizeof(TimeTag), "%04d-%02d-%02d %02d:%02d:%02d.%03ld",
  1382. tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday,
  1383. tm_info->tm_hour, tm_info->tm_min, tm_info->tm_sec, tv.tv_usec / 1000);
  1384. SysLogNode.add("CreationTime", TimeTag);
  1385. SysLogNode.add("Level", Level);
  1386. SysLogNode.add("HostName", (const char *)getLocalMachineId());
  1387. SysLogNode.add("ProcessName", (const char *)GetModuleTitle());
  1388. SysLogNode.add("FreeText", strResult.c_str());
  1389. ResDataObject NotifyData;
  1390. PacketAnalizer::MakeNotify(NotifyData, PACKET_CMD_MSG, "Syslog", SysLogNode);
  1391. CmdFromLogicDev(&NotifyData);
  1392. //PublishAction(&NotifyData, (m_strEBusRoot + "/Notify").c_str(), m_pMqttConntion);
  1393. if(m_pParent != nullptr)
  1394. NotifyParent(&NotifyData, "/Notify");
  1395. }
  1396. else
  1397. {
  1398. ////mLog::FERROR("no Guid??");
  1399. return RET_FAILED;
  1400. }
  1401. //打印LOG
  1402. switch (Level)
  1403. {
  1404. case Syslog_Debug:
  1405. //RES_PRINTA_DEBUG(m_pLogger, SysLogNode, "SysLog");
  1406. ////mLog::FDEBUG("SysLog {$}", SysLogNode.encode());
  1407. break;
  1408. case Syslog_Information:
  1409. //RES_PRINTA_INFO(m_pLogger, SysLogNode, "SysLog");
  1410. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1411. break;
  1412. case Syslog_Warning:
  1413. //RES_PRINTA_WARN(m_pLogger, SysLogNode, "SysLog");
  1414. ////mLog::Warn("SysLog {$}", SysLogNode.encode());
  1415. break;
  1416. case Syslog_Error:
  1417. //RES_PRINTA_ERROR(m_pLogger, SysLogNode, "SysLog");
  1418. ////mLog::FERROR("SysLog {$}", SysLogNode.encode());
  1419. break;
  1420. case Syslog_Fatal:
  1421. //RES_PRINTA_FATAL(m_pLogger, SysLogNode, "SysLog");
  1422. ////mLog::FINFO("SysLog {$}", SysLogNode.encode());
  1423. break;
  1424. default:
  1425. ////mLog::FINFO( "SysLog {$}", SysLogNode.encode());
  1426. break;
  1427. }
  1428. return RET_SUCCEED;
  1429. }
  1430. /////////////////////////////////////////////////////////////////////////////
  1431. ccos_mqtt_connection* LogicDevice::CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg)
  1432. {
  1433. return nullptr;
  1434. }
  1435. std::string CurrentDateTime()
  1436. {
  1437. auto now = std::chrono::system_clock::now();
  1438. auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
  1439. auto now_time_t = std::chrono::system_clock::to_time_t(now);
  1440. std::tm now_tm{};
  1441. localtime_r(&now_time_t, &now_tm); // 线程安全版本
  1442. char buf[64];
  1443. std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &now_tm);
  1444. std::string now_time_str = buf;
  1445. // 格式化微秒部分并追加
  1446. snprintf(buf, sizeof(buf), ".%06lld", static_cast<long long>(now_us.count() % 1000000));
  1447. now_time_str += buf;
  1448. return now_time_str;
  1449. }
  1450. static string CurrentDateTime2()
  1451. {
  1452. std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  1453. char buf[100] = { 0 };
  1454. std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now));
  1455. return buf;
  1456. }
  1457. bool LogicDevice::GetActions(ResDataObject& resAction)
  1458. {
  1459. for (int x = 0; x < m_Actions.size(); x++)
  1460. {
  1461. resAction.update(m_Actions.GetKey(x), "");
  1462. }
  1463. return true;
  1464. }
  1465. void LogicDevice::NotifyParent(ResDataObject* NotifyData, const char* pszTopic, ccos_mqtt_connection* hConnection)
  1466. {
  1467. if (m_pParent != nullptr)
  1468. {
  1469. if (hConnection == nullptr)
  1470. {
  1471. hConnection = m_pParent->m_pMqttConntion;
  1472. }
  1473. PublishAction(NotifyData, (m_pParent->GetRootPath() + pszTopic).c_str(), hConnection);
  1474. }
  1475. }
  1476. const mqtt_qos_t MQTT_QOS = QOS2;
  1477. /// <summary>
  1478. /// 工作位设备组 上线或者下线,设备路径为 CCOS/Table/Detector
  1479. /// CCOS/Demo/Detector
  1480. /// </summary>
  1481. /// <param name="bOnline"></param>
  1482. void LogicDevice::SubscribeGroupActions(string wsName, int bOnline)
  1483. {
  1484. if (m_rsOnlineGroup.GetFirstOf(wsName.c_str()) >= 0)
  1485. {
  1486. int lastOnline = (int)m_rsOnlineGroup[wsName.c_str()];
  1487. if (bOnline == lastOnline)
  1488. {
  1489. ////mLog::FWARN("Group Abstract is already ? Online {$}", bOnline);
  1490. return;
  1491. }
  1492. }
  1493. if ( m_strAbstractPath.length() > 0)
  1494. {
  1495. std::string pszAction, gpPath;
  1496. int ret = 0;
  1497. int num = m_Actions.size();
  1498. ////mLog::FINFO("Group Device onLine or offLine {$}", bOnline);
  1499. gpPath = m_strAbstractPath;
  1500. gpPath.replace(0, string("CCOS/DEVICE").length(), "CCOS/" + wsName);
  1501. pszAction = gpPath + m_strDevicePath;
  1502. pszAction += "/Action/+";
  1503. SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1504. if (bOnline)
  1505. {
  1506. ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1507. }
  1508. else
  1509. {
  1510. ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1511. }
  1512. //for (size_t i = 0; i < num; i++)
  1513. //{
  1514. // pszAction = gpPath + m_strDevicePath;
  1515. // pszAction += "/Action/";
  1516. // pszAction += (m_Actions.GetKey(i));
  1517. // SubScribeTopic(pszAction.c_str(), bOnline == 1);
  1518. // if (bOnline)
  1519. // {
  1520. // ////mLog::FINFO("{$} Subscribe Device Group Action {$} ", m_strClientID, pszAction);
  1521. // }
  1522. // else
  1523. // {
  1524. // ////mLog::FINFO("{$} UnSubscribe Device Group Action {$} ", m_strClientID, pszAction);
  1525. // }
  1526. //}
  1527. m_rsOnlineGroup.update(wsName.c_str(), bOnline);
  1528. }
  1529. else
  1530. {
  1531. ////mLog::FWARN("try Online but AbstractPath is none {$}", m_strClientID);
  1532. }
  1533. }
  1534. void SubscribeModule(ccos_mqtt_connection* pconn, string devuribBase)
  1535. {
  1536. string pszAction = devuribBase ;
  1537. pszAction += "/Get/+";
  1538. m_subscribedTopics.push_back(pszAction);
  1539. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1540. pszAction = devuribBase;
  1541. pszAction += "/Update/+";
  1542. m_subscribedTopics.push_back(pszAction);
  1543. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1544. pszAction = devuribBase;
  1545. pszAction += "/Set/+";
  1546. m_subscribedTopics.push_back(pszAction);
  1547. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1548. pszAction = devuribBase;
  1549. pszAction += "/Add/+";
  1550. m_subscribedTopics.push_back(pszAction);
  1551. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1552. pszAction = devuribBase;
  1553. pszAction += "/Del/+";
  1554. m_subscribedTopics.push_back(pszAction);
  1555. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1556. pszAction = devuribBase;
  1557. pszAction += "/Action/+";
  1558. m_subscribedTopics.push_back(pszAction);
  1559. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1560. pszAction = devuribBase;
  1561. pszAction += "/Message/+";
  1562. m_subscribedTopics.push_back(pszAction);
  1563. SubscribeTopic(pconn, m_subscribedTopics.back().c_str());
  1564. }
  1565. void LogicDevice::SubscribeActions()
  1566. {
  1567. ////mLog::FINFO("Begain");
  1568. if (nullptr == m_pMqttConntion)
  1569. return;
  1570. int num = m_Actions.size();
  1571. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*m_pMqttConntion);
  1572. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*m_pMqttConntion);
  1573. {
  1574. ResDataObject res,resAction;
  1575. GetDeviceResource(&res);
  1576. std::cout << "LogicDevice::SubscribeActions GetDeviceResource" << res.encode() << endl;
  1577. resAction = res["Action"];
  1578. ////mLog::FINFO("Actions from deviceresource [{$}]", resAction.size());
  1579. for (int x = 0; x < resAction.size(); x++)
  1580. {
  1581. string action = (string)resAction.GetKey(x);
  1582. if (action.length() > 0)
  1583. {
  1584. ////mLog::FINFO("Action from DeviceResource [{$}]", action);
  1585. m_Actions.update(action.c_str(), "");
  1586. }
  1587. }
  1588. num = m_Actions.size();
  1589. }
  1590. std::string pszAction;
  1591. int ret = 0;
  1592. SubscribeModule(m_pMqttConntion, m_strEBusRoot.c_str());
  1593. ////mLog::FINFO("{$} Subscribe EBus Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1594. if (m_strEBusRoot != m_strCCOSDevicePath)
  1595. {
  1596. pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1597. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1598. ////mLog::FINFO("{$} Subscribe CCOS Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1599. }
  1600. if (m_strAbstractPath.length() > 0)
  1601. {
  1602. pszAction = m_strAbstractPath + m_strDevicePath;
  1603. SubscribeModule(m_pMqttConntion, pszAction.c_str());
  1604. ////mLog::FINFO("{$} Subscribe Abstract Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1605. }
  1606. //for (size_t i = 0; i < num; i++)
  1607. //{
  1608. // //订阅ebus的路径
  1609. // pszAction = m_strEBusRoot;
  1610. // pszAction += "/Action/";
  1611. // pszAction += (m_Actions.GetKey(i));
  1612. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1613. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1614. //
  1615. // //订阅CCOS设备路径
  1616. // if (m_strEBusRoot != m_strCCOSDevicePath)
  1617. // {
  1618. // pszAction = m_strCCOSDevicePath + m_strDevicePath;
  1619. // pszAction += "/Action/";
  1620. // pszAction += (m_Actions.GetKey(i));
  1621. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1622. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1623. // }
  1624. // //抽象设备路径要一直存在
  1625. // //订阅抽象设备路径
  1626. // if ( m_strAbstractPath.length() > 0)
  1627. // {
  1628. // pszAction = m_strAbstractPath + m_strDevicePath;
  1629. // pszAction += "/Action/";
  1630. // pszAction += (m_Actions.GetKey(i));
  1631. // SubscribeTopic(m_pMqttConntion, pszAction.c_str());
  1632. // ////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1633. // }
  1634. //}
  1635. //pszAction = m_strEBusRoot;
  1636. //pszAction += "/Action/UpdateDeviceResource";
  1637. ////std::cout << "=====*******====== 【" << m_strClientID << "】 Subscribe Action " << pszAction << endl;
  1638. //pTopicList->push_back(pszAction);
  1639. //mqtt_subscribe(pMqttClient, pszAction.c_str(), MQTT_QOS, msgarrivd);
  1640. //////mLog::FINFO("{$} Subscribe Action {$} return {$} topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
  1641. //*/
  1642. }
  1643. /*
  1644. void connlost(void* context, char* cause)
  1645. {
  1646. //连接中断
  1647. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1648. std::cout << "Connection Lost .....[" << std::get<CLINET_ID_ID>(*connection) <<"] why?" << endl;
  1649. printf("\nConnection lost\n");
  1650. printf(" cause: %s\n", cause);
  1651. std::cout << CurrentDateTime() << "MQTT Server Connection lost...." << endl;
  1652. }
  1653. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  1654. {
  1655. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1656. std::cout << "Connection disconnected .....[" << std::get<CLINET_ID_ID>(*connection) << "] why?" << endl;
  1657. }
  1658. */
  1659. //发送成功
  1660. //void delivered(void* context, MQTTAsync_token dt)
  1661. //{
  1662. // printf("Message with token value %d delivery confirmed\n", dt);
  1663. // //deliveredtoken = dt;
  1664. //}
  1665. //void connlost(void* context, char* cause)
  1666. //{
  1667. // //printf("Connection 0X%08X Lost ...... ???? %s \n",, cause);
  1668. // //std::cout << "Connection Context [" << (UINT64)context << "] conn lost " << (cause==nullptr?"": cause) << endl;
  1669. //
  1670. // //连接中断
  1671. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1672. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1673. // std::cout << CurrentDateTime() << "Connection Lost 2 .....[" << std::get<CLINET_ID_ID>(*connection) << "] why? " << (cause == nullptr ? "" : cause) << endl;
  1674. // return;
  1675. //
  1676. // MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  1677. // int rc;
  1678. //
  1679. // printf("Reconnecting\n");
  1680. // conn_opts.keepAliveInterval = 20;
  1681. // conn_opts.cleansession = 1;
  1682. // if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  1683. // {
  1684. // printf("Failed to start connect, return code %d\n", rc);
  1685. // //finished = 1;
  1686. // }
  1687. //}
  1688. //重连成功
  1689. //void onReconnected(void* context, char* cause)
  1690. //{
  1691. // //printf("onReconnected 0X%08X Lost ...... ???? %s \n", (UINT64)context, cause);
  1692. // //std::cout << "onReconnected [" << (UINT64)context << "] conn " << (cause == nullptr ? "" : cause) << endl;
  1693. //
  1694. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1695. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1696. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1697. // mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1698. // const char* client_id = std::get<CLINET_ID_ID>(*connection);
  1699. // int rc;
  1700. //
  1701. // //printf(" %s \n", std::get<CLINET_ID_ID>(*connection));
  1702. // std::cout << "[" << client_id << "] Successful reconnection Use context [" << (UINT64)context << "]" << endl;
  1703. // //cout << "Connected ok.. by TID [" << GetCurrentThreadId() << "]" << endl;
  1704. // // 重新订阅,暂不重新订阅,看看
  1705. //
  1706. // ///*
  1707. // //printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1708. // // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1709. // //opts.onSuccess = onSubscribe;
  1710. // //opts.onFailure = onSubscribeFailure;
  1711. // //opts.context = client;
  1712. // auto it = pTopicList->begin();
  1713. // while(it != pTopicList->end())
  1714. // {
  1715. // rc = MQTTAsync_subscribe(client, (*it).c_str(), 1, &opts);
  1716. // printf("-------- [%s] re subscribe %s, return code %d\n", client_id, (*it).c_str(), rc);
  1717. // it++;
  1718. // }
  1719. // //*/
  1720. //
  1721. //}
  1722. //
  1723. ////MQTT 连接主动断开连接失败
  1724. //void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1725. //{
  1726. // printf("Disconnect failed, rc %d\n", response->code);
  1727. // //disc_finished = 1;
  1728. //}
  1729. //
  1730. ////MQTT 连接主动断开连接成功
  1731. //void onDisconnect(void* context, MQTTAsync_successData* response)
  1732. //{
  1733. // //printf("Successful disconnection\n");
  1734. // //disc_finished = 1;
  1735. //}
  1736. //
  1737. //void onSubscribe(void* context, MQTTAsync_successData* response)
  1738. //{
  1739. // printf("Subscribe succeeded\n");
  1740. // //subscribed = 1;
  1741. //}
  1742. //
  1743. //void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1744. //{
  1745. // printf("Subscribe failed, rc %d\n", response->code);
  1746. // //finished = 1;
  1747. //}
  1748. //
  1749. //
  1750. //void onConnectFailure(void* context, MQTTAsync_failureData* response)
  1751. //{
  1752. // printf("Connect failed, rc %d\n", response->code);
  1753. // //finished = 1;
  1754. //}
  1755. //void onConnect(void* context, MQTTAsync_successData* response)
  1756. //{
  1757. //
  1758. // ccos_mqtt_connection* connection = (ccos_mqtt_connection*)context;
  1759. // mqtt_client* client = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1760. //
  1761. // std::cout << "[" << std::get<CLINET_ID_ID>(*connection) << "] onConnect Context [" << (UINT64)context << "] " << endl;
  1762. //
  1763. // MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1764. // int rc;
  1765. //
  1766. // //printf("[%s] connect MQTT Successful connection\n", std::get< CLINET_ID_ID>(*connection));
  1767. // HANDLE hConnected = std::get< CONNECTED_HANDLE_ID>(*connection);
  1768. // if (hConnected != NULL)
  1769. // SetEvent(hConnected);
  1770. //
  1771. // /*
  1772. // printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  1773. // "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  1774. // opts.onSuccess = onSubscribe;
  1775. // opts.onFailure = onSubscribeFailure;
  1776. // opts.context = client;
  1777. // if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  1778. // {
  1779. // printf("Failed to start subscribe, return code %d\n", rc);
  1780. // finished = 1;
  1781. // }*/
  1782. //}
  1783. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  1784. int PublishActionWithoutLock(string &message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos = MQTT_QOS);
  1785. void resubscribe_topic(void* client, void* reconnect_date)
  1786. {
  1787. mqtt_client* pClient = (mqtt_client*)client;
  1788. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1789. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1790. string client_id = std::get<CLINET_ID_ID>(*connection);
  1791. ////mLog::FWARN("Connection Reconneted ok. {$}, topic num :{$}", client_id, pTopicList->size());
  1792. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1793. pLock->Thread_Lock();
  1794. for_each(pTopicList->begin(),pTopicList->end(), [&pClient] (string str)-> void {
  1795. ////mLog::FWARN("Resubscribe {$}", str);
  1796. mqtt_subscribe(pClient, str.c_str(), MQTT_QOS, msgarrivd);
  1797. });
  1798. pLock->Thread_UnLock();
  1799. }
  1800. //MQTT消息抵达
  1801. //int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1802. void msgarrivd(void* client, message_data_t* message)
  1803. {
  1804. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] "<<endl;
  1805. if (client == nullptr)
  1806. {
  1807. //printf("************************ somthing happend...");
  1808. return;
  1809. }
  1810. //消息抵达
  1811. ResDataObject req;
  1812. string topic = strlen( message->topic_name) > sizeof(message->topic_name)-1 ? string(message->topic_name, sizeof(message->topic_name) - 1) : message->topic_name; //topicName; //msg->get_topic().c_str();
  1813. mqtt_client* pClient = (mqtt_client*)client;
  1814. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pClient->mqtt_conn_context;
  1815. if (!connection) return;
  1816. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1817. if (pLock == nullptr)
  1818. return;
  1819. pLock->Thread_Lock();
  1820. //再次取Lock,如果发现是空了,什么都不干
  1821. CcosLock* pLockTemp = std::get<CONN_SEND_LOCK_ID>(*connection);
  1822. if (pLockTemp == nullptr)
  1823. {
  1824. pLock->Thread_UnLock();
  1825. return;
  1826. }
  1827. string client_id = std::get<CLINET_ID_ID>(*connection);
  1828. void* pc = std::get<MQTT_CLT_ID>(*connection);
  1829. if (pc == nullptr)
  1830. {
  1831. //printf("mqtt is closed now..\n");
  1832. pLock->Thread_UnLock();
  1833. return;
  1834. }
  1835. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get Msg from " << topic << /*" msg Body " << payload <<*/ endl;
  1836. //std::cout << " msg Body " << payload << endl;
  1837. if ((topic.length() >= MQTT_TOPIC_LEN_MAX && message->message->payloadlen > 16380) || message->message->payloadlen > 512*1024)
  1838. {
  1839. //std::string payloads((const char*)message->message->payload, 128);
  1840. ////mLog::FWARN("TID {$} : {$} Get Bad Msg Len[{$}] from {$}", GetCurrentThreadId(), client_id, message->message->payloadlen, topic.substr(0, 255));
  1841. pLock->Thread_UnLock();
  1842. return;
  1843. }
  1844. if((int)message->message->payloadlen > 16380)
  1845. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Get big Msg from " << topic << " msg Len " << message->message->payloadlen << /*" msg Body " << payload <<*/ endl;
  1846. ////mLog::FWARN("TID {$} : {$} Get big Msg from {$} msg Len {$}", GetCurrentThreadId(), client_id, topic, message->message->payloadlen);
  1847. ////mLog::FDEBUG("TID {$} : {$} Get Msg from {$} msg Body {$}", GetCurrentThreadId(), client_id, topic, payload.substr(0, 1024));
  1848. const char* pclient_id = client_id.c_str();
  1849. try
  1850. {
  1851. std::string strPayload((const char*)message->message->payload, message->message->payloadlen);
  1852. req.decode(strPayload.c_str());
  1853. }
  1854. catch (...)
  1855. {
  1856. pLock->Thread_UnLock();
  1857. ////mLog::FWARN("Decode Exception.. {$}", payload);
  1858. return;
  1859. }
  1860. //取消息钩子
  1861. void* pHook = std::get<MSG_HOOK_ID>(*connection);
  1862. if (pHook != nullptr) {
  1863. ccos_mqtt_msg_filter* filter = (ccos_mqtt_msg_filter*)(pHook);
  1864. ccos_mqtt_msg_filter_func func = std::get<FILTER_FUNC_ID>(*filter);
  1865. //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1866. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Process " << topic << endl;
  1867. ////mLog::FDEBUG("TID {$} : {$} Got Hook Process {$} ", GetCurrentThreadId(), client_id, topic);
  1868. //消息钩子函数存在
  1869. ResDataObject *pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
  1870. for(int x=0;x<pRequests->size();x++) {
  1871. //ccos_mqtt_msg_filter_func func = *(ccos_mqtt_msg_filter_func*)pfiterFunc;
  1872. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Hook Process Function " << topic << endl;
  1873. ////mLog::FDEBUG("TID {$} : {$} Hook Process Function {$} ", GetCurrentThreadId(), client_id, topic);
  1874. string keyAction = pRequests->GetKey(x);
  1875. ResDataObject resObj = (*pRequests)[x];
  1876. std::shared_ptr<LinuxEvent> hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1877. if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES && keyAction == PacketAnalizer::GetPacketKey(&req)) {
  1878. //勾住了,是该钩子的消息,则通知
  1879. //std::get<FILTER_FUNC_ID>(*filter) = nullptr;
  1880. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Got Hook Function Hooked.. " << topic << endl;
  1881. hEvent = LinuxEvent::OpenEvent(string(resObj).c_str());
  1882. if (hEvent==nullptr)
  1883. {
  1884. hEvent = std::get<FILTER_HANDLE_ID>(*filter);
  1885. }
  1886. ////mLog::FINFO("TID {$} : {$} Got Hook Function Hooked.. {$} Notify Event [{$}] Handle [{$}]", GetCurrentThreadId(), client_id, topic, string(resObj), hEvent);
  1887. //PacketAnalizer::GetPacketContext(&req, *resObj);
  1888. ResDataObject* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
  1889. pResponse->add(keyAction.c_str(), req);
  1890. if (!hEvent) {
  1891. std::cerr << "Failed to open shared event" << std::endl;
  1892. }
  1893. else {
  1894. hEvent->SetEvent();
  1895. }
  1896. //CloseHandle(hEvent);
  1897. pRequests->eraseOneOf(keyAction.c_str(), 0);
  1898. pLock->Thread_UnLock();
  1899. //MQTTAsync_freeMessage(&message);
  1900. //MQTTAsync_free(topicName);
  1901. return ;
  1902. }
  1903. }
  1904. }
  1905. std::cout << CurrentDateTime() << "msgarrivd:: TID [" << GetCurrentThreadId() << "] " << client_id << " Go on Process " << topic << endl;
  1906. pLock->Thread_UnLock();
  1907. ccos_mqtt_callback onmsg = std::get<USER_MSG_CAKBCK_ID>(*connection);
  1908. if (onmsg != nullptr)
  1909. (onmsg)(&req, topic.c_str(), connection);
  1910. else
  1911. {
  1912. ////mLog::FWARN("TID {$} : {$} USER_MSG_CAKBCK_ID is null {$} ", GetCurrentThreadId(), std::get<CLINET_ID_ID>(*connection), topic);
  1913. cout << "**** ----- **** USER_MSG_CAKBCK_ID is null" << std::get<CLINET_ID_ID>(*connection) << " When processing " << topic << endl;
  1914. }
  1915. //MQTTAsync_freeMessage(&message);
  1916. //MQTTAsync_free(topicName);
  1917. return ;
  1918. }
  1919. void* MqttSendThreadFunc(void* pPara)
  1920. {
  1921. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)pPara;
  1922. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  1923. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*connection);
  1924. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connection);
  1925. std::string client_id = std::get<CLINET_ID_ID>(*connection);
  1926. sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
  1927. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
  1928. // 线程启动日志
  1929. std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
  1930. while (std::get<THREAD_RUNNING_ID>(*connection)) {
  1931. struct timespec ts;
  1932. clock_gettime(CLOCK_REALTIME, &ts);
  1933. ts.tv_sec += 1;
  1934. // 等待信号量前的日志
  1935. //std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
  1936. if (sem_timedwait(hSemaphore, &ts) == 0) {
  1937. std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
  1938. pLock->Thread_Lock();
  1939. //std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
  1940. if (pList->empty()) {
  1941. std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
  1942. pLock->Thread_UnLock();
  1943. continue;
  1944. }
  1945. ResDataObject* pMsg = pList->front();
  1946. pList->pop_front();
  1947. //std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
  1948. std::string m_strSendMessage;
  1949. for (int x = 0; x < pMsg->size(); x++) {
  1950. const char* pTopic = pMsg->GetKey(x);
  1951. m_strSendMessage = (std::string)(*pMsg)[pTopic];
  1952. std::cout<< CurrentDateTime() << " [" << client_id << "] Prepare to publish messages to the topic: " << pTopic
  1953. << ", Message length: " << m_strSendMessage.length() << "Byte" << std::endl;
  1954. PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id, pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
  1955. //mqtt_message_t msg;
  1956. //memset(&msg, 0, sizeof(msg));
  1957. //msg.payload = (void*)message.c_str();
  1958. //msg.payloadlen = message.length();
  1959. //msg.qos = QOS1;
  1960. //// 发布消息
  1961. //int publishResult = mqtt_publish(pConn, pTopic, &msg);
  1962. //if (publishResult == 0) {
  1963. // std::cout << "[" << client_id << "] Message published successfully to the topic: " << pTopic << std::endl;
  1964. //}
  1965. //else {
  1966. // std::cout << "[" << client_id << "] Message publishing failed to the topic: " << pTopic
  1967. // << ", Error code: " << publishResult << std::endl;
  1968. //}
  1969. }
  1970. std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
  1971. delete pMsg;
  1972. pLock->Thread_UnLock();
  1973. //std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
  1974. }
  1975. else if (errno == ETIMEDOUT) {
  1976. //std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
  1977. continue;
  1978. }
  1979. else {
  1980. // 其他错误情况
  1981. std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
  1982. continue;
  1983. }
  1984. }
  1985. return NULL;
  1986. }
  1987. // 安全遍历MQTT消息处理程序的宏
  1988. #define mqtt_list_for_each_entry_safe(pos, n, head, member) \
  1989. for (pos = container_of((head)->next, typeof(*pos), member), \
  1990. n = container_of(pos->member.next, typeof(*pos), member); \
  1991. &pos->member != (head); \
  1992. pos = n, n = container_of(n->member.next, typeof(*n), member))
  1993. // 通过成员指针获取包含结构的指针
  1994. #define container_of(ptr, type, member) ({ \
  1995. const typeof(((type*)0)->member)* __mptr = (ptr); \
  1996. (type*)((char*)__mptr - offsetof(type, member)); })
  1997. // 重连回调函数 - 实现自动重订阅
  1998. static void reconnect_handler(void* client, void* reconnect_data) {
  1999. mqtt_client_t* c = (mqtt_client_t*)client;
  2000. message_handlers_t* pos, * n;
  2001. ccos_mqtt_connection* connection = (ccos_mqtt_connection*)c->mqtt_conn_context;
  2002. // 遍历所有已订阅的主题并重新订阅
  2003. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*connection);
  2004. pLock->Thread_Lock();
  2005. mqtt_list_for_each_entry_safe(pos, n, &c->mqtt_msg_handler_list, list) {
  2006. mqtt_subscribe(c, pos->topic_filter, pos->qos, pos->handler);
  2007. }
  2008. pLock->Thread_UnLock();
  2009. }
  2010. mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
  2011. std::cout << "[MQTT] Acquiring MQTT client instance..." << std::endl;
  2012. mqtt_client_t* pMqttClient = mqtt_lease();
  2013. if (!pMqttClient) {
  2014. std::cerr << "[ERROR] Failed to acquire MQTT client instance" << std::endl;
  2015. return nullptr;
  2016. }
  2017. std::cout << "[SUCCESS] MQTT client acquired: " << pMqttClient << std::endl;
  2018. // 确保 connection 指针有效(避免后续访问悬空指针)
  2019. if (!connection) {
  2020. std::cerr << "[ERROR] Invalid connection context (nullptr)" << std::endl;
  2021. mqtt_release(pMqttClient); // 释放客户端
  2022. return nullptr;
  2023. }
  2024. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2025. pMqttClient->mqtt_conn_context = connection;
  2026. std::string pszClientID = std::get<CLINET_ID_ID>(*connection);
  2027. std::cout << "[CONFIG] Setting connection parameters:"
  2028. << "\n\tClient ID: " << (!pszClientID.empty() ? pszClientID : "null")
  2029. << "\n\tHost: " << SERVER_ADDRESS
  2030. << "\n\tPort: 1883"
  2031. << std::endl;
  2032. // 设置MQTT连接参数
  2033. std::string host_str = SERVER_ADDRESS;
  2034. char* host_buf = new char[host_str.size() + 1];
  2035. strncpy(host_buf, host_str.c_str(), host_str.size() + 1);
  2036. mqtt_set_host(pMqttClient, host_buf);
  2037. char port_buf[6] = "1883";
  2038. uint8_t clean_session = 1;
  2039. uint16_t keep_alive = 50;
  2040. mqtt_set_port(pMqttClient, port_buf);
  2041. mqtt_set_user_name(pMqttClient, const_cast<char*>("zskkcc"));
  2042. mqtt_set_password(pMqttClient, const_cast<char*>("zskk1234"));
  2043. mqtt_set_client_id(pMqttClient, (char*)pszClientID.c_str());
  2044. mqtt_set_clean_session(pMqttClient, 0);
  2045. mqtt_set_keep_alive_interval(pMqttClient, keep_alive);
  2046. mqtt_set_cmd_timeout(pMqttClient, 5000);
  2047. mqtt_set_write_buf_size(pMqttClient, 4096+1);
  2048. mqtt_set_read_buf_size(pMqttClient, 4096+1);
  2049. // 设置重连回调(替代mqtt_set_resubscribe_handler)
  2050. //mqtt_subscribe();
  2051. mqtt_set_reconnect_handler(pMqttClient, reconnect_handler);
  2052. int rc = 0;
  2053. uint64_t start_time = GetTickCount();
  2054. const uint64_t timeout_ms = 10000; // 10秒超时
  2055. int attempt_count = 0;
  2056. std::cout << "[CONNECT] Initiating connection (timeout: " << timeout_ms << "ms)..." << std::endl;
  2057. try {
  2058. while (true) {
  2059. attempt_count++;
  2060. std::cout << CurrentDateTime()<< " [ATTEMPT #" << attempt_count << "] Trying MQTT connect..." << std::endl;
  2061. rc = mqtt_connect(pMqttClient);
  2062. if (rc != MQTT_SUCCESS_ERROR) {
  2063. uint64_t elapsed = GetTickCount() - start_time;
  2064. std::cerr << "[ERROR] Connection failed (code: " << rc << "), Elapsed: " << elapsed << "ms" << std::endl;
  2065. if (elapsed > timeout_ms) {
  2066. // 超时处理
  2067. std::cerr << "[FATAL] Connection timeout after " << timeout_ms << "ms" << std::endl;
  2068. delete[] host_buf;
  2069. mqtt_release(pMqttClient);
  2070. return nullptr;
  2071. }
  2072. // 计算剩余时间
  2073. uint64_t remaining = timeout_ms - elapsed;
  2074. uint64_t wait_time = (remaining > 2000) ? 2000 : remaining;
  2075. std::cout << "[RETRY] Waiting " << wait_time << "ms before next attempt..." << std::endl;
  2076. usleep(wait_time * 1000);
  2077. }
  2078. else {
  2079. uint64_t total_time = GetTickCount() - start_time;
  2080. std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl;
  2081. break;
  2082. }
  2083. }
  2084. }
  2085. catch (const std::exception& e) {
  2086. std::cerr << "[ERROR] " << e.what() << std::endl;
  2087. delete[] host_buf;
  2088. mqtt_release(pMqttClient);
  2089. return nullptr;
  2090. }
  2091. return pMqttClient;
  2092. }
  2093. ccos_mqtt_connection* LogicDevice::NewConnection(const char* pszServer,const char* pszServerPort, const char* pszUserName, const char* pszPassword, const char* pszClientID, ccos_mqtt_callback onmsg)
  2094. {
  2095. ////mLog::FINFO("TID {$} : {$} NewConnection {$}:{$} user: {$} password {$} ", GetCurrentThreadId(), pszClientID, pszServer, pszServerPort, pszUserName, pszPassword);
  2096. std::cout << "TID " << GetCurrentThreadId()
  2097. << " : " << pszClientID
  2098. << " NewConnection " << pszServer
  2099. << ":" << pszServerPort
  2100. << " user: " << pszUserName
  2101. << " password " << pszPassword
  2102. << std::endl;
  2103. //连接MQTT broker
  2104. DWORD dwTick = GetTickCount();
  2105. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2106. // 初始化元组字段
  2107. std::get<MQTT_TIPIC_LIST_ID>(*connection) = new mqtt_topic_list;
  2108. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2109. std::get<USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2110. CcosLock* pLock = new CcosLock();
  2111. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2112. pLock->Thread_Lock();
  2113. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2114. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2115. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2116. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2117. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2118. std::cout << "ALLOCATE Connection [" << (UINT64) connection << "] filter [" << (UINT64)pfilter << "] .. for " << pszClientID << endl;
  2119. mqtt_client* pMqttClient = InnerConnect(connection);
  2120. if (pMqttClient == nullptr)
  2121. {
  2122. pLock->Thread_UnLock();
  2123. delete connection;
  2124. delete pfilter;
  2125. return nullptr;
  2126. }
  2127. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2128. //pMqttClient->mqtt_conn_context = connection;
  2129. //设置MQTT连接参数
  2130. //mqtt_set_port(pMqttClient, (char*)pszServerPort);
  2131. //mqtt_set_host(pMqttClient, (char*)pszServer);
  2132. //sprintf(szClentName, "%s_%d_%d", server ? "MQTT_TEST_SERVER" : "TEST_CLIENT", GetCurrentProcessId(), xc);
  2133. //mqtt_set_client_id(pMqttClient, (char*)pszClientID);
  2134. //mqtt_set_user_name(pMqttClient, (char*)pszUserName);
  2135. //mqtt_set_password(pMqttClient, (char*)pszPassword);
  2136. //mqtt_set_clean_session(pMqttClient, 1);
  2137. //mqtt_set_version(pMqttClient, 5);
  2138. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2139. //mqtt_set_write_buf_size(pMqttClient, 8192);
  2140. //mqtt_set_read_buf_size(pMqttClient, 8192);
  2141. //mqtt_set_write_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  2142. //mqtt_set_read_buf_size(pMqttClient, MQTT_DEFAULT_BUF_SIZE);
  2143. //if (pszUserName != nullptr && strlen(pszUserName) > 0)
  2144. // conn_opts.username = pszUserName;
  2145. //if (pszPassword != nullptr && strlen(pszPassword) > 0)
  2146. // conn_opts.password = pszPassword;
  2147. //设置回调函数
  2148. /*
  2149. int rc = 0;
  2150. dwTick = GetTickCount();
  2151. while (true)
  2152. {
  2153. int rc = mqtt_connect(pMqttClient);
  2154. if (rc != MQTT_SUCCESS_ERROR)
  2155. {
  2156. //默认10s内尝试
  2157. if (GetTickCount() - dwTick > 10000)
  2158. {
  2159. ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2160. pLock->Thread_UnLock();
  2161. delete connection;
  2162. delete pfilter;
  2163. return nullptr;
  2164. }
  2165. else
  2166. {
  2167. ////mLog::FINFO(" TID {$} {$} Failed 1 to connect to the MQTT server Try again 2s later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2168. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed 1 to connect to the MQTT server Try again 2s later...【" << pszClientID << "】error : " << rc << endl;
  2169. Sleep(2000);
  2170. }
  2171. }
  2172. else
  2173. {
  2174. break;
  2175. }
  2176. }
  2177. */
  2178. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2179. // 创建POSIX信号量
  2180. sem_t* semaphore = new sem_t;
  2181. sem_init(semaphore, 0, 0);
  2182. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2183. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2184. // 创建发送线程
  2185. pthread_t threadId;
  2186. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2187. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2188. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2189. // 错误处理...
  2190. sem_destroy(semaphore);
  2191. delete semaphore;
  2192. pLock->Thread_UnLock();
  2193. delete connection;
  2194. return nullptr;
  2195. }
  2196. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2197. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2198. pLock->Thread_UnLock();
  2199. uint64_t dwWaitTick = GetTickCount() - dwTick;
  2200. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2201. << dwWaitTick << " ms" << std::endl;
  2202. return connection;
  2203. }
  2204. //创建额外连接,需要提供回调函数
  2205. LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout, bool async)
  2206. {
  2207. DWORD dwTick = GetTickCount();
  2208. ////mLog::FINFO("TID {$} : {$} NewConnection2 {$}:{$} ", GetCurrentThreadId(), pszClientID, SERVER_ADDRESS, 1883);
  2209. //mqtt_client* pMqttClient = nullptr;
  2210. //pMqttClient = mqtt_lease();
  2211. ccos_mqtt_connection* connection = new ccos_mqtt_connection;
  2212. //HANDLE hConneted = CreateEvent(NULL, FALSE, FALSE, NULL);
  2213. //std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2214. mqtt_topic_list* pTopicList = new std::list<string>;
  2215. std::get<MQTT_TIPIC_LIST_ID>(*connection) = pTopicList;
  2216. CcosLock* pLock = new CcosLock();
  2217. std::get<CONN_SEND_LOCK_ID>(*connection) = pLock;
  2218. pLock->Thread_Lock();
  2219. ccos_mqtt_msg_filter* pfilter = new ccos_mqtt_msg_filter;
  2220. std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2221. std::get<MSG_HOOK_ID>(*connection) = pfilter;
  2222. std::get<FILTER_RES_OBJ_ID>(*pfilter) = new ResDataObject;
  2223. std::get<FILTER_RESPONS_OBJ_ID>(*pfilter) = new ResDataObject;
  2224. std::cout << "ALLOCATE Connection 2 [" << (UINT64)connection << "] filter [" << (UINT64)pfilter << "] ..for " << pszClientID << endl;
  2225. std::get<CLINET_ID_ID>(*connection) = pszClientID;
  2226. std::get< USER_MSG_CAKBCK_ID>(*connection) = onmsg;
  2227. mqtt_client* pMqttClient = InnerConnect(connection);
  2228. if (pMqttClient == nullptr)
  2229. {
  2230. pLock->Thread_UnLock();
  2231. delete connection;
  2232. delete pfilter;
  2233. return nullptr;
  2234. }
  2235. std::get<MQTT_CLT_ID>(*connection) = pMqttClient;
  2236. //pMqttClient->mqtt_conn_context = connection;
  2237. //设置回调函数
  2238. //设置MQTT连接参数
  2239. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2240. /*
  2241. //连接Broker
  2242. int rc = 0;
  2243. dwTick = GetTickCount();
  2244. while (true)
  2245. {
  2246. rc = mqtt_connect(pMqttClient);
  2247. if (rc != MQTT_SUCCESS_ERROR)
  2248. {
  2249. if (GetTickCount() - dwTick > dwOpenTimeout)
  2250. {
  2251. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2252. pLock->Thread_UnLock();
  2253. delete connection;
  2254. delete pfilter;
  2255. return nullptr;
  2256. }
  2257. else
  2258. {
  2259. ////mLog::FINFO(" TID {$} {$} Failed 2 to connect to the MQTT server Try again 20ms later...... {$} return : ", GetCurrentThreadId(), pszClientID, rc);
  2260. //std::cout << CurrentDateTime() << " TID [" << GetCurrentThreadId() << "] Failed to connect to the MQTT server Try again 2s later..." << pszClientID << endl;
  2261. Sleep(20);
  2262. }
  2263. }
  2264. else
  2265. {
  2266. break;
  2267. }
  2268. }*/
  2269. DWORD dwWaitTick = GetTickCount() - dwTick;
  2270. ////mLog::FWARN("MQTT {$} try ConnMqtt Succecced Use Time ****** {$} ms*** wait: {$} ms*** TID ", pszClientID, GetTickCount() - dwTick, dwWaitTick, GetCurrentThreadId());
  2271. std::get<MSG_LIST_ID>(*connection) = new mqtt_msg_list;
  2272. // 创建POSIX信号量
  2273. sem_t* semaphore = new sem_t;
  2274. sem_init(semaphore, 0, 0);
  2275. std::get<SEMAPHORE_HANDLE_ID>(*connection) = semaphore;
  2276. std::get<THREAD_RUNNING_ID>(*connection) = true;
  2277. // 创建发送线程
  2278. pthread_t threadId;
  2279. if (pthread_create(&threadId, NULL, MqttSendThreadFunc, connection) != 0) {
  2280. std::cerr << "Failed to create MQTT send thread" << std::endl;
  2281. std::get<THREAD_RUNNING_ID>(*connection) = false;
  2282. // 错误处理...
  2283. sem_destroy(semaphore);
  2284. delete semaphore;
  2285. pLock->Thread_UnLock();
  2286. delete connection;
  2287. return nullptr;
  2288. }
  2289. std::get<CONNECTED_HANDLE_ID>(*connection) = threadId;
  2290. std::cout << "MqttSendThreadFunc thread id [" << threadId << "]" << std::endl;
  2291. pLock->Thread_UnLock();
  2292. std::cout << "MQTT " << pszClientID << " try ConnMqtt Succeeded. Use Time: "
  2293. << dwWaitTick << " ms" << std::endl;
  2294. return connection;
  2295. }
  2296. //重置连接
  2297. LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection)
  2298. {
  2299. if (hConnection == nullptr)
  2300. {
  2301. return;
  2302. }
  2303. ////mLog::FWARN(" Reset Mqtt Connection..", std::get<CLINET_ID_ID>(*hConnection));
  2304. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " Close Mqtt Connection.." << endl;
  2305. mqtt_client* pconn = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2306. if (pconn != nullptr) {
  2307. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2308. pLock->Thread_Lock();
  2309. //mqtt_do_reconnect(pconn);
  2310. mqtt_disconnect(pconn);
  2311. /*mqtt_set_resubscribe_handler(pconn, resubscribe_topic);
  2312. mqtt_connect(pconn);*/
  2313. mqtt_release(pconn);
  2314. pconn = InnerConnect(hConnection);
  2315. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2316. if (pconn != nullptr)
  2317. {
  2318. //std::get<MQTT_CLT_ID>(*hConnection) = pconn;
  2319. //resubscribe_topic(pconn, pconn->mqtt_conn_context);
  2320. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2321. }
  2322. pLock->Thread_UnLock();
  2323. }
  2324. }
  2325. //关闭并释放连接
  2326. LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection)
  2327. {
  2328. if (!hConnection) return;
  2329. auto clientId = std::get<CLINET_ID_ID>(*hConnection);
  2330. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection.." << endl;
  2331. std::get<THREAD_RUNNING_ID>(*hConnection) = false;
  2332. sem_t* semaphore = std::get<SEMAPHORE_HANDLE_ID>(*hConnection);
  2333. if (semaphore) {
  2334. sem_post(semaphore); // 发送信号量唤醒线程
  2335. }
  2336. pthread_t sendThread = std::get<CONNECTED_HANDLE_ID>(*hConnection);
  2337. if (sendThread != 0) {
  2338. // 等待线程退出,超时时间设为1秒
  2339. struct timespec timeout;
  2340. clock_gettime(CLOCK_REALTIME, &timeout);
  2341. timeout.tv_sec += 1;
  2342. int joinResult = pthread_timedjoin_np(sendThread, nullptr, &timeout);
  2343. if (joinResult != 0) {
  2344. std::cerr << CurrentDateTime() << " " << clientId
  2345. << " Warning: Send thread did not exit in time, force cancel" << std::endl;
  2346. pthread_cancel(sendThread); // 超时后强制终止(最后的备选方案)
  2347. pthread_join(sendThread, nullptr);
  2348. }
  2349. std::get<CONNECTED_HANDLE_ID>(*hConnection) = 0; // 重置线程ID
  2350. }
  2351. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2352. if (nullptr != pLock)
  2353. pLock->Thread_Lock();
  2354. else
  2355. return;
  2356. mqtt_client* pMqttClient = static_cast<mqtt_client*>(std::get<MQTT_CLT_ID>(*hConnection));
  2357. if (pMqttClient) {
  2358. mqtt_disconnect(pMqttClient); // 断开连接
  2359. mqtt_release(pMqttClient); // 释放客户端实例
  2360. pMqttClient = NULL;
  2361. std::get<MQTT_CLT_ID>(*hConnection) = nullptr;
  2362. }
  2363. ccos_mqtt_msg_filter* pFilter = static_cast<ccos_mqtt_msg_filter*>(std::get<MSG_HOOK_ID>(*hConnection));
  2364. if (pFilter) {
  2365. std::cout << "Free Connection filter [" << (UINT64)pFilter << "] .." << std::endl;
  2366. // 释放filter内部的资源对象
  2367. delete std::get<FILTER_RES_OBJ_ID>(*pFilter);
  2368. delete std::get<FILTER_RESPONS_OBJ_ID>(*pFilter);
  2369. delete pFilter;
  2370. std::get<MSG_HOOK_ID>(*hConnection) = nullptr;
  2371. }
  2372. mqtt_msg_list* pMsgList = std::get<MSG_LIST_ID>(*hConnection);
  2373. if (pMsgList) {
  2374. // 清理剩余未发送的消息
  2375. while (!pMsgList->empty()) {
  2376. ResDataObject* pMsg = pMsgList->front();
  2377. pMsgList->pop_front();
  2378. delete pMsg; // 释放消息对象
  2379. }
  2380. delete pMsgList;
  2381. std::get<MSG_LIST_ID>(*hConnection) = nullptr;
  2382. }
  2383. if (semaphore) {
  2384. sem_destroy(semaphore); // 销毁信号量
  2385. delete semaphore;
  2386. std::get<SEMAPHORE_HANDLE_ID>(*hConnection) = nullptr;
  2387. }
  2388. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2389. if (pTopicList) {
  2390. pTopicList->clear(); // 清空列表
  2391. delete pTopicList;
  2392. std::get<MQTT_TIPIC_LIST_ID>(*hConnection) = nullptr;
  2393. }
  2394. if (pLock) {
  2395. pLock->Thread_UnLock(); // 确保锁被释放
  2396. delete pLock;
  2397. std::get<CONN_SEND_LOCK_ID>(*hConnection) = nullptr;
  2398. }
  2399. delete hConnection;
  2400. std::cout << CurrentDateTime() << clientId << " Close Mqtt Connection over.." << endl;
  2401. }
  2402. //主动订阅主题
  2403. LOGICDEVICE_API int SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare)
  2404. {
  2405. std::cout << "SubscribeTopic called. Topic: " << (pszTopic ? pszTopic : "null")
  2406. << ", isShare: " << (isShare ? "true" : "false") << std::endl;
  2407. if (hConnection == nullptr)
  2408. {
  2409. std::cout << "SubscribeTopic error: hConnection is nullptr" << std::endl;
  2410. return 0;
  2411. }
  2412. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2413. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2414. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2415. pLock->Thread_Lock();
  2416. pTopicList->push_back(std::string(pszTopic));
  2417. //pMqttClient->subscribe(pszTopic, 1, opts);
  2418. int rc = MQTT_SUCCESS_ERROR;
  2419. DWORD dwTick = GetTickCount();
  2420. int nTryTimes = 0;
  2421. do
  2422. {
  2423. if (pTopicList->size() < 1)
  2424. {
  2425. const char* topicToSubscribe = pTopicList->back().c_str();
  2426. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2427. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2428. ////mLog::FWARN("mqtt {$} Subscribe First {$} return {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2429. std::cout << "mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe First " << pszTopic << endl;
  2430. }
  2431. else
  2432. {
  2433. const char* topicToSubscribe = pTopicList->back().c_str();
  2434. int ret = mqtt_subscribe(pMqttClient, topicToSubscribe, MQTT_QOS, msgarrivd);
  2435. //int ret = mqtt_subscribe(pMqttClient, pszTopic, MQTT_QOS, msgarrivd);
  2436. ////mLog::FWARN("mqtt {$} Subscribe ReUse {$} topic num {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret, pTopicList->size());
  2437. std::cout << CurrentDateTime() << " mqtt [" << std::get<CLINET_ID_ID>(*hConnection) << " ]Subscribe ReUse " << pszTopic << " ret: "<<ret <<endl;
  2438. }
  2439. if (rc != MQTT_SUCCESS_ERROR)
  2440. {
  2441. ////mLog::FWARN("try do mqtt reconnect ..");
  2442. //mqtt_do_reconnect(pMqttClient);
  2443. mqtt_disconnect(pMqttClient);
  2444. //mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2445. mqtt_release(pMqttClient);
  2446. pMqttClient = InnerConnect(hConnection);
  2447. int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2448. if (pMqttClient != nullptr)
  2449. {
  2450. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2451. //resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2452. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2453. }
  2454. usleep(2000 * 1000);
  2455. }
  2456. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2457. pLock->Thread_UnLock();
  2458. return 0;
  2459. }
  2460. //主题订阅取消
  2461. LOGICDEVICE_API int UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
  2462. {
  2463. if (hConnection == nullptr)
  2464. {
  2465. return 0;
  2466. }
  2467. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2468. //if (!pMqttClient->is_connected()) {
  2469. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2470. // return 0;
  2471. //}
  2472. //MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2473. //auto ret = pMqttClient->unsubscribe(pszTopic);
  2474. mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*hConnection);
  2475. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2476. pLock->Thread_Lock();
  2477. //MQTTAsync_responseOptions resp;
  2478. pTopicList->remove(pszTopic);
  2479. int rc = MQTT_SUCCESS_ERROR;
  2480. DWORD dwTick = GetTickCount();
  2481. int nTryTimes = 0;
  2482. do
  2483. {
  2484. int ret = mqtt_unsubscribe(pMqttClient, pszTopic);
  2485. ////mLog::FWARN("mqtt {$} Unsubscribe {$} return {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, ret);
  2486. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
  2487. pLock->Thread_UnLock();
  2488. return 2;
  2489. }
  2490. //往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
  2491. LOGICDEVICE_API int PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout)
  2492. {
  2493. char pszClientID[256];
  2494. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2495. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2496. getpid(), GetCurrentThreadId());
  2497. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [](ResDataObject*, const char*, void* conn) {
  2498. });
  2499. mqtt_client* pConn = (mqtt_client*)std::get<MQTT_CLT_ID>(*connObj);
  2500. PacketAnalizer::UpdatePacketTopic(pCmd, pszTopic, pszClientID);
  2501. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2502. string pLoad = pCmd->encode();
  2503. int rc = PublishActionWithoutLock(pLoad, pszTopic, pConn, pszSenderName);
  2504. std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2505. ////mLog::FDEBUG("CLT {$} Publish to {$} send result {$}", pszClientID, pszTopic, rc);
  2506. CloseConnection(connObj);
  2507. return rc;
  2508. }
  2509. //往指定主题发送CCOS协议包整包,使用已创建的连接发送
  2510. LOGICDEVICE_API int PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection, DWORD dwTimeout)
  2511. {
  2512. if (hConnection == nullptr)
  2513. {
  2514. ////mLog::FDEBUG("Who ????? Publish to {$} Action Body: {$}", pszTopic, pAction->encode());
  2515. std::cout << CurrentDateTime() << "Who ????? " << "Publish to [" << pszTopic << "] Action Body: "<< pAction->encode() << endl; //<< pAction->encode()
  2516. return 0;
  2517. }
  2518. std::cout << CurrentDateTime()<<" " << std::get<CLINET_ID_ID>(*hConnection) << " Publish Action to [" << pszTopic << "] Action Body: " << endl; //<< pAction->encode()
  2519. string topic = pszTopic;
  2520. if (topic.length() <= 0)
  2521. {
  2522. ////mLog::FWARN("ignore empty topic packet {$}", pAction->encode());
  2523. return 2;
  2524. }
  2525. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2526. //if (!pMqttClient->is_connected()) {
  2527. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2528. // return 0;
  2529. //}
  2530. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2531. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2532. pLock->Thread_Lock();
  2533. ////mLog::FDEBUG("{$} Publish Action to {$} Action Body: {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, pAction->encode());
  2534. //string org_publisher = PacketAnalizer::GetPacketPublisher(pAction);
  2535. //if(org_publisher.length() <= 0)
  2536. PacketAnalizer::UpdatePacketTopic(pAction, pszTopic, client_id.c_str() );
  2537. ResDataObject* pPacket = new ResDataObject();
  2538. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2539. pPacket->add(pszTopic, pAction->encode());
  2540. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2541. pList->push_back(pPacket);
  2542. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2543. std::cout << "try publish " << pAction->encode() << endl;
  2544. //pMqttClient->publish(pszTopic, pAction->encode());
  2545. /*
  2546. //pConn->publish(pszTopic, pCmd->encode());
  2547. std::cout << "try publish " << pAction->encode() << endl;
  2548. const char* pLoad = pAction->encode();
  2549. int len = strlen(pLoad);
  2550. //MQTTAsync_responseOptions resp;
  2551. //MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
  2552. mqtt_message_t msg;
  2553. memset(&msg, 0, sizeof(msg));
  2554. msg.payload = (void*)pLoad;
  2555. msg.qos = MQTT_QOS;
  2556. msg.payloadlen = len;
  2557. int rc = MQTT_SUCCESS_ERROR;
  2558. DWORD dwTick = GetTickCount();
  2559. int nTryTimes = 0;
  2560. do
  2561. {
  2562. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2563. nTryTimes++;
  2564. if (rc != MQTT_SUCCESS_ERROR)
  2565. {
  2566. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect .. {$}",rc, client_id);
  2567. //mqtt_do_reconnect(pMqttClient);
  2568. mqtt_disconnect(pMqttClient);
  2569. rc = mqtt_connect(pMqttClient);
  2570. if (rc == MQTT_SUCCESS_ERROR)
  2571. {
  2572. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2573. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2574. }
  2575. Sleep(2);
  2576. }
  2577. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < dwTimeout);
  2578. //std::cout << "CLT [" << pszClientID << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << rc << endl;
  2579. ////mLog::FINFO("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2580. //int rc = MQTTAsync_send(pMqttClient, pszTopic, len, pLoad, 0, 0, &resp);
  2581. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Use mqtt_client " << (UINT64)pMqttClient << " Publish to [" << pszTopic << "] Send result " << rc << endl;
  2582. if (rc < 0)
  2583. {
  2584. ////mLog::FERROR("{$} PublishAction failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2585. //std::cout << " ErrorCode " << rc << " Send Msg : " << pLoad << endl;
  2586. pLock->Thread_UnLock();
  2587. return rc;
  2588. }*/
  2589. //MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, NULL, NULL);
  2590. //std::cout << "CLT [" << client_id << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2591. ////mLog::FDEBUG("CLT {$} PublishAction to {$} Send List has {$} packets ", client_id, pszTopic, pList->size());
  2592. pLock->Thread_UnLock();
  2593. return 2;
  2594. }
  2595. //往指定主题发送Action包,携带参数,并指定答复的Topic,异步模式处理Resp,需要在对应连接的回调中处理
  2596. int PublishActionWithoutLock(string& message, const char* pszTopic, mqtt_client* pMqttClient, std::string client_id, mqtt_qos_t qos)
  2597. {
  2598. const int dwTimeout = 500;
  2599. if (pMqttClient == nullptr)
  2600. {
  2601. ////mLog::FERROR("Who ????? Publish 2 to {$} Action {$} Body: {$}", pszTopic, message);
  2602. return 0;
  2603. }
  2604. ////mLog::FDEBUG("{$} Publish with qos[{$}] Action 2 to {$} Body: {$} ", client_id, (int)qos, pszTopic, message);
  2605. const char* pLoad = message.c_str();
  2606. int len = message.length();
  2607. mqtt_message_t msg;
  2608. memset(&msg, 0, sizeof(msg));
  2609. msg.payload = (void*)pLoad;
  2610. msg.payloadlen = len;
  2611. msg.qos = qos;
  2612. int rc = MQTT_SUCCESS_ERROR;
  2613. DWORD dwTick = GetTickCount();
  2614. int nTryTimes = 0;
  2615. do
  2616. {
  2617. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2618. nTryTimes++;
  2619. if (rc != MQTT_SUCCESS_ERROR)
  2620. {
  2621. ////mLog::FWARN("try mqtt_publish ret {$} do mqtt reconnect .. {$}", rc, client_id);
  2622. usleep(10000 * 1000);
  2623. //mqtt_do_reconnect(pMqttClient);
  2624. ccos_mqtt_connection* hConnection = (ccos_mqtt_connection*)pMqttClient->mqtt_conn_context;
  2625. mqtt_disconnect(pMqttClient);
  2626. /*mqtt_set_resubscribe_handler(pMqttClient, resubscribe_topic);
  2627. mqtt_connect(pMqttClient);*/
  2628. mqtt_release(pMqttClient);
  2629. pMqttClient = InnerConnect(hConnection);
  2630. //int rc = MQTT_SUCCESS_ERROR;// mqtt_connect(pconn);
  2631. if (pMqttClient != nullptr)
  2632. {
  2633. //std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
  2634. //resubscribe_topic(pMqttClient, hConnection);
  2635. //////mLog::FWARN("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
  2636. }
  2637. }
  2638. } while (rc != MQTT_SUCCESS_ERROR && (nTryTimes <= 2 || GetTickCount() - dwTick < dwTimeout));
  2639. //if(nTryTimes > 1)
  2640. ////mLog::FWARN("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2641. cout << CurrentDateTime() << " CLT " << client_id.c_str() << " PublishAction to " << pszTopic << " send Times " << nTryTimes << " result " << rc << endl;
  2642. ////mLog::FDEBUG("CLT {$} PublishAction to {$} send Times {$} result {$}", client_id, pszTopic, nTryTimes, rc);
  2643. if (rc < 0)
  2644. {
  2645. ////mLog::FERROR("{$} PublishAction {$} failed {$} body: {$}", client_id, pszTopic, rc, pLoad);
  2646. return rc;
  2647. }
  2648. return 2;
  2649. }
  2650. /*
  2651. /// <summary>
  2652. /// 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,
  2653. /// 超时没收到应答返回失败,
  2654. /// 复用链接时须小心,该函数会接管回调函数,结束后恢复
  2655. /// </summary>
  2656. /// <param name="pAction">要发送的命令Action名</param>
  2657. /// <param name="pContext">命令携带的参数</param>
  2658. /// <param name="pszTopic">要发送的目标topic</param>
  2659. /// <param name="pszRespTopic">本次请求的应答接收Topic</param>
  2660. /// <param name="resObj">应答返回的参数结果</param>
  2661. /// <param name="dwWaitTime">等待超时时间</param>
  2662. /// <param name="hConnection">复用的MQTT链接句柄</param>
  2663. /// <param name="onmsg">复用的链接的消息处理函数</param>
  2664. /// <returns>成功返回2,其他返回错误码</returns>
  2665. LOGICDEVICE_API int ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic,
  2666. ResDataObject& resObj, DWORD dwWaitTime)
  2667. {
  2668. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction2 : " << pAction << " to " << pszTopic << endl;// << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2669. if (pszRespTopic != nullptr)
  2670. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2671. if (hConnection == nullptr)
  2672. {
  2673. return 0;
  2674. }
  2675. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2676. //if (!pMqttClient->is_connected()) {
  2677. // //////mLog::FERROR( " MQTT connection lost at subscribe %s ", topic);
  2678. // return 0;
  2679. //}
  2680. HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  2681. //SubscribeTopic()
  2682. auto action_func = [&resObj, pszRespTopic, pMqttClient, hConnection, hEvent](void* context, char* topicName, int topicLen, MQTTClient_message* message) {
  2683. string topic = topicName;//msg->get_topic().c_str();
  2684. if (strcmp(pszRespTopic, topic.c_str()) == 0) {
  2685. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " get Right Resp by " << topic << endl;
  2686. //应答到了,处理
  2687. ResDataObject req;
  2688. req.decode((const char*)message->payload);
  2689. PacketAnalizer::GetPacketContext(&req, resObj);
  2690. SetEvent(hEvent);
  2691. //处理结束 将函数指针换回去
  2692. void* oldFuncPointer = std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2693. //pMqttClient->set_message_callback(*(mqtt::async_client::message_handler*)oldFuncPointer);
  2694. MQTTAsync_setCallbacks(pMqttClient, hConnection, connlost, msgarrvd, NULL);
  2695. }
  2696. else
  2697. {
  2698. //MQTTClient_messageArrived *orgfunc = (MQTTClient_messageArrived*)std::get<MQTT_MSG_ARRIVED_ID>(*hConnection);
  2699. (msgarrvd)(context, topicName, topicLen, message);
  2700. }
  2701. };
  2702. //接管回调
  2703. // pMqttClient->set_message_callback(action_func);
  2704. MQTTClient_setCallbacks(pMqttClient, hConnection, connlost, (MQTTClient_messageArrived*)&action_func, delivered);
  2705. //pMqttClient->subscribe(pszRespTopic, 1);
  2706. MQTTSubscribe_options opts = MQTTSubscribe_options_initializer;
  2707. MQTTProperties prop = MQTTProperties_initializer;
  2708. std::cout << "mqtt 【" << std::get<CLINET_ID_ID>(*hConnection) << " 】Subscribe " << pszTopic << endl;
  2709. //pMqttClient->subscribe(pszTopic, 1, opts);
  2710. MQTTClient_subscribe5(pMqttClient, pszTopic, 1, NULL, NULL);
  2711. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection));
  2712. //pMqttClient->publish(pszTopic, req.encode());
  2713. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  2714. MQTTClient_message* m = NULL;
  2715. MQTTClient_deliveryToken dt;
  2716. MQTTProperty property;
  2717. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  2718. property.value.data.data = "test user property";
  2719. property.value.data.len = (int)strlen(property.value.data.data);
  2720. property.value.value.data = "test user property value";
  2721. property.value.value.len = (int)strlen(property.value.value.data);
  2722. MQTTProperties properties = MQTTProperties_initializer;
  2723. MQTTProperties_add(&properties, &property);
  2724. //pConn->publish(pszTopic, pCmd->encode());
  2725. const char* pLoad = req.encode();
  2726. MQTTResponse resp = MQTTClient_publish5(pMqttClient, pszTopic, strlen(pLoad), pLoad, 0, 0, &properties, &dt);
  2727. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish x to [" << pszTopic << "] result " << resp.reasonCode << endl;
  2728. DWORD ret = WaitForSingleObject(hEvent, dwWaitTime);
  2729. if (ret == WAIT_OBJECT_0) {
  2730. //等到应答了
  2731. return 2;
  2732. }
  2733. return 0;
  2734. }
  2735. */
  2736. /// <summary>
  2737. /// 往指定主题发送Action包,携带参数,复用Resp的Topic,同步等待resp,超时没收到应答返回失败,
  2738. /// </summary>
  2739. /// <param name="hConnection"></param>
  2740. /// <param name="pAction"></param>
  2741. /// <param name="pContext"></param>
  2742. /// <param name="pszTopic"></param>
  2743. /// <param name="resObj"></param>
  2744. /// <param name="hEvent"></param>
  2745. /// <param name="dwWaitTime"></param>
  2746. /// <returns></returns>
  2747. LOGICDEVICE_API int ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
  2748. const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
  2749. {
  2750. usleep(1000 * 1000);
  2751. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << " to " << pszTopic << "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
  2752. if (hConnection == nullptr)
  2753. {
  2754. return 0;
  2755. }
  2756. DWORD dwTick = GetTickCount();
  2757. mqtt_client* pMqttClient = (mqtt_client*)std::get<MQTT_CLT_ID>(*hConnection);
  2758. std::shared_ptr<LinuxEvent> hEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  2759. ResDataObject resContext;
  2760. string strResObject;
  2761. char* pszPad = 0;
  2762. char* pszPad2 = 0;
  2763. auto func = [pAction, hConnection, &pszPad, &resObj, &pszPad2, dwTick, dwWaitTime](ResDataObject* rsp) -> bool {
  2764. ////mLog::FINFO("{$} try check action resp {$} compared with {$}", std::get<CLINET_ID_ID>(*hConnection), pAction, PacketAnalizer::GetPacketKey(rsp));
  2765. std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << " try check action resp [" << pAction << "] compared with " << PacketAnalizer::GetPacketKey(rsp).c_str() << endl;
  2766. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES &&
  2767. strcmp(pAction, PacketAnalizer::GetPacketKey(rsp).c_str()) == 0)
  2768. {
  2769. if (GetTickCount() - dwTick < dwWaitTime)
  2770. {
  2771. ////mLog::FDEBUG("ActionAndRespWithConnDefalt Packet Content {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2772. resObj = *rsp;
  2773. return true;
  2774. }
  2775. else
  2776. {
  2777. ////mLog::FWARN("ActionAndRespWithConnDefalt Packet Content {$} Timeout content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2778. return false;
  2779. }
  2780. std::cout << " : " << PacketAnalizer::GetPacketKey(rsp) << " content " << rsp->encode() << endl;
  2781. }
  2782. ////mLog::FDEBUG("ActionAndRespWithConnDefalt what ? {$} content {$}", PacketAnalizer::GetPacketKey(rsp), rsp->encode());
  2783. std::cout << "ActionAndRespWithConnDefalt what ? " << PacketAnalizer::GetPacketKey(rsp) << endl;
  2784. return false;
  2785. };
  2786. CcosLock* pLock = std::get<CONN_SEND_LOCK_ID>(*hConnection);
  2787. pLock->Thread_Lock();
  2788. ////mLog::FDEBUG("{$} ActionAndRespWithConnDefalt {$} to Topic {$} body {$} ", std::get<CLINET_ID_ID>(*hConnection), pAction, pszTopic, pContext->encode());
  2789. ccos_mqtt_msg_filter* pfilter = (ccos_mqtt_msg_filter*)std::get<MSG_HOOK_ID>(*hConnection);
  2790. ResDataObject *resActions = std::get<FILTER_RES_OBJ_ID>(*pfilter);
  2791. resActions->add(pAction, pszTopic);
  2792. std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
  2793. std::get<FILTER_FUNC_ID>(*pfilter) = func;
  2794. std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
  2795. PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection).c_str());
  2796. const char* pLoad = req.encode();
  2797. ResDataObject* pPacket = new ResDataObject();
  2798. mqtt_msg_list* pList = std::get<MSG_LIST_ID>(*hConnection);
  2799. pPacket->add(pszTopic, pLoad);
  2800. ////mLog::FDEBUG(" {$} Try push packet to Send list: {$}", client_id, pPacket->encode());
  2801. pList->push_back(pPacket);
  2802. sem_post(std::get<SEMAPHORE_HANDLE_ID>(*hConnection));
  2803. /*
  2804. mqtt_message_t msg;
  2805. memset(&msg, 0, sizeof(msg));
  2806. msg.payload = (void*)pLoad;
  2807. msg.qos = MQTT_QOS;
  2808. msg.payloadlen = strlen(pLoad);
  2809. //int rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2810. int rc = MQTT_SUCCESS_ERROR;
  2811. dwTick = GetTickCount();
  2812. int nTryTimes = 0;
  2813. do
  2814. {
  2815. rc = mqtt_publish(pMqttClient, pszTopic, &msg);
  2816. nTryTimes++;
  2817. if (rc != MQTT_SUCCESS_ERROR)
  2818. {
  2819. ////mLog::FINFO("try mqtt_publish ret {$} do mqtt reconnect 3.. {$}", rc, client_id);
  2820. //mqtt_do_reconnect(pMqttClient);
  2821. mqtt_disconnect(pMqttClient);
  2822. rc = mqtt_connect(pMqttClient);
  2823. if (rc == MQTT_SUCCESS_ERROR)
  2824. {
  2825. resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
  2826. ////mLog::FINFO("mqtt_connect ret {$} do mqtt resubscribe {$}", rc, client_id);
  2827. }
  2828. Sleep(2);
  2829. }
  2830. } while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 3);
  2831. if (nTryTimes >= 1)
  2832. ////mLog::FINFO("CLT {$} Publish to {$} send Times {$} result {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, nTryTimes, rc);
  2833. */
  2834. pLock->Thread_UnLock();
  2835. dwTick = GetTickCount() - dwTick;
  2836. //////mLog::FINFO("CLT {$} Publish to {$} Use TID {$} Use Time {$} result {$} ", std::get<CLINET_ID_ID>(*hConnection), pszTopic, GetCurrentThreadId(), dwTick, rc);
  2837. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " Publish to [" << pszTopic << "] Use TID [" << GetCurrentThreadId() << "] Use Time[" << dwTick << "]ms" << endl;
  2838. dwTick = GetTickCount();
  2839. DWORD ret = hEvent->Wait(dwWaitTime);
  2840. if (ret) {
  2841. //等到应答了
  2842. dwTick = GetTickCount() - dwTick;
  2843. pLock->Thread_Lock();
  2844. ResDataObject* pResp = std::get<FILTER_RESPONS_OBJ_ID>(*pfilter);
  2845. for (int x = 0; x < pResp->size(); x++)
  2846. {
  2847. ResDataObject r = (*pResp)[x];
  2848. if (string(pAction) == string(pResp->GetKey(x)) && PacketAnalizer::GetPacketIdx(&req) == PacketAnalizer::GetPacketIdx(&r))
  2849. {
  2850. resObj = r;
  2851. pResp->eraseOneOf(pAction, x);
  2852. break;
  2853. }
  2854. }
  2855. pLock->Thread_UnLock();
  2856. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << " try [" << pszTopic << "] getresp ok Use Time[" << dwTick << "]ms" << endl;
  2857. ////mLog::FDEBUG("CLT {$} try {$} getresp ok Use Time {$}", std::get<CLINET_ID_ID>(*hConnection), pszTopic, dwTick);
  2858. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2859. return 2;
  2860. }
  2861. //std::get<FILTER_FUNC_ID>(*pfilter) = nullptr;
  2862. //resObj.decode(strResObject.c_str());
  2863. ////mLog::FERROR("CLT {$} try {$} getresp timeout ", std::get<CLINET_ID_ID>(*hConnection), pszTopic );
  2864. std::cout << "CLT [" << std::get<CLINET_ID_ID>(*hConnection) << "] at " << CurrentDateTime() << "try [" << pszTopic << "] " << endl;
  2865. //std::get<FILTER_RES_OBJ_ID>(*pfilter) = nullptr;
  2866. return 0;
  2867. }
  2868. /// <summary>
  2869. /// 新建MQTT连接发送Ation并等待应答
  2870. /// </summary>
  2871. /// <param name="pAction"></param>
  2872. /// <param name="pContext"></param>
  2873. /// <param name="pszTopic"></param>
  2874. /// <param name="pszRespTopic"></param>
  2875. /// <param name="resObj"></param>
  2876. /// <param name="dwWaitTime"></param>
  2877. /// <param name="pszSenderName"></param>
  2878. /// <returns></returns>
  2879. LOGICDEVICE_API int ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
  2880. DWORD dwWaitTime, const char* pszSenderName)
  2881. {
  2882. ResDataObject req;
  2883. if (pszRespTopic != nullptr)
  2884. PacketAnalizer::UpdateContextTopic(pContext, pszRespTopic);
  2885. //临时创建连接并发送和接收应答
  2886. char pszClientID[256];
  2887. snprintf(pszClientID, sizeof(pszClientID), "TEMP_%s_%d_0X%08lX",
  2888. (pszSenderName == nullptr) ? "ANONYMOUS" : pszSenderName,
  2889. getpid(), GetCurrentThreadId());
  2890. sem_t sem;
  2891. if (sem_init(&sem, 0, 0) != 0) {
  2892. return 0; // 如果初始化失败,返回0
  2893. }
  2894. std::cout << " ActionAndResp->NewConnection " << endl;
  2895. ccos_mqtt_connection* connObj = NewConnection(pszClientID, [&resObj, &sem](ResDataObject* req, const char* topic, void* conn) {
  2896. //应答到了,处理
  2897. PacketAnalizer::GetPacketContext(req, resObj);
  2898. sem_post(&sem);
  2899. });
  2900. ////发布消息,并等待应答
  2901. PublishAction(&req, pszTopic, connObj);
  2902. // 等待应答或超时
  2903. struct timespec ts;
  2904. clock_gettime(CLOCK_REALTIME, &ts);
  2905. ts.tv_sec += dwWaitTime / 1000; // 转换为秒
  2906. ts.tv_nsec += (dwWaitTime % 1000) * 1000000; // 转换为纳秒
  2907. // 等待信号量或者超时
  2908. int ret = 0;
  2909. while (ret == 0) {
  2910. ret = sem_timedwait(&sem, &ts); // 超时或者接收到信号量
  2911. if (ret == -1 && errno == ETIMEDOUT) {
  2912. // 超时
  2913. sem_destroy(&sem);
  2914. CloseConnection(connObj);
  2915. return 0; // 超时返回0
  2916. }
  2917. }
  2918. // 等到应答了
  2919. sem_destroy(&sem); // 销毁信号量
  2920. CloseConnection(connObj);
  2921. return 2; // 返回2表示应答成功
  2922. }