LogicClient.cpp 53 KB


  1. //#include "SysIF.h"
  2. #include "LocalConfig.h"
  3. #include "LogicClient.h"
  4. #include "common_api.h"
  5. #include "PacketAnalizer.h"
  6. //#include "Logger.h"
  7. #define ATTRIBUTE ("Attribute")
  8. #include <iostream>
  9. #include<chrono>
  10. #ifndef _CRT_SECURE_NO_WARNINGS
  11. #define _CRT_SECURE_NO_WARNINGS
  12. #endif
  13. //Log4CPP::Logger* ////mLog::gLogger = nullptr;
  14. string LogHost = "";
  15. #ifndef GET_CURRENT_THREAD_ID_DEFINED
  16. #define GET_CURRENT_THREAD_ID_DEFINED
  17. inline DWORD GetCurrentThreadId() {
  18. return static_cast<DWORD>(syscall(SYS_gettid));
  19. }
  20. #endif
  21. // Linux下获取进程ID
  22. inline DWORD GetCurrentProcessId() {
  23. return getpid();
  24. }
  /// Returns the current local time formatted as " YYYY-MM-DD HH:MM:SS "
  /// (one leading and one trailing space).
  ///
  /// Uses localtime_r instead of std::localtime: std::localtime hands back a
  /// pointer to shared static storage, so two threads formatting timestamps at
  /// the same moment could corrupt each other's result.
  ///
  /// @return The formatted timestamp, or an empty string if the time could not
  ///         be converted to local calendar time.
  inline std::string CurrentDateTime2()
  {
      const std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
      char buf[100] = { 0 };
      std::tm localParts{};
      if (localtime_r(&now, &localParts) != nullptr)
      {
          std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", &localParts);
      }
      // buf was zero-initialised, so a failed conversion yields "".
      return buf;
  }
  /// Returns the current local time with microsecond resolution, formatted as
  /// "YYYY-MM-DD HH:MM:SS.uuuuuu " (note the trailing space).
  ///
  /// Two fixes over the previous version:
  ///  - localtime_r replaces std::localtime, whose shared static result is not
  ///    safe to use from several threads at once.
  ///  - the microsecond remainder is cast to long before being passed to the
  ///    "%06ld" conversion; the duration's rep type is not guaranteed to be
  ///    long (it is long long on 32-bit targets), which made the old call a
  ///    format/argument mismatch there.
  ///
  /// @return The formatted timestamp, or an empty string if the time could not
  ///         be converted to local calendar time.
  inline std::string CurrentDateTime()
  {
      // Current time point and its distance from the epoch in microseconds.
      const auto now = std::chrono::system_clock::now();
      const auto sinceEpochUs = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());

      // Break the whole-second part down into local calendar fields.
      const std::time_t nowSeconds = std::chrono::system_clock::to_time_t(now);
      std::tm localParts{};
      if (localtime_r(&nowSeconds, &localParts) == nullptr)
      {
          return std::string();
      }

      std::string formatted;
      char buf[64];

      // Date and time down to the second.
      if (std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &localParts) > 0)
      {
          formatted += buf;
      }

      // Fractional part: microseconds within the current second.
      snprintf(buf, sizeof(buf), ".%06ld ", static_cast<long>(sinceEpochUs.count() % 1000000));
      formatted += buf;
      return formatted;
  }
  55. LogicClient::LogicClient(string szClientName, string szTransaction, string szType, bool bNeedNotify)
  56. {
  57. m_ReqIdx = 0;
  58. pLogicDeivce = NULL;
  59. m_SyncMode = ACTION_SYNC;
  60. m_FileFlags = 0;
  61. m_ClosedFlag = true;
  62. m_OwnerThreadId = 0;
  63. m_pActionName = new string();
  64. m_pFilePath = new string();
  65. m_pLastMsgInfo = new string();
  66. m_pPacketWaitQue = new MsgQueue<ResDataObject>();
  67. m_pPacketReceivedQue = new MsgQueue<ResDataObject>();
  68. m_pPacketNotifyQue = new MsgQueue<ResDataObject>();
  69. m_NotifyEvent = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);
  70. m_ResponseEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  71. m_bNeedNotify = bNeedNotify;
  72. //m_pSysIF = new ClientSysIF();
  73. m_pMqttConn = nullptr;
  74. m_strClientName = szClientName;
  75. if (m_strClientName.length() <= 0) {
  76. string stream;
  77. stream = "LogicClient_" + to_string( GetCurrentProcessId()) + "_" + string( (const char*)getLocalEbusId()) + "_" +to_string( GetCurrentThreadId());
  78. m_strClientName = stream;
  79. stream.clear();
  80. string stream2;
  81. stream2 = CCOS_CLIENT_ROOT_TOPIC + string( "/Resp_") + to_string( GetCurrentProcessId()) + "_" + string( (const char*)getLocalEbusId()) + "_" + to_string( GetCurrentThreadId());
  82. m_strDefaultTopic = stream2;
  83. }
  84. else
  85. {
  86. stringstream stream;
  87. stream << "LogicClient_" << szClientName << "_" << (const char*)getLocalEbusId() << "_" << GetCurrentProcessId();
  88. m_strClientName = stream.str();
  89. stream.clear();
  90. stringstream stream2;
  91. stream2 << CCOS_CLIENT_ROOT_TOPIC << (szType.length()<=0?"":"/") << (szType.length()<=0?"":szType) << "/" << szClientName << "/Resp_" << (const char*)getLocalEbusId() << "_" << GetCurrentProcessId();
  92. m_strDefaultTopic = stream2.str();
  93. }
  94. m_strCurTransaction = szTransaction;
  95. m_pFileHandle = new CcosDevFileHandle();
  96. m_ClientLock = (HANDLE)new CcosLock();
  97. m_Router = CCOS_PACKET_ROUTE_ANY;
  98. m_hDeviceOpenOK = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);// CreateEvent(NULL, FALSE, FALSE, NULL);
  99. m_hDeviceCloseOK = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  100. m_hReOpen = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false);
  101. m_pPacketArrivedCallbackFunc = nullptr;
  102. m_openCallbackFunc = nullptr;
  103. SetName(m_strClientName.c_str());
  104. }
  105. LogicClient::~LogicClient(void)
  106. {
  107. ////mLog::FWARN("Free LogicClient Object [{$}]", m_strClientName);
  108. cout << "Free LogicClient Object == " << m_strClientName << endl;
  109. if (IsClosed() == false)
  110. {
  111. //make close
  112. Close();
  113. }
  114. //delete m_pSysIF;
  115. CloseConnection(m_pMqttConn);
  116. //m_pSysIF = NULL;
  117. m_pMqttConn = nullptr;
  118. delete m_pActionName;
  119. m_pActionName = NULL;
  120. delete m_pFilePath;
  121. m_pFilePath = NULL;
  122. delete m_pLastMsgInfo;
  123. m_pLastMsgInfo = NULL;
  124. delete m_pPacketWaitQue;
  125. m_pPacketWaitQue = NULL;
  126. delete m_pPacketReceivedQue;
  127. m_pPacketReceivedQue = NULL;
  128. delete m_pPacketNotifyQue;
  129. m_pPacketNotifyQue = NULL;
  130. delete m_pFileHandle;
  131. m_pFileHandle = NULL;
  132. delete (CcosLock*)m_ClientLock;
  133. m_ClientLock = NULL;
  134. }
  135. bool LogicClient::GetFileHandle(CcosDevFileHandle &Handle)
  136. {
  137. if (IsClosed() == false)
  138. {
  139. Lock();
  140. Handle = (*m_pFileHandle);
  141. UnLock();
  142. return true;
  143. }
  144. //make client handle
  145. Handle[CCOS_PACK_HANDLE_LANG] = "en-US";//NOT FINISHED YET
  146. Handle[CCOS_PACK_HANDLE_ROUTE] = m_Router;//route
  147. Handle[CCOS_PACK_HANDLE_FLAGS] = 0;//
  148. Handle[CCOS_PACKET_HANDLE_KEY] = 0;//
  149. //target
  150. Handle.GetTarget(false)[CCOS_PACK_TARGET_ADDR] = (UINT64)0;
  151. Handle.GetTarget(false)[CCOS_PACK_TARGET_PROCID] = (UINT64)0;
  152. Handle.GetTarget(false)[CCOS_PACK_TARGET_BUSID] = "";
  153. Handle.GetTarget(false)[CCOS_PACK_TARGET_MACHINEID] = "";
  154. //local
  155. Handle.GetTarget()[CCOS_PACK_TARGET_ADDR] = (UINT64)(PVOID*)m_pMqttConn;
  156. Handle.GetTarget()[CCOS_PACK_TARGET_PROCID] = (UINT64)GetCurrentProcessId();
  157. Handle.GetTarget()[CCOS_PACK_TARGET_BUSID] = (const char*)getLocalEbusId();
  158. Handle.GetTarget()[CCOS_PACK_TARGET_MACHINEID] = (const char*)getLocalMachineId();
  159. return false;
  160. }
  161. bool LogicClient::SetRouter(CCOS_PACKET_ROUTE Route)
  162. {
  163. if (IsClosed())
  164. {
  165. m_Router = Route;
  166. return true;
  167. }
  168. return false;
  169. }
  170. bool LogicClient::Lock(DWORD timeout)
  171. {
  172. if (m_ClientLock == NULL)
  173. {
  174. ////mLog::FERROR("{$} @ Trans {$} Lock handle is null", m_strClientName, m_strCurTransaction);
  175. return false;
  176. }
  177. bool LockRet = (((CcosLock*)m_ClientLock)->Thread_Lock(timeout) == WAIT_OBJECT_0);
  178. std::cout << m_strClientName << " [LLKK] "<< (UINT64)this << " Try Lock Result " << LockRet << " With timeout " << timeout << endl;
  179. if (!LockRet)
  180. std::cout << m_strClientName << " [LLKK] " << (UINT64)this << " Lock Failed" << endl;
  181. return LockRet;
  182. }
  183. void LogicClient::UnLock()
  184. {
  185. if (m_ClientLock == NULL)
  186. {
  187. ////mLog::FERROR("{$} @ Trans {$} UnLock handle is null", m_strClientName, m_strCurTransaction);
  188. return ;
  189. }
  190. std::cout << m_strClientName << " [LLKK] " << (UINT64)this << " Unlock " << endl;
  191. ((CcosLock*)m_ClientLock)->Thread_UnLock();
  192. }
  193. bool LogicClient::InitClient(const char *pPath, int flags)
  194. {
  195. std::cout <<"Enter InitClient:pPath:"<< pPath << endl;
  196. ResDataObject rMsg;
  197. m_strDevicePath = pPath;
  198. m_OwnerThreadId = 0;
  199. (*m_pLastMsgInfo) = "";
  200. std::cout << "Start ClearQueue_Internal()" << endl;
  201. ClearQueue_Internal();
  202. ////mLog::FWARN("m_DeviceResource.clear() [{$}]", m_strDevicePath);
  203. m_DeviceResource.clear();
  204. //m_pFileHandle->m_Lang.SetVal("en-US");//NOT FINISHED YET
  205. //m_pFileHandle->m_Route = m_Router;//route
  206. //m_pFileHandle->m_Flags = flags;
  207. //m_pFileHandle->m_HandleId = 0;
  208. (*m_pFileHandle)[CCOS_PACK_HANDLE_LANG] = "en-US";//NOT FINISHED YET
  209. (*m_pFileHandle)[CCOS_PACK_HANDLE_ROUTE] = m_Router;//route
  210. (*m_pFileHandle)[CCOS_PACK_HANDLE_FLAGS] = flags;
  211. (*m_pFileHandle)[CCOS_PACKET_HANDLE_KEY] = (UINT64)0;//
  212. //target
  213. m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_MACHINEID] = "";
  214. m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_BUSID] = "";
  215. m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_PROCID] = (UINT64)0;
  216. m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_ADDR] = (UINT64)0;
  217. //m_pFileHandle->m_Dev.m_Addr = (UINT64)0;
  218. //m_pFileHandle->m_Dev.m_ProcID = (UINT64)0;
  219. //m_pFileHandle->m_Dev.m_busID = "";
  220. //m_pFileHandle->m_Dev.m_MachineID = "";
  221. //local
  222. m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_MACHINEID] = (const char*)getLocalMachineId();
  223. m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_BUSID] = (const char*)getLocalEbusId();
  224. m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_PROCID] = (UINT64)GetCurrentProcessId();
  225. m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_ADDR] = (UINT64)(PVOID*)m_pMqttConn;
  226. //m_pFileHandle->m_Owner.m_Addr = (UINT64)(PVOID*)m_pSysIF;
  227. //m_pFileHandle->m_Owner.m_ProcID = (UINT64)GetCurrentProcessId();
  228. //m_pFileHandle->m_Owner.m_busID = (const char*)getLocalEbusId();
  229. //m_pFileHandle->m_Owner.m_MachineID = (const char*)getLocalMachineId();
  230. m_FileFlags = flags;
  231. if (m_strDevicePath.length() <= 0)
  232. {
  233. //允许透传消息
  234. m_SyncMode = ACTION_SYNC;
  235. }
  236. if (pPath)
  237. {
  238. string busId;
  239. std::cout << "Start getBusIdFromFilePath()" << endl;
  240. if (getBusIdFromFilePath(pPath, busId) == false)
  241. {
  242. (*m_pFilePath) = "";
  243. //return false;
  244. }
  245. //m_pFileHandle->m_Dev.m_busID = busId;
  246. m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_BUSID] = busId.c_str();
  247. (*m_pFilePath) = pPath;
  248. }
  249. //ClientSysIF *p = (ClientSysIF*)m_pSysIF;
  250. //p->Clear();
  251. //if (////mLog::gLogger == nullptr)
  252. //{
  253. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  254. // LogHost = ((string)getLogRootpath()).c_str();
  255. // if (LogHost.length() <= 1)
  256. // {
  257. // char szName[256];
  258. // sprintf(szName, "/LogicClient_%08d", GetCurrentProcessId());
  259. // LogHost = szName;
  260. // }
  261. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicClient");
  262. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  263. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  264. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  265. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicClient");
  266. // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
  267. //}
  268. //else
  269. //{
  270. // string strRoot = ((string)getLogRootpath()).c_str();
  271. // if (strRoot.length() > 1 && strRoot != LogHost)
  272. // {
  273. // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)";
  274. // LogHost = strRoot;
  275. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicClient");
  276. // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str());
  277. // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1);
  278. // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str());
  279. // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicClient");
  280. // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
  281. // }
  282. //}
  283. //string version;
  284. //if (GetVersion(version, hMyModule))
  285. // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str());
  286. //else
  287. ////mLog::FWARN("\n{$}=============== LogicClient log begin : version:1.0.0.0 ===================\n", m_strClientName);
  288. return true;
  289. }
  290. bool LogicClient::GetFilePath(ResDataObject &path)
  291. {
  292. if (IsClosed() == false)
  293. {
  294. path = (*m_pFilePath).c_str();
  295. return true;
  296. }
  297. return false;
  298. }
  299. RET_STATUS LogicClient::ProcessActionResponse(ResDataObject &packet, ResDataObject &OutputParam)
  300. {
  301. RET_STATUS ret = RET_FAILED;
  302. if (PacketAnalizer::GetPacketRetCode(&packet, ret))
  303. {
  304. //get error or warning Msg here
  305. string errInfo;
  306. PacketAnalizer::GetPacketMsgInfo(&packet, errInfo);
  307. if (errInfo.size() > 0)
  308. {
  309. string actionname = PacketAnalizer::GetPacketKey(&packet);
  310. (*m_pLastMsgInfo) = actionname + ":" + errInfo;
  311. }
  312. if (ret >= RET_SUCCEED)
  313. {
  314. if (PacketAnalizer::GetPacketContext(&packet, OutputParam))
  315. {
  316. //succeed
  317. //printf("Thread:%d,Action OK\n", GetCurrentThreadId());
  318. return ret;
  319. }
  320. }
  321. if (ret == RET_INVALID)
  322. {
  323. //DisconnectCDI();
  324. }
  325. }
  326. return ret;
  327. }
  328. RET_STATUS LogicClient::ProcessResource(ResDataObject &packet)
  329. {
  330. ResDataObject resParam;
  331. RET_STATUS ret = RET_FAILED;
  332. //got match response
  333. if (PacketAnalizer::GetPacketRetCode(&packet, ret))
  334. {
  335. //get error or warning Msg here
  336. PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo));
  337. if (ret >= RET_SUCCEED)
  338. {
  339. //whole device resource
  340. if (PacketAnalizer::GetPacketContext(&packet, m_DeviceResource))
  341. {
  342. ////mLog::FDEBUG("Update Device {$} Resource [{$}]", m_strDevicePath, m_DeviceResource.encode());
  343. return ret;
  344. }
  345. }
  346. }
  347. ////mLog::FERROR("Device Open Result Error", packet.encode());
  348. //put some log here
  349. ////mLog::FINFO("m_DeviceResource.clear() [{$}] after Device Open Result Error", m_strDevicePath);
  350. m_DeviceResource.clear();
  351. return RET_FAILED;
  352. }
  353. RET_STATUS LogicClient::ProcessClose(ResDataObject &packet)
  354. {
  355. //we have disconnect here
  356. PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo));
  357. //make the object status as CLOSED
  358. //DisconnectCDI();
  359. //clear the info
  360. //m_pPacketReceivedQue->Lock();
  361. m_pPacketWaitQue->Clear();
  362. m_pPacketReceivedQue->Clear();
  363. //m_DeviceResource.clear();
  364. //m_pPacketReceivedQue->UnLock();
  365. //m_pPacketNotifyQue->Lock();
  366. m_pPacketNotifyQue->Clear();
  367. //m_pPacketNotifyQue->UnLock();
  368. return RET_INVALID;
  369. }
  370. RET_STATUS LogicClient::ProcessOpenResponse(ResDataObject &packet)
  371. {
  372. ResDataObject resParam;
  373. RET_STATUS ret = RET_FAILED;
  374. //got match response
  375. if (PacketAnalizer::GetPacketRetCode(&packet, ret))
  376. {
  377. //get error or warning Msg here
  378. PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo));
  379. if (ret >= RET_SUCCEED)
  380. {
  381. if (PacketAnalizer::GetPacketHandle(&packet, resParam))
  382. {
  383. if (m_pFileHandle == NULL)
  384. return RET_FAILED;
  385. //got file handle
  386. if (m_pFileHandle->SetResDataObject(resParam))
  387. {
  388. //succeed here
  389. //Get Device Resource
  390. if (ProcessResource(packet) >= RET_SUCCEED)
  391. {
  392. string res = packet.encode();
  393. ////mLog::FDEBUG("OpenResponse ok {$}", res.substr(0, 16384));
  394. //////mLog::FDEBUG("OpenResponse ok\n");
  395. if (m_bNeedNotify && m_ClosedFlag) {
  396. const char* realPath = NULL;
  397. if (m_strDevicePath[0] == '/')
  398. realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
  399. else
  400. realPath = m_strDevicePath.c_str();
  401. string notifyTopic = realPath;
  402. notifyTopic += "/Notify/#";
  403. SubscribeTopic(m_pMqttConn, notifyTopic.c_str());
  404. }
  405. if (m_openCallbackFunc != nullptr)
  406. m_openCallbackFunc(this, m_strDevicePath.c_str(), 1);
  407. m_OwnerThreadId = (DWORD)GetCurrentThreadId();
  408. std::cout << "TID [" << m_OwnerThreadId << "] LogicClient::ProcessOpenResponse OpenResponse ok." << m_strClientName << endl;
  409. ////mLog::FDEBUG("LogicClient::ProcessOpenResponse {$} OpenResponse ok. {$}", m_OwnerThreadId, m_strClientName);
  410. m_hDeviceOpenOK->SetEvent();
  411. SetThreadOnTheRun(true); //线程可以正常运转
  412. m_ClosedFlag = false;
  413. return ret;
  414. }
  415. }
  416. }
  417. }
  418. }
  419. ////mLog::FERROR("Thread:{$} ,OpenResponse ng ",GetCurrentThreadId());
  420. //printf("Thread:%d,OpenResponse ng\n", GetCurrentThreadId());
  421. //put some log here
  422. return RET_FAILED;
  423. }
  424. RET_STATUS LogicClient::CloseDevice()
  425. {
  426. RET_STATUS ret = RET_FAILED;
  427. const char* realPath = NULL;
  428. if (m_strDevicePath[0] == '/')
  429. realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
  430. else
  431. realPath = m_strDevicePath.c_str();
  432. ////mLog::FDEBUG("{$} CloseDevice {$} and use default topic {$}", m_strClientName, realPath, m_strDefaultTopic);
  433. if (Lock(1000) == false)
  434. {
  435. ////mLog::FERROR("CloseDev Lock Timeout for Dev: {$} ", m_strClientName);
  436. //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath);
  437. return RET_TIMEOUT;
  438. }
  439. if (m_ClosedFlag)
  440. {
  441. UnSubscribe(m_pMqttConn, m_strDefaultTopic.c_str());
  442. if (m_bNeedNotify) {
  443. string notifyTopic = realPath;
  444. notifyTopic += "/Notify/#";
  445. UnSubscribe(m_pMqttConn, notifyTopic.c_str());
  446. }
  447. }
  448. UnLock();
  449. m_hDeviceOpenOK->ResetEvent();
  450. m_hDeviceCloseOK->SetEvent();
  451. //标志
  452. //m_ClosedFlag = true;
  453. ////make close req
  454. ResDataObject req;
  455. PacketAnalizer::MakeCloseRequest(req, (*m_pFileHandle));
  456. if ((m_FileFlags & CCOS_FILE_FLAGS::ABSCRACT_ONLINE) == CCOS_FILE_FLAGS::ABSCRACT_ONLINE)
  457. {
  458. if (m_strWS.length() > 0)
  459. {
  460. //需要显式 上设备上线
  461. ResDataObject resContext;
  462. resContext.add("Offline", m_strWS.c_str());
  463. PacketAnalizer::UpdatePacketContext(req, resContext);
  464. PublishAction(&req, m_pFilePath->c_str(), m_pMqttConn);
  465. }
  466. }
  467. ////send it
  468. ////ret = pLogicDeivce->CmdFromLogicDev(&req);
  469. ////UnLock();
  470. //DWORD dwret = WaitForSingleObject(m_hCloseOK, 1000);
  471. //if (dwret != WAIT_OBJECT_0)
  472. //{
  473. // //printf("Thread:%d,Send Close OK\n", GetCurrentThreadId());
  474. //}
  475. //else
  476. //{
  477. // printf("Thread:%d,Send Close Failed\n", GetCurrentThreadId());
  478. //}
  479. return ret;
  480. }
  481. RET_STATUS LogicClient::OpenDevice(bool bAsync)
  482. {
  483. RET_STATUS ret = RET_FAILED;
  484. ResDataObject req, res, resParam;
  485. const char* realPath = NULL;
  486. if (m_strDevicePath.length() <= 0)
  487. {
  488. //支持不打开设备的客户端
  489. m_OwnerThreadId = (DWORD)GetCurrentThreadId();
  490. std::cout << "TID [" << m_OwnerThreadId << "] LogicClient::ProcessOpenResponse OpenResponse ok." << m_strClientName << endl;
  491. ////mLog::FINFO("Open null device {$} OpenResponse ok. {$}", m_OwnerThreadId, m_strClientName);
  492. m_hDeviceOpenOK->SetEvent();
  493. SetThreadOnTheRun(true); //线程可以正常运转
  494. m_ClosedFlag = false;
  495. return RET_SUCCEED;
  496. }
  497. if (m_strDevicePath[0] == '/')
  498. realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
  499. else
  500. realPath = m_strDevicePath.c_str();
  501. if (Lock(1000) == false)
  502. {
  503. ////mLog::FERROR("OpenDev Lock Timeout for Dev: {$} ", m_strClientName);
  504. //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath);
  505. return RET_TIMEOUT;
  506. }
  507. if (m_ClosedFlag)
  508. {
  509. SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str());
  510. ////mLog::FDEBUG("{$} OpenDevice {$} and use default topic {$} with Async", m_strClientName, realPath, m_strDefaultTopic, bAsync);
  511. }
  512. else
  513. {
  514. ////mLog::FDEBUG("{$} OpenDevice {$} and use default topic {$} with Async and No ReSubscribe", m_strClientName, realPath, m_strDefaultTopic, bAsync);
  515. }
  516. DWORD reqidx = PacketAnalizer::MakeOpenRequest(req, (*m_pFileHandle), realPath);
  517. PacketAnalizer::UpdateOpenRequest(req, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)m_pMqttConn);
  518. PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str());
  519. PacketAnalizer::UpdatePacketTopic(&req, realPath, m_strClientName.c_str());
  520. if ((m_FileFlags & CCOS_FILE_FLAGS::ABSCRACT_ONLINE) == CCOS_FILE_FLAGS::ABSCRACT_ONLINE)
  521. {
  522. if (m_strWS.length() > 0)
  523. {
  524. //需要显式 上设备上线
  525. ResDataObject resContext;
  526. PacketAnalizer::GetPacketContext(&req, resContext);
  527. resContext.add("Online", m_strWS.c_str());
  528. PacketAnalizer::UpdatePacketContext(req, resContext);
  529. }
  530. }
  531. int result = PublishAction(&req, realPath, m_pMqttConn);
  532. UnLock();
  533. if (result < 0)
  534. {
  535. ////mLog::FERROR("Publish Open Packet Failed . try Reopen. Frist UnSubscribe {$}", m_strDefaultTopic);
  536. //UnSubscribe(m_pMqttConn, m_strDefaultTopic.c_str());
  537. //SetEvent(m_hReOpen);
  538. return RET_FAILED;
  539. }
  540. //UnLock();
  541. if (!bAsync)
  542. {
  543. ////mLog::FINFO("try Wait for Open ok.{$}", m_strClientName);
  544. std::cout << "try Wait for Open ok." <<m_strClientName << endl;
  545. if (m_hDeviceOpenOK->Wait(4000))
  546. {
  547. std::cout << "Open ok. done" << m_strClientName << endl;
  548. ////mLog::FINFO("Open ok. done{$}", m_strClientName);
  549. if (m_openCallbackFunc != nullptr)
  550. m_openCallbackFunc(this, realPath, 1);
  551. return RET_SUCCEED;
  552. }
  553. }
  554. else
  555. {
  556. ////mLog::FINFO("No Wait for Open ok and try Reopen.{$}", m_strClientName);
  557. //SetEvent(m_hReOpen);
  558. return RET_FAILED;
  559. }
  560. return ret;
  561. }
  562. bool LogicClient::OnStartThread()
  563. {
  564. RET_STATUS ret = RET_FAILED;
  565. const char* realPath = NULL;
  566. std::cout << "OnStartThreadL:m_strDevicePath: " << m_strDevicePath << std::endl;
  567. if (m_strDevicePath[0] == '/')
  568. realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
  569. else
  570. realPath = m_strDevicePath.c_str();
  571. //if (Lock(10000) == false)
  572. //{
  573. // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  574. // return false;
  575. //}
  576. //GPRINTA_DEBUG("Thread:%d,Open Entry", GetCurrentThreadId());
  577. //make connection to CDI
  578. //ConnectCDI();
  579. ResDataObject req, res, resParam;
  580. //make open req
  581. //HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  582. // 新增调试输出
  583. if (m_pMqttConn) {
  584. std::cout << "m_pMqttConn Memory address: " << m_pMqttConn << std::endl;
  585. // 解包元组内容并输出
  586. void* mqttClient = std::get<0>(*m_pMqttConn);
  587. mqtt_topic_list* topicList = std::get<1>(*m_pMqttConn);
  588. void* msgHook = std::get<2>(*m_pMqttConn);
  589. const char* clientId = std::get<3>(*m_pMqttConn);
  590. ccos_mqtt_callback callback = std::get<4>(*m_pMqttConn);
  591. pthread_t threadId = std::get<5>(*m_pMqttConn);
  592. CcosLock* connLock = std::get<6>(*m_pMqttConn);
  593. mqtt_msg_list* msgList = std::get<7>(*m_pMqttConn);
  594. sem_t* semaphore = std::get<8>(*m_pMqttConn);
  595. std::cout << "MQTT Client Handle: " << mqttClient << std::endl;
  596. std::cout << "MQTT Topic List: " << topicList << std::endl;
  597. std::cout << "Message Hook ID: " << msgHook << std::endl;
  598. std::cout << "Client ID: " << (clientId ? clientId : "null") << std::endl;
  599. std::cout << "Thread ID: " << threadId << std::endl;
  600. std::cout << "Connection Lock: " << connLock << std::endl;
  601. std::cout << "Message List: " << msgList << std::endl;
  602. std::cout << "Semaphore Handle: " << semaphore << std::endl;
  603. }
  604. else {
  605. std::cout << "m_pMqttConn is nullptr" << std::endl;
  606. }
  607. //HANDLE hOpened = CreateEvent(NULL, FALSE, FALSE, NULL);
  608. if (m_pMqttConn == nullptr)
  609. {
  610. ////mLog::FINFO("LogicClient::Open {$} try open path:[{$}]", m_strClientName, realPath );
  611. std::cout << CurrentDateTime() << " LogicClient::Open [" << m_strClientName << "] try open path: " << realPath << endl << endl;
  612. m_pMqttConn = NewConnection(m_strClientName.c_str(), [&ret, realPath, this](ResDataObject* rsp, const char* topic, void* conn_void) {
  613. //m_pMqttConn = CreateConnection
  614. std::cout << "OnStartThread::NewConnnect callback in "<< std::endl;
  615. if (m_pMqttConn == NULL)
  616. return;
  617. //回调函数不再加锁,要确保每一步都是ok的
  618. /*
  619. if (Lock(1000) == false)
  620. {
  621. ////mLog::FERROR("callback Lock Timeout for Dev: {$} ", m_strClientName);
  622. //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath);
  623. return ;
  624. }*/
  625. ccos_mqtt_connection* conn = (ccos_mqtt_connection*)conn_void;
  626. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES)
  627. {
  628. //check cmd
  629. string path = realPath;
  630. if ((PacketAnalizer::GetPacketCmd(rsp) == PACKET_CMD_OPEN) &&
  631. (PacketAnalizer::GetPacketKey(rsp).substr(0, path.length()) == path || PacketAnalizer::GetPacketKey(rsp) == realPath))
  632. {
  633. ////mLog::FINFO("{$} LogicClient::Open {$} OK ", m_strClientName, realPath);
  634. std::cout << "\n\n-----------" << CurrentDateTime() << "LogicClient::Open OK [" << m_strClientName << "] try open path:" << realPath << endl << endl;
  635. if (Lock(2000))
  636. {
  637. ret = ProcessOpenResponse(*rsp);
  638. UnLock();
  639. }
  640. else
  641. {
  642. ////mLog::FERROR("{$} Process Open Result with Lock Client failed ", m_strClientName);
  643. }
  644. }
  645. else
  646. {
  647. ////mLog::FINFO("Unkown Packet: cmd {$} packet key: {$}", (int)PacketAnalizer::GetPacketCmd(rsp), PacketAnalizer::GetPacketKey(rsp) );
  648. std::cout << m_strClientName << CurrentDateTime() << "Try Conn callback get unkown packet pointer [" << (UINT64)this << endl;
  649. std::cout << "Unkown Packet: cmd [" << PacketAnalizer::GetPacketCmd(rsp) << "] packet key: " << PacketAnalizer::GetPacketKey(rsp) << "" << endl;
  650. if (m_pPacketArrivedCallbackFunc != nullptr)
  651. {
  652. m_pPacketArrivedCallbackFunc(rsp, topic, conn);
  653. }
  654. else
  655. {
  656. //m_pPacketReceivedQue->Lock();
  657. m_pPacketReceivedQue->InQueue(*rsp);
  658. //SetEvent(m_NotifyEvent);//notify to user
  659. m_ResponseEvent->SetEvent();
  660. //m_pPacketReceivedQue->UnLock();
  661. }
  662. }
  663. }
  664. else
  665. {
  666. //got other one
  667. //printf("Thread:%d,in open.got other packet\n", GetCurrentThreadId());
  668. //GPRINTA_DEBUG("in open.got other packet\n\n");
  669. //check priority one => CLOSE
  670. if (PacketAnalizer::GetPacketCmd((ResDataObject*)rsp) == PACKET_CMD_CLOSE)
  671. {
  672. std::cout << CurrentDateTime() << m_strClientName << "Got Close Paket ..." << endl;
  673. //we have disconnect here
  674. //GPRINTA_DEBUG("in open.got Close packet\n\n");
  675. ret = ProcessClose(*rsp);
  676. }
  677. else {
  678. std::cout << "*****-----***** " << CurrentDateTime() << m_strClientName << " Received msg " << rsp->encode() << endl;
  679. if (m_pPacketArrivedCallbackFunc != nullptr)
  680. m_pPacketArrivedCallbackFunc(rsp, topic, conn);
  681. else
  682. {
  683. if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_NOTIFY )
  684. {
  685. PACKET_CMD cmd = PACKET_CMD_NONE;
  686. PACKET_TYPE rType;
  687. string key;
  688. rType = PacketAnalizer::GetPacketType(rsp);
  689. cmd = PacketAnalizer::GetPacketCmd(rsp);
  690. if (rType == PACKET_TYPE_NOTIFY /*&& m_pActionName->size() <= 0*/ && cmd == PACKET_CMD_UPDATE)
  691. {
  692. //只有在非Action期间,更新,如果有遗漏,再说 2024.9.2
  693. cmd = PacketAnalizer::GetPacketCmd(rsp);
  694. key = PacketAnalizer::GetPacketKey(rsp);
  695. //2.notify
  696. UpdateDeviceResource(cmd, *rsp);
  697. }
  698. else
  699. {
  700. ////mLog::FWARN("{$} Got unexpected.. Packet {$}", m_strClientName, rsp->encode());
  701. }
  702. //m_pPacketNotifyQue->Lock();
  703. m_pPacketNotifyQue->InQueue(*rsp);
  704. //SetEvent(m_NotifyEvent);//notify to user
  705. //m_pPacketNotifyQue->UnLock();
  706. m_NotifyEvent->SetEvent();
  707. }
  708. else
  709. PacketArrived(rsp);
  710. }
  711. }
  712. }
  713. //UnLock();
  714. });
  715. }
  716. if (m_pMqttConn == nullptr) {
  717. ////mLog::FERROR("LogicClient {$} Connect Mqtt client failed with open device : {$}", m_strClientName, realPath);
  718. std::cout << "XXXXXX XXXXXX [" << m_strClientName << "] LogicClient Connect Mqtt client failed with open device [" << realPath << "]" << endl;
  719. //UnLock();
  720. if (m_openCallbackFunc != nullptr)
  721. m_openCallbackFunc(this,realPath, 0);
  722. return false;
  723. }
  724. //ret = RET_SUCCEED;
  725. //SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str());
  726. //PacketAnalizer::UpdateOpenRequest(req, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)m_pMqttConn);
  727. //PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str());
  728. //PacketAnalizer::UpdatePacketTopic(&req, realPath, m_strClientName.c_str());
  729. //PublishAction(&req, realPath, m_pMqttConn);
  730. ////UnLock();
  731. //DWORD dwret = WaitForSingleObject(m_hOpenOK, 4000);
  732. //if (dwret == WAIT_OBJECT_0)
  733. //{
  734. // if (m_openCallbackFunc != nullptr)
  735. // m_openCallbackFunc(this, realPath, 1);
  736. // return true;
  737. //}
  738. if (OpenDevice() == RET_SUCCEED)
  739. return true;
  740. ////mLog::FINFO("LogicClient {$} Open Device {$} failed ", m_strClientName, realPath);
  741. std::cout << "XXXXXX XXXXXX [" << m_strClientName << "]LogicClient Open Device [" << realPath << "] failed " << endl;
  742. if (m_openCallbackFunc != nullptr)
  743. m_openCallbackFunc(this, realPath, 0);
  744. CloseConnection(m_pMqttConn);
  745. m_pMqttConn = nullptr;
  746. return false;
  747. }
  748. bool LogicClient::OnEndThread()
  749. {
  750. std::cout << "Thread End..." << endl;
  751. ////mLog::FINFO("End Client Thread");
  752. //CloseDevice();
  753. //ResDataObject req;
  754. ////make close req
  755. //PacketAnalizer::MakeCloseRequest(req, (*m_pFileHandle));
  756. ////send it
  757. ////ret = pLogicDeivce->CmdFromLogicDev(&req);
  758. //PublishAction(&req, m_pFilePath->c_str(), m_pMqttConn);
  759. ////UnLock();
  760. //DWORD dwret = WaitForSingleObject(m_hCloseOK, 1000);
  761. //if (dwret != WAIT_OBJECT_0)
  762. //{
  763. // //printf("Thread:%d,Send Close OK\n", GetCurrentThreadId());
  764. //
  765. //}
  766. //else
  767. //{
  768. // printf("Thread:%d,Send Close Failed\n", GetCurrentThreadId());
  769. //}
  770. //std::cout << CurrentDateTime() << "[" << m_strClientName << "] try Close Mqtt " << endl;
  771. //CloseConnection(m_pMqttConn);
  772. m_pMqttConn = nullptr;
  773. return true;
  774. }
  775. bool LogicClient::Exec(void)
  776. {
  777. //ResDataObject req;
  778. //if (ReadCmd(req))
  779. //{
  780. // return true;
  781. //}
  782. std::vector<std::shared_ptr<LinuxEvent>> hWait = {
  783. m_ExitFlag,
  784. m_hReOpen,
  785. };
  786. DWORD dwRet = LinuxEvent::WaitForMultipleEvents(hWait, 10);//WaitForSingleObject(m_ExitFlag, 100);
  787. if (dwRet == WAIT_OBJECT_0)
  788. {
  789. std::cout << m_strClientName << " MQTT Prepare Exit " << endl;
  790. return false;
  791. }
  792. else if (dwRet == WAIT_OBJECT_0 + 1)
  793. {
  794. ////mLog::FDEBUG(" {$} try Real ReOpen", m_strClientName);
  795. //CloseDevice();
  796. m_hDeviceOpenOK->ResetEvent();
  797. OpenDevice(false);
  798. ////mLog::FDEBUG(" {$} try ReOpen over", m_strClientName);
  799. }
  800. usleep(1000);
  801. return true;
  802. }
  803. int LogicClient::ReOpenDevice()
  804. {
  805. ////mLog::FINFO(" {$} set to ReOpen",m_strClientName );
  806. m_hDeviceOpenOK->ResetEvent();
  807. m_hReOpen->SetEvent();
  808. return RET_SUCCEED;
  809. }
  810. int LogicClient::Open(const char *pPath, int flags, const char* pszWS, DWORD timeout, logic_client_open_callback calback)
  811. {
  812. std::cout << CurrentDateTime() << "[" << m_strClientName << "] try Open " << pPath << endl;
  813. if (Lock(timeout) == false)
  814. {
  815. std::cout << "FERROR::OpenDev Lock Timeout for Dev[flag: " << flags << " ]: " << pPath << endl;
  816. ////mLog::FERROR("OpenDev Lock Timeout for Dev[flag: {$} ]: {$} ", flags, pPath);
  817. //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath);
  818. return RET_TIMEOUT;
  819. }
  820. m_strWS = pszWS;
  821. if (InitClient(pPath, flags) == false)
  822. {
  823. UnLock();
  824. std::cout << "FERROR::InitClient Failed for Dev [flag: " << flags << " ]: " << pPath << endl;
  825. ////mLog::FERROR("InitClient Failed for Dev [flag: {$} ]: {$} ", flags, pPath);
  826. //GPRINTA_ERROR("InitClient Failed for Dev[flag:%d]:%s", flags, pPath);
  827. return RET_FAILED;
  828. }
  829. std::cout <<"IsClosed" << endl;
  830. if (IsClosed() == false)
  831. {
  832. std::cout << "LogicClient::Open " << m_strClientName << " try is Already Opend: " << m_strDevicePath << endl;
  833. ////mLog::FINFO("LogicClient::Open {$} try is Already Opend: ", m_strClientName, m_strDevicePath);
  834. std::cout << CurrentDateTime() << " LogicClient::Open [" << m_strClientName << "] try is Already Opend: " << m_strDevicePath << endl;
  835. if (Close() == RET_PENDING)
  836. {
  837. UnLock();
  838. std::cout << "OpenDev Close Method is onGoing.. for Dev [flag: " << flags << " ]: " << pPath << endl;
  839. ////mLog::FERROR("OpenDev Close Method is onGoing.. for Dev [flag: {$} ]: {$} ", flags, pPath);
  840. //GPRINTA_ERROR("OpenDev Close Method is onGoing.. for Dev[flag:%d]:%s", flags, pPath);
  841. return RET_PENDING;
  842. }
  843. }
  844. m_openCallbackFunc = calback;
  845. std::cout << "StartThread" << endl;
  846. StartThread(false, true);
  847. std::cout << "UnLock" << endl;
  848. UnLock();
  849. if (calback == nullptr)
  850. {
  851. if (WaitTheThreadOnTheRun(timeout))
  852. return RET_SUCCEED;
  853. std::cout << "WaitTheThreadOnTheRun Timeout " << timeout << " for Open " << pPath << endl;
  854. ////mLog::FERROR("WaitTheThreadOnTheRun Timeout {$} for Open {$}", timeout, pPath);
  855. //GPRINTA_ERROR("WaitTheThreadOnTheRun Timeout %i for Open %s", timeout, pPath);
  856. return RET_FAILED;
  857. }
  858. return RET_SUCCEED;
  859. }
  860. int LogicClient::Close()
  861. {
  862. RET_STATUS ret = RET_SUCCEED;
  863. ////mLog::FINFO(" LogicClient::Close {$} try Close path:",m_strClientName , m_strDevicePath);
  864. std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] try Close path: " << m_strDevicePath << endl;
  865. if (Lock(1000) == false)
  866. {
  867. ////mLog::FINFO("close wait timeout");
  868. std::cout << "close wait timeout " << endl;
  869. //return RET_PENDING;
  870. }
  871. if (IsClosed() == false)
  872. {
  873. CloseConnection(m_pMqttConn);
  874. }
  875. StopThread();
  876. //printf("Thread:%d,Close Entry\n", GetCurrentThreadId());
  877. UnLock();
  878. ////mLog::FINFO(" LogicClient::Close {$} CloseConnection : {$}", m_strClientName, m_strDevicePath);
  879. std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] CloseConnection : " << m_strDevicePath << endl;
  880. //Disconnect
  881. //DisconnectCDI();
  882. //底层MQTT连接并不关闭,除非显式关闭,或者对象被释放
  883. m_hDeviceOpenOK->ResetEvent();
  884. m_pMqttConn = nullptr;
  885. m_ClosedFlag = true;
  886. ////mLog::FINFO(" LogicClient::Close {$} Close Over.:", m_strClientName, m_strDevicePath);
  887. std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] Close Over.: " << m_strDevicePath << endl;
  888. return ret;
  889. }
  890. const char *LogicClient::GetLastErrorInfo()
  891. {
  892. return m_pLastMsgInfo->c_str();
  893. }
  894. bool LogicClient::IsClosed()
  895. {
  896. return m_ClosedFlag;
  897. }
  898. //bool LogicClient::IsOpenOK()
  899. //{
  900. // if (m_ClosedFlag)
  901. // return false;
  902. // DWORD dwRet = WaitForSingleObject(m_hDeviceOpenOK, 0);
  903. // if (dwRet == WAIT_OBJECT_0)
  904. // {
  905. // return true;
  906. // }
  907. // else
  908. // {
  909. // return false;
  910. // }
  911. //}
  912. /*
  913. bool LogicClient::IsAllReqFinished()
  914. {
  915. ClientSysIF *p = (ClientSysIF*)m_pSysIF;
  916. bool ret = false;
  917. ret = p->IsAllRequestFinished();
  918. return (ret);
  919. }
  920. bool LogicClient::SetRequestSyncMode(ACTION_SYNC_MODE Sync)
  921. {
  922. if (IsAllReqFinished())
  923. {
  924. m_SyncMode = Sync;
  925. return true;
  926. }
  927. return false;
  928. }
  929. ACTION_SYNC_MODE LogicClient::GetRequestSyncMode()
  930. {
  931. return m_SyncMode;
  932. }*/
  933. RET_STATUS LogicClient::UpdateDeviceResource(DWORD timeout)
  934. {
  935. RET_STATUS ret = RET_FAILED;
  936. Lock();
  937. ResDataObject req, res;
  938. try {
  939. ret = (RET_STATUS)Action("UpdateDeviceResource", req, res, timeout);
  940. if (ret >= RET_SUCCEED)
  941. {
  942. ////mLog::FINFO("m_DeviceResource.clear() [{$}]", m_strDevicePath);
  943. //update
  944. m_DeviceResource.clear();
  945. m_DeviceResource = res;
  946. ////mLog::FINFO("m_DeviceResource clear [{$}] .set [{$}]", m_strDevicePath, res.encode());
  947. }
  948. }
  949. catch (...)
  950. {
  951. ret = RET_FAILED;
  952. }
  953. UnLock();
  954. return ret;
  955. }
  956. bool LogicClient::HasAction(const char* pszAction)
  957. {
  958. if (string(pszAction) == "UpdateDeviceResource")
  959. return true;
  960. int actid = m_DeviceResource.GetFirstOf("Action");
  961. if (actid < 0)
  962. {
  963. ////mLog::FINFO("Device [{$}] has no Action Items for Action [{$}] in [{$}]", m_strDevicePath, pszAction, m_DeviceResource.encode());
  964. return false;
  965. }
  966. ResDataObject actions = m_DeviceResource[actid];
  967. if (actions.GetFirstOf(pszAction) >= 0)
  968. return true;
  969. ////mLog::FINFO("Device [{$}] Action no Items for Action [{$}] in [{$}]", m_strDevicePath, pszAction, actions.encode());
  970. return false;
  971. }
  972. bool LogicClient::HasProperty(const char* pszProperty)
  973. {
  974. int atrid = m_DeviceResource.GetFirstOf("Attribute");
  975. if (atrid < 0)
  976. return false;
  977. ResDataObject actions = m_DeviceResource[atrid];
  978. if (actions.GetFirstOf(pszProperty) >= 0)
  979. return true;
  980. return false;
  981. }
  982. bool LogicClient::GetProperty(const char* pszProperty, ResDataObject& resRet)
  983. {
  984. int atrid = m_DeviceResource.GetFirstOf("Attribute");
  985. if (atrid < 0)
  986. return false;
  987. ResDataObject propertys = m_DeviceResource[atrid];
  988. if (propertys.GetFirstOf(pszProperty) >= 0)
  989. {
  990. resRet = propertys[pszProperty];
  991. return true;
  992. }
  993. return false;
  994. }
  995. RET_STATUS LogicClient::GetDeviceType(GUID &DevType)
  996. {
  997. RET_STATUS ret = RET_FAILED;
  998. Lock();
  999. try {
  1000. if (IsClosed() == false)
  1001. {
  1002. if (string_2_guid((const char *)m_DeviceResource["DeviceType"], DevType))
  1003. {
  1004. ret = RET_SUCCEED;
  1005. }
  1006. }
  1007. }
  1008. catch (...)
  1009. {
  1010. ret = RET_FAILED;
  1011. }
  1012. UnLock();
  1013. return ret;
  1014. }
  1015. RET_STATUS LogicClient::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource)
  1016. {
  1017. RET_STATUS ret = RET_FAILED;
  1018. bool blocked = Lock();
  1019. try {
  1020. if (IsClosed() == false)
  1021. {
  1022. (*pDeviceResource) = m_DeviceResource;
  1023. string res = m_DeviceResource.encode();
  1024. ////mLog::FINFO("GetDeviceResource ok. {$}", res.substr(0, 16384));
  1025. ret = RET_SUCCEED;
  1026. UnLock();
  1027. return ret;
  1028. }
  1029. }
  1030. catch (...)
  1031. {
  1032. ret = RET_FAILED;
  1033. }
  1034. UnLock();
  1035. ////mLog::FINFO("{$} GetDeviceResource failed with closedflag [{$}]",m_strClientName, m_ClosedFlag?1:0);
  1036. return ret;
  1037. }
  1038. void LogicClient::PacketArrived(ResDataObject *pResponse)
  1039. {
  1040. //two kind packet.
  1041. //1.response
  1042. //1.1 open response
  1043. //1.2 close response
  1044. //no Locks!!!!!!
  1045. //assert(0);
  1046. //check priority one => CLOSE
  1047. //if (PacketAnalizer::GetPacketCmd(pResponse) == PACKET_CMD_CLOSE)
  1048. //{
  1049. // //we have disconnect here
  1050. // printf("Thread:%d,Got Close Notify.Passive Close\n", GetCurrentThreadId());
  1051. // ProcessClose(*pResponse);
  1052. // SetEvent(m_NotifyEvent);//notify to user
  1053. // SetEvent(m_ResponseEvent);//notify to user
  1054. // return;
  1055. //}
  1056. //1.3 update response
  1057. //1.4 action response
  1058. //1.5 data ??
  1059. //2.notify
  1060. //2.1 data notify
  1061. //m_pPacketReceivedQue->Lock();
  1062. m_pPacketReceivedQue->InQueue(*pResponse);
  1063. //for test
  1064. //GPRINTA_DEBUG("Client:%d,Thread %d:Got Packet--JG:%s",this, GetCurrentThreadId(), pResponse->encode());
  1065. if (m_ReqIdx != 0)
  1066. {
  1067. if (m_ReqIdx == PacketAnalizer::GetPacketIdx(pResponse))
  1068. {
  1069. if (PacketAnalizer::GetPacketType(pResponse) == PACKET_TYPE_RES)
  1070. {
  1071. //check cmd
  1072. if ((PacketAnalizer::GetPacketCmd(pResponse) == PACKET_CMD_EXE) &&
  1073. (PacketAnalizer::GetPacketKey(pResponse) == (*m_pActionName)))
  1074. {
  1075. //got match response
  1076. //for test
  1077. //GPRINTA_DEBUG("Client:%d,Got Response--JG",this);
  1078. m_ResponseEvent->SetEvent();
  1079. //m_pPacketReceivedQue->UnLock();
  1080. return;
  1081. }
  1082. }
  1083. ////mLog::FERROR(" {$} Action Resp Error {$}", m_strClientName, pResponse->encode());
  1084. std::cout << "[" << m_strClientName << "] Action Resp Error " << pResponse->encode() << endl;
  1085. }
  1086. else
  1087. {
  1088. ////mLog::FDEBUG(" {$} When Actioned.. Get Notify {$} ", m_strClientName, pResponse->encode());
  1089. std::cout << "[" << m_strClientName << "] When Actioned.. Get Notify " << pResponse->encode() << endl;
  1090. }
  1091. }
  1092. else {
  1093. ////mLog::FDEBUG(" {$} Get Notify {$}", m_strClientName, pResponse->encode());
  1094. std::cout << "[" << m_strClientName << "] Get Notify " << pResponse->encode() << endl;
  1095. }
  1096. m_ResponseEvent->SetEvent();//notify to user
  1097. //m_pPacketReceivedQue->UnLock();
  1098. }
  1099. int LogicClient::Action_Trans(ResDataObject* pOrgReq, DWORD timeout)
  1100. {
  1101. RET_STATUS ret = RET_FAILED;
  1102. //ResDataObject req;
  1103. if (Lock(timeout) == false)
  1104. {
  1105. return RET_TIMEOUT;
  1106. }
  1107. //printf("Thread:%d,Action Entry\n", GetCurrentThreadId());
  1108. if (m_ClosedFlag)
  1109. {
  1110. UnLock();
  1111. return ret;
  1112. }
  1113. if (IsThreadSafe() == false)
  1114. {
  1115. UnLock();
  1116. return RET_THREAD_INVALID;
  1117. }
  1118. //must be in SYNC mode
  1119. if (m_SyncMode != ACTION_SYNC)
  1120. {
  1121. UnLock();
  1122. return ret;
  1123. }
  1124. //make packet request
  1125. //m_ReqIdx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode);
  1126. //(*m_pActionName) = pActionName;
  1127. ////mLog::FDEBUG(" {$} at {$} Action_Trans Req: {$} ", m_strClientName, m_pFilePath->c_str(), pOrgReq->encode());
  1128. PacketAnalizer::UpdateContextTopic(pOrgReq, m_strDefaultTopic.c_str());
  1129. string actionName = PacketAnalizer::GetPacketKey(pOrgReq);
  1130. PacketAnalizer::UpdatePacketTransaction(*pOrgReq, m_strCurTransaction);
  1131. //send packet
  1132. //ret = pLogicDeivce->CmdFromLogicDev(&req);
  1133. int nret = PublishAction(pOrgReq, PacketAnalizer::GetActionTopic(m_pFilePath->c_str(), actionName.c_str()).c_str(), m_pMqttConn);
  1134. if (nret > 0)
  1135. {
  1136. ret = RET_SUCCEED;
  1137. }
  1138. else
  1139. {
  1140. m_ReqIdx = 0;
  1141. (*m_pActionName) = "";
  1142. }
  1143. UnLock();
  1144. return ret;
  1145. }
  1146. int LogicClient::Action_Req(const char *pActionName, ResDataObject &ReqParams, DWORD timeout, const char* pszDevicePath)
  1147. {
  1148. RET_STATUS ret = RET_FAILED;
  1149. ResDataObject req;
  1150. if (Lock(timeout) == false)
  1151. {
  1152. ////mLog::FERROR("try Lock failed for :Action[{$}]", pActionName);
  1153. return RET_TIMEOUT;
  1154. }
  1155. //printf("Thread:%d,Action Entry\n", GetCurrentThreadId());
  1156. if (m_ClosedFlag)
  1157. {
  1158. ////mLog::FERROR("Clinet {$} aready closed. :Action[{$}]", m_strClientName, pActionName);
  1159. UnLock();
  1160. return ret;
  1161. }
  1162. if (IsThreadSafe() == false)
  1163. {
  1164. ////mLog::FERROR("IsThreadSafe() == false ThreadName {$}. :Action[{$}]", m_strName, pActionName);
  1165. UnLock();
  1166. return RET_THREAD_INVALID;
  1167. }
  1168. if (pszDevicePath == nullptr && !HasAction(pActionName))
  1169. {
  1170. ////mLog::FERROR("pszDevicePath == nullptr && !HasAction(pActionName) ThreadName {$}. :Action[{$}] with devPath[{$}]", m_strName, pActionName, pszDevicePath);
  1171. return RET_SUCCEED;
  1172. }
  1173. //must be in SYNC mode
  1174. if (m_SyncMode != ACTION_SYNC)
  1175. {
  1176. ////mLog::FERROR("m_SyncMode != ACTION_SYNC {$} for :Action[{$}]", m_strClientName, pActionName);
  1177. UnLock();
  1178. return ret;
  1179. }
  1180. if (m_ReqIdx != 0)
  1181. {
  1182. ////mLog::FERROR(" {$} try call more than once Action_Req.Ignore first ActionReq Idx[{$}]:Action[{$}]", m_strClientName, m_ReqIdx, m_pActionName->c_str());
  1183. //GPRINTA_ERROR("try call more than once Action_Req.Ignore first ActionReq Idx[%u]:Action[%s] ",m_ReqIdx,m_pActionName->c_str());
  1184. m_ReqIdx = 0;
  1185. (*m_pActionName) = "";
  1186. }
  1187. ////mLog::FDEBUG(" {$} at {$} Action_Req {$} Req: {$} ", m_strClientName, m_pFilePath->c_str() , pActionName, ReqParams.encode());
  1188. //make packet request
  1189. m_ReqIdx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode);
  1190. (*m_pActionName) = pActionName;
  1191. PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str());
  1192. PacketAnalizer::UpdatePacketTransaction(req, m_strCurTransaction);
  1193. //send packet
  1194. //ret = pLogicDeivce->CmdFromLogicDev(&req);
  1195. int nret = PublishAction(&req, PacketAnalizer::GetActionTopic(pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName).c_str(), m_pMqttConn);
  1196. if (nret > 0)
  1197. {
  1198. ret = RET_SUCCEED;
  1199. }
  1200. else
  1201. {
  1202. m_ReqIdx = 0;
  1203. (*m_pActionName) = "";
  1204. }
  1205. UnLock();
  1206. return ret;
  1207. }
  1208. int LogicClient::Action_Res(const char *pActionName, ResDataObject &ResParams, DWORD timeout)
  1209. {
  1210. RET_STATUS ret = RET_FAILED;
  1211. ResDataObject res;
  1212. if (Lock(timeout) == false)
  1213. {
  1214. return RET_TIMEOUT;
  1215. }
  1216. if (m_ClosedFlag)
  1217. {
  1218. //for test
  1219. //GPRINTA_DEBUG("Client:%d,Action_Res Closed ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1220. m_ReqIdx = 0;
  1221. (*m_pActionName) = "";
  1222. UnLock();
  1223. return RET_INVALID;
  1224. }
  1225. if (IsThreadSafe() == false)
  1226. {
  1227. //for test
  1228. //GPRINTA_DEBUG("Client:%d,Action_Res ThreadUnSafe ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1229. m_ReqIdx = 0;
  1230. (*m_pActionName) = "";
  1231. UnLock();
  1232. return RET_THREAD_INVALID;
  1233. }
  1234. //must be in SYNC mode
  1235. if (m_SyncMode != ACTION_SYNC)
  1236. {
  1237. //for test
  1238. //GPRINTA_DEBUG("Client:%d,Action_Res No Sync ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1239. m_ReqIdx = 0;
  1240. (*m_pActionName) = "";
  1241. UnLock();
  1242. return ret;
  1243. }
  1244. if (m_ReqIdx == 0)
  1245. {
  1246. //for test
  1247. //GPRINTA_DEBUG("Client:%d,Action_Res ReqIdx == 0 ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1248. (*m_pActionName) = "";
  1249. UnLock();
  1250. return ret;
  1251. }
  1252. //ReqIdx must not zero
  1253. while (m_pPacketReceivedQue->WaitForInQue(timeout) == WAIT_OBJECT_0)
  1254. {
  1255. //got response
  1256. if (m_pPacketReceivedQue->DeQueue(res))
  1257. {
  1258. //update notify evt
  1259. IsDataArrived();
  1260. if ((m_ReqIdx == PacketAnalizer::GetPacketIdx(&res)) &&
  1261. (PacketAnalizer::GetPacketType(&res) == PACKET_TYPE_RES))
  1262. {
  1263. //check cmd
  1264. if ((PacketAnalizer::GetPacketCmd(&res) != PACKET_CMD_EXE) ||
  1265. (PacketAnalizer::GetPacketKey(&res) != pActionName))
  1266. {
  1267. //wrong packet
  1268. continue;
  1269. }
  1270. //got match response
  1271. ret = ProcessActionResponse(res, ResParams);
  1272. //for test
  1273. //GPRINTA_DEBUG("Client:%d,Action_Res Finished ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1274. m_ReqIdx = 0;
  1275. (*m_pActionName) = "";
  1276. UnLock();
  1277. return ret;
  1278. }
  1279. else
  1280. {
  1281. //got other one
  1282. //check priority one => CLOSE
  1283. if (PacketAnalizer::GetPacketCmd(&res) == PACKET_CMD_CLOSE)
  1284. {
  1285. //we have disconnect here
  1286. ret = ProcessClose(res);
  1287. //for test
  1288. //GPRINTA_DEBUG("Client:%d,Action_Res Got Closed ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1289. m_ReqIdx = 0;
  1290. (*m_pActionName) = "";
  1291. UnLock();
  1292. return ret;
  1293. }
  1294. ////mLog::FERROR("{$} got device[{$}] what packet {$}",m_strClientName, m_strDevicePath, res.encode());
  1295. m_pPacketWaitQue->InQueue(res);
  1296. IsDataArrived();
  1297. }
  1298. }
  1299. }
  1300. ret = RET_TIMEOUT;
  1301. //for test
  1302. //GPRINTA_DEBUG("Client:%d,Action_Res Got Timeout ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str());
  1303. //m_ReqIdx = 0;
  1304. //(*m_pActionName) = "";
  1305. UnLock();
  1306. return ret;
  1307. }
  1308. std::shared_ptr<LinuxEvent> LogicClient::GetResponseHandle()
  1309. {
  1310. return m_ResponseEvent;
  1311. }
  1312. void LogicClient::SubScribeTopic(const char* pszTopic, bool bSubscribe)
  1313. {
  1314. if (bSubscribe)
  1315. SubscribeTopic(m_pMqttConn, pszTopic);
  1316. else
  1317. UnSubscribe(m_pMqttConn, pszTopic);
  1318. }
  1319. int LogicClient::Action(const char *pActionName, ResDataObject &ReqParams, ResDataObject &ResParams, DWORD timeout, const char* pszDevicePath)
  1320. {
  1321. RET_STATUS ret = RET_FAILED;
  1322. ResDataObject req, res;
  1323. if (Lock(timeout) == false)
  1324. {
  1325. ////mLog::FERROR("{$} try Action {$} LockTimeout", m_strClientName, pActionName);
  1326. return RET_TIMEOUT;
  1327. }
  1328. //printf("Thread:%d,Action Entry\n", GetCurrentThreadId());
  1329. if (m_ClosedFlag)
  1330. {
  1331. ////mLog::FERROR("{$} try Action {$} Aready Closed", m_strClientName, pActionName);
  1332. UnLock();
  1333. return ret;
  1334. }
  1335. if (IsThreadSafe() == false)
  1336. {
  1337. ////mLog::FERROR("{$} try Action {$} IsThreadSafe false", m_strClientName, pActionName);
  1338. UnLock();
  1339. return RET_THREAD_INVALID;
  1340. }
  1341. if (pszDevicePath == nullptr && !HasAction(pActionName))
  1342. {
  1343. ////mLog::FERROR("{$} try Action {$} with pszDevicePath == nullptr && !HasAction(pActionName)", m_strClientName, pActionName);
  1344. UnLock();
  1345. return RET_SUCCEED;
  1346. }
  1347. ////mLog::FDEBUG(" {$} at {$} Action {$} Req: {$} ", m_strClientName, pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName, ReqParams.encode());
  1348. //make packet request
  1349. DWORD reqidx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode);
  1350. PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str());
  1351. PacketAnalizer::UpdatePacketTransaction(req, m_strCurTransaction);
  1352. UnLock();
  1353. //send packet int ActionAndResp(ccos_mqtt_connection* hConnection,const char* pAction, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime )
  1354. //ret = pLogicDeivce->CmdFromLog icDev(&req);
  1355. int nret = ActionAndRespWithConnDefalt(m_pMqttConn, pActionName,req, &ReqParams,
  1356. PacketAnalizer::GetActionTopic(pszDevicePath==nullptr?m_pFilePath->c_str():pszDevicePath,pActionName).c_str(),
  1357. res, timeout);
  1358. if (nret > 0)
  1359. {
  1360. ret = RET_SUCCEED;
  1361. DWORD retId = PacketAnalizer::GetPacketIdx(&res);
  1362. PACKET_TYPE tpRes = PacketAnalizer::GetPacketType(&res);
  1363. if ((reqidx == retId) &&
  1364. (tpRes == PACKET_TYPE_RES))
  1365. {
  1366. //check cmd
  1367. if ((PacketAnalizer::GetPacketCmd(&res) == PACKET_CMD_EXE))
  1368. {
  1369. ret = ProcessActionResponse(res, ResParams);
  1370. //UnLock();
  1371. return ret;
  1372. }
  1373. }
  1374. else
  1375. {
  1376. ////mLog::FINFO("{$} What Resp ? [{$}]", m_strClientName, PacketAnalizer::GetPacketKey(&res));
  1377. std::cout << "What Resp ?" << PacketAnalizer::GetPacketKey(&res) << endl;
  1378. }
  1379. }
  1380. ////mLog::FERROR("Failed={$} : {$} at {$} Action {$} Req: {$} ", (int)ret, m_strClientName, pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName, ReqParams.encode());
  1381. //else
  1382. //{
  1383. // ReOpenDevice();
  1384. //}
  1385. return ret;
  1386. }
  1387. bool LogicClient::IsThreadSafe()
  1388. {
  1389. return true;
  1390. //return (m_OwnerThreadId == GetCurrentThreadId());
  1391. }
  1392. std::shared_ptr<LinuxEvent> LogicClient::GetNotifyHandle()
  1393. {
  1394. return m_NotifyEvent;
  1395. }
  1396. void LogicClient::ClearQueue_Internal()
  1397. {
  1398. //m_pPacketReceivedQue->Lock();
  1399. m_pPacketReceivedQue->Clear();
  1400. m_pPacketWaitQue->Clear();
  1401. m_NotifyEvent->ResetEvent();
  1402. //m_pPacketReceivedQue->UnLock();
  1403. }
  1404. bool LogicClient::DeQueuePacket_Internal(ResDataObject &Packet)
  1405. {
  1406. bool ret = false;
  1407. //m_pPacketReceivedQue->Lock();
  1408. //wait que first
  1409. if (m_pPacketWaitQue->DeQueue(Packet))
  1410. {
  1411. ret = true;
  1412. }
  1413. else if (m_pPacketReceivedQue->DeQueue(Packet))
  1414. {
  1415. ret = true;
  1416. }
  1417. else
  1418. {
  1419. ret = false;
  1420. }
  1421. //set event
  1422. if (m_pPacketWaitQue->size() > 0 || m_pPacketReceivedQue->size() > 0)
  1423. {
  1424. m_NotifyEvent->SetEvent();
  1425. }
  1426. else
  1427. {
  1428. m_NotifyEvent->ResetEvent();
  1429. }
  1430. //m_pPacketReceivedQue->UnLock();
  1431. return ret;
  1432. }
  1433. void LogicClient::CleanupQueue()
  1434. {
  1435. if (Lock(1000) == false)
  1436. {
  1437. //printf("Can't Lock Que for cleanup\n");
  1438. return;
  1439. }
  1440. ClearQueue_Internal();
  1441. UnLock();
  1442. }
  1443. bool LogicClient::IsDataArrived()
  1444. {
  1445. bool ret = false;
  1446. //the packet queue must be encapsulated
  1447. //m_pPacketReceivedQue->Lock();
  1448. if (m_pPacketWaitQue->size() > 0 || m_pPacketReceivedQue->size() > 0)
  1449. {
  1450. ////mLog::FDEBUG("{$} have packet to process ..m_pPacketWaitQue {$} .m_pPacketReceivedQue {$}.", m_strClientName, m_pPacketWaitQue->size(), m_pPacketReceivedQue->size());
  1451. ret = true;
  1452. m_ResponseEvent->SetEvent();
  1453. }
  1454. else
  1455. {
  1456. m_ResponseEvent->ResetEvent();
  1457. }
  1458. //m_pPacketReceivedQue->UnLock();
  1459. //m_pPacketNotifyQue->Lock();
  1460. if (m_pPacketNotifyQue->size() > 0 )
  1461. {
  1462. ////mLog::FDEBUG("{$} have notify to process ..m_pPacketNotifyQue {$} .", m_strClientName, m_pPacketNotifyQue->size());
  1463. ret = true;
  1464. m_NotifyEvent->SetEvent();
  1465. }
  1466. else
  1467. {
  1468. m_NotifyEvent->ResetEvent();
  1469. }
  1470. //m_pPacketNotifyQue->UnLock();
  1471. return ret;
  1472. }
  1473. /*
  1474. * PACKET_CMD CommonLogicClient::ReadCmd(ResDataObject &CmdObject)
  1475. {
  1476. PACKET_CMD ret = PACKET_CMD_NONE;
  1477. while (IsDataArrived())
  1478. {
  1479. ret = LogicClient::ReadCmd(CmdObject);
  1480. if (ret != PACKET_CMD_NONE)
  1481. {
  1482. return ret;
  1483. }
  1484. }
  1485. return ret;
  1486. }
  1487. */
  1488. /// <summary>
  1489. /// 子类CommonLogicClient ReadCMD判断 需要检查逻辑,进行兼容性修改
  1490. /// </summary>
  1491. /// <param name="CmdObject"></param>
  1492. /// <returns></returns>
  1493. PACKET_CMD LogicClient::ReadCmd(ResDataObject &CmdObject)
  1494. {
  1495. PACKET_CMD cmd = PACKET_CMD_NONE;
  1496. //check connection
  1497. if (IsClosed() == true)
  1498. {
  1499. cmd = PACKET_CMD_CLOSE;
  1500. ClearQueue_Internal();
  1501. return cmd;
  1502. }
  1503. if(IsThreadSafe() == false)
  1504. {
  1505. ClearQueue_Internal();
  1506. return cmd;
  1507. }
  1508. if (Lock(100) == false)
  1509. {
  1510. std::cout << "LogicClient::ReadCmd " << m_strClientName << "] Lock timeout Receved Que has [" << m_pPacketReceivedQue->size() << "] Object" << endl;
  1511. return cmd;
  1512. }
  1513. //read receive que second
  1514. RET_STATUS ret = RET_NOSUPPORT;
  1515. PACKET_TYPE rType;
  1516. ResDataObject rMsg;
  1517. string key;
  1518. //read wait que first
  1519. while (DeQueuePacket_Internal(rMsg))
  1520. {
  1521. rType = PacketAnalizer::GetPacketType(&rMsg);
  1522. cmd = PacketAnalizer::GetPacketCmd(&rMsg);
  1523. key = PacketAnalizer::GetPacketKey(&rMsg);
  1524. std::cout << "ReadCmd [" << m_strClientName << "] Type: " << rType << "cmd: " << cmd << " Key: " << key << endl;
  1525. ////mLog::FINFO("{$} ReadCmd Type: {$} cmd: {$} Key: {$} ", m_strClientName, (int)rType, (int)cmd, key);
  1526. if (cmd > PACKET_CMD_NONE && cmd < PACKET_CMD_MAX)
  1527. {
  1528. if (rType == PACKET_TYPE_RES)
  1529. {
  1530. //1.response
  1531. CmdObject = rMsg;
  1532. //[open] need care
  1533. if (cmd == PACKET_CMD_OPEN)
  1534. {
  1535. ret = ProcessOpenResponse(rMsg);
  1536. if (ret >= RET_SUCCEED)
  1537. {
  1538. UnLock();
  1539. return cmd;
  1540. }
  1541. }
  1542. }
  1543. else if (rType == PACKET_TYPE_NOTIFY)
  1544. {
  1545. //2.notify
  1546. UpdateDeviceResource(cmd, rMsg);
  1547. CmdObject = rMsg;
  1548. UnLock();
  1549. return cmd;
  1550. }
  1551. else
  1552. {
  1553. //ignore it
  1554. cmd = PACKET_CMD_NONE;
  1555. }
  1556. }
  1557. rMsg.clear();
  1558. }
  1559. UnLock();
  1560. return cmd;
  1561. }
  1562. PACKET_CMD LogicClient::ReadNotify(ResDataObject& CmdObject)
  1563. {
  1564. CmdObject.clear();
  1565. //m_pPacketNotifyQue->Lock();
  1566. if (m_pPacketNotifyQue->DeQueue(CmdObject))
  1567. {
  1568. //m_pPacketNotifyQue->UnLock();
  1569. return PacketAnalizer::GetPacketCmd(&CmdObject);
  1570. }
  1571. m_NotifyEvent->ResetEvent();
  1572. if(m_pPacketReceivedQue->DeQueue(CmdObject))
  1573. {
  1574. return PacketAnalizer::GetPacketCmd(&CmdObject);
  1575. }
  1576. m_ResponseEvent->ResetEvent();
  1577. //m_pPacketNotifyQue->UnLock();
  1578. return PACKET_CMD_NONE;
  1579. }
  1580. RET_STATUS SYSTEM_CALL LogicClient::WaitForState(const char* state, DWORD timeout)
  1581. {
  1582. PACKET_CMD cmd;
  1583. ResDataObject data;
  1584. ResDataObject context;
  1585. string strKey, value;
  1586. DWORD firstTick = GetTickCount();
  1587. std::shared_ptr<LinuxEvent> notifyhandle = GetNotifyHandle();
  1588. while (true)
  1589. {
  1590. while (IsDataArrived())
  1591. {
  1592. data.clear();
  1593. cmd = ReadCmd(data);
  1594. strKey = PacketAnalizer::GetPacketKey(&data);
  1595. PacketAnalizer::GetPacketContext(&data, context);
  1596. value = (const char *)context;
  1597. if (strKey == "CurrentState")
  1598. {
  1599. if (value == string(state))
  1600. return RET_SUCCEED;
  1601. }
  1602. else if (strKey == string(state))
  1603. {
  1604. return RET_SUCCEED;
  1605. }
  1606. }
  1607. if ((GetTickCount() - firstTick) < timeout)
  1608. {
  1609. if (notifyhandle->Wait(timeout))
  1610. {
  1611. continue;
  1612. }
  1613. }
  1614. break;
  1615. }
  1616. return RET_FAILED;
  1617. }
  1618. RET_STATUS LogicClient::UpdateDeviceResource(int cmdType,ResDataObject &packet)
  1619. {
  1620. RET_STATUS ret = RET_SUCCEED;
  1621. Lock();
  1622. try {
  1623. string strKey = PacketAnalizer::GetPacketKey(&packet);
  1624. ResDataObject ResContext;
  1625. PacketAnalizer::GetPacketContext(&packet, ResContext);
  1626. if (cmdType == PACKET_CMD_UPDATE)
  1627. {
  1628. if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0)
  1629. {
  1630. m_DeviceResource[ATTRIBUTE].update(strKey.c_str(), ResContext);
  1631. }
  1632. }
  1633. else if (cmdType == PACKET_CMD_ADD)
  1634. {
  1635. if (strKey=="ErrorList")
  1636. {
  1637. /*pLogicDeivce->*/GetDeviceResource(&m_DeviceResource);
  1638. }
  1639. else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) < 0)
  1640. {
  1641. m_DeviceResource[ATTRIBUTE].add(strKey.c_str(), ResContext);
  1642. }
  1643. else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0 &&
  1644. (string)ResContext != "" &&m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str()) < 0)
  1645. {
  1646. m_DeviceResource[ATTRIBUTE][strKey.c_str()].add(((string)ResContext).c_str(), "");
  1647. }
  1648. }
  1649. else if (cmdType == PACKET_CMD_DEL)
  1650. {
  1651. if (strKey == "ErrorList")
  1652. {
  1653. /*pLogicDeivce->*/GetDeviceResource(&m_DeviceResource);
  1654. }
  1655. else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0 )
  1656. {
  1657. if ((string)ResContext != "" && m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str()) >= 0)
  1658. {
  1659. m_DeviceResource[ATTRIBUTE][strKey.c_str()].eraseOneOf(((string)ResContext).c_str(), m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str()));
  1660. }
  1661. else
  1662. {
  1663. m_DeviceResource[ATTRIBUTE].eraseOneOf(strKey.c_str(), m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()));
  1664. }
  1665. }
  1666. }
  1667. }
  1668. catch (...)
  1669. {
  1670. ret = RET_FAILED;
  1671. }
  1672. UnLock();
  1673. return ret;
  1674. }