//#include "SysIF.h" #include "LocalConfig.h" #include "LogicClient.h" #include "common_api.h" #include "PacketAnalizer.h" //#include "Logger.h" #define ATTRIBUTE ("Attribute") #include #include #ifndef _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_WARNINGS #endif //Log4CPP::Logger* ////mLog::gLogger = nullptr; string LogHost = ""; #ifndef GET_CURRENT_THREAD_ID_DEFINED #define GET_CURRENT_THREAD_ID_DEFINED inline DWORD GetCurrentThreadId() { return static_cast(syscall(SYS_gettid)); } #endif // Linux下获取进程ID inline DWORD GetCurrentProcessId() { return getpid(); } inline string CurrentDateTime2() { std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); char buf[100] = { 0 }; std::strftime(buf, sizeof(buf), " %Y-%m-%d %H:%M:%S ", std::localtime(&now)); return buf; } inline std::string CurrentDateTime() { // 获取当前时间点 auto now = std::chrono::system_clock::now(); // 将时间长度转换为微秒数 auto now_us = std::chrono::duration_cast(now.time_since_epoch()); // 再转成tm格式 auto now_time_t = std::chrono::system_clock::to_time_t(now); auto now_tm = std::localtime(&now_time_t); // 可以直接输出到标准输出 // std::cout << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S.") << std::setfill('0') << std::setw(6) << now_us.count() % 1000000 << std::endl; // 格式化字符,年月日时分秒 std::string now_time_str; char buf[64]; std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", now_tm); now_time_str += buf; // 格式化微秒 //snprintf(buf, sizeof(buf), ".%06lld ", now_us.count() % 1000000); snprintf(buf, sizeof(buf), ".%06ld ", now_us.count() % 1000000); now_time_str += buf; //printf("%s\n", now_time_str.c_str()); return now_time_str; } LogicClient::LogicClient(string szClientName, string szTransaction, string szType, bool bNeedNotify) { m_ReqIdx = 0; pLogicDeivce = NULL; m_SyncMode = ACTION_SYNC; m_FileFlags = 0; m_ClosedFlag = true; m_OwnerThreadId = 0; m_pActionName = new string(); m_pFilePath = new string(); m_pLastMsgInfo = new string(); m_pPacketWaitQue = new MsgQueue(); m_pPacketReceivedQue = new MsgQueue(); m_pPacketNotifyQue = new MsgQueue(); m_NotifyEvent = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false); m_ResponseEvent = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false); m_bNeedNotify = bNeedNotify; //m_pSysIF = new ClientSysIF(); m_pMqttConn = nullptr; m_strClientName = szClientName; if (m_strClientName.length() <= 0) { string stream; stream = "LogicClient_" + to_string( GetCurrentProcessId()) + "_" + string( (const char*)getLocalEbusId()) + "_" +to_string( GetCurrentThreadId()); m_strClientName = stream; stream.clear(); string stream2; stream2 = CCOS_CLIENT_ROOT_TOPIC + string( "/Resp_") + to_string( GetCurrentProcessId()) + "_" + string( (const char*)getLocalEbusId()) + "_" + to_string( GetCurrentThreadId()); m_strDefaultTopic = stream2; } else { stringstream stream; stream << "LogicClient_" << szClientName << "_" << (const char*)getLocalEbusId() << "_" << GetCurrentProcessId(); m_strClientName = stream.str(); stream.clear(); stringstream stream2; stream2 << CCOS_CLIENT_ROOT_TOPIC << (szType.length()<=0?"":"/") << (szType.length()<=0?"":szType) << "/" << szClientName << "/Resp_" << (const char*)getLocalEbusId() << "_" << GetCurrentProcessId(); m_strDefaultTopic = stream2.str(); } m_strCurTransaction = szTransaction; m_pFileHandle = new CcosDevFileHandle(); m_ClientLock = (HANDLE)new CcosLock(); m_Router = CCOS_PACKET_ROUTE_ANY; m_hDeviceOpenOK = LinuxEvent::CreateEvent(LinuxEvent::MANUAL_RESET, false);// CreateEvent(NULL, FALSE, FALSE, NULL); m_hDeviceCloseOK = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false); m_hReOpen = LinuxEvent::CreateEvent(LinuxEvent::AUTO_RESET, false); m_pPacketArrivedCallbackFunc = nullptr; m_openCallbackFunc = nullptr; SetName(m_strClientName.c_str()); } LogicClient::~LogicClient(void) { ////mLog::FWARN("Free LogicClient Object [{$}]", m_strClientName); cout << "Free LogicClient Object == " << m_strClientName << endl; if (IsClosed() == false) { //make close Close(); } //delete m_pSysIF; CloseConnection(m_pMqttConn); //m_pSysIF = NULL; m_pMqttConn = nullptr; delete m_pActionName; m_pActionName = NULL; delete m_pFilePath; m_pFilePath = NULL; delete m_pLastMsgInfo; m_pLastMsgInfo = NULL; delete m_pPacketWaitQue; m_pPacketWaitQue = NULL; delete m_pPacketReceivedQue; m_pPacketReceivedQue = NULL; delete m_pPacketNotifyQue; m_pPacketNotifyQue = NULL; delete m_pFileHandle; m_pFileHandle = NULL; delete (CcosLock*)m_ClientLock; m_ClientLock = NULL; } bool LogicClient::GetFileHandle(CcosDevFileHandle &Handle) { if (IsClosed() == false) { Lock(); Handle = (*m_pFileHandle); UnLock(); return true; } //make client handle Handle[CCOS_PACK_HANDLE_LANG] = "en-US";//NOT FINISHED YET Handle[CCOS_PACK_HANDLE_ROUTE] = m_Router;//route Handle[CCOS_PACK_HANDLE_FLAGS] = 0;// Handle[CCOS_PACKET_HANDLE_KEY] = 0;// //target Handle.GetTarget(false)[CCOS_PACK_TARGET_ADDR] = (UINT64)0; Handle.GetTarget(false)[CCOS_PACK_TARGET_PROCID] = (UINT64)0; Handle.GetTarget(false)[CCOS_PACK_TARGET_BUSID] = ""; Handle.GetTarget(false)[CCOS_PACK_TARGET_MACHINEID] = ""; //local Handle.GetTarget()[CCOS_PACK_TARGET_ADDR] = (UINT64)(PVOID*)m_pMqttConn; Handle.GetTarget()[CCOS_PACK_TARGET_PROCID] = (UINT64)GetCurrentProcessId(); Handle.GetTarget()[CCOS_PACK_TARGET_BUSID] = (const char*)getLocalEbusId(); Handle.GetTarget()[CCOS_PACK_TARGET_MACHINEID] = (const char*)getLocalMachineId(); return false; } bool LogicClient::SetRouter(CCOS_PACKET_ROUTE Route) { if (IsClosed()) { m_Router = Route; return true; } return false; } bool LogicClient::Lock(DWORD timeout) { if (m_ClientLock == NULL) { ////mLog::FERROR("{$} @ Trans {$} Lock handle is null", m_strClientName, m_strCurTransaction); return false; } bool LockRet = (((CcosLock*)m_ClientLock)->Thread_Lock(timeout) == WAIT_OBJECT_0); std::cout << m_strClientName << " [LLKK] "<< (UINT64)this << " Try Lock Result " << LockRet << " With timeout " << timeout << endl; if (!LockRet) std::cout << m_strClientName << " [LLKK] " << (UINT64)this << " Lock Failed" << endl; return LockRet; } void LogicClient::UnLock() { if (m_ClientLock == NULL) { ////mLog::FERROR("{$} @ Trans {$} UnLock handle is null", m_strClientName, m_strCurTransaction); return ; } std::cout << m_strClientName << " [LLKK] " << (UINT64)this << " Unlock " << endl; ((CcosLock*)m_ClientLock)->Thread_UnLock(); } bool LogicClient::InitClient(const char *pPath, int flags) { std::cout <<"Enter InitClient:pPath:"<< pPath << endl; ResDataObject rMsg; m_strDevicePath = pPath; m_OwnerThreadId = 0; (*m_pLastMsgInfo) = ""; std::cout << "Start ClearQueue_Internal()" << endl; ClearQueue_Internal(); ////mLog::FWARN("m_DeviceResource.clear() [{$}]", m_strDevicePath); m_DeviceResource.clear(); //m_pFileHandle->m_Lang.SetVal("en-US");//NOT FINISHED YET //m_pFileHandle->m_Route = m_Router;//route //m_pFileHandle->m_Flags = flags; //m_pFileHandle->m_HandleId = 0; (*m_pFileHandle)[CCOS_PACK_HANDLE_LANG] = "en-US";//NOT FINISHED YET (*m_pFileHandle)[CCOS_PACK_HANDLE_ROUTE] = m_Router;//route (*m_pFileHandle)[CCOS_PACK_HANDLE_FLAGS] = flags; (*m_pFileHandle)[CCOS_PACKET_HANDLE_KEY] = (UINT64)0;// //target m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_MACHINEID] = ""; m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_BUSID] = ""; m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_PROCID] = (UINT64)0; m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_ADDR] = (UINT64)0; //m_pFileHandle->m_Dev.m_Addr = (UINT64)0; //m_pFileHandle->m_Dev.m_ProcID = (UINT64)0; //m_pFileHandle->m_Dev.m_busID = ""; //m_pFileHandle->m_Dev.m_MachineID = ""; //local m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_MACHINEID] = (const char*)getLocalMachineId(); m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_BUSID] = (const char*)getLocalEbusId(); m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_PROCID] = (UINT64)GetCurrentProcessId(); m_pFileHandle->GetTarget()[CCOS_PACK_TARGET_ADDR] = (UINT64)(PVOID*)m_pMqttConn; //m_pFileHandle->m_Owner.m_Addr = (UINT64)(PVOID*)m_pSysIF; //m_pFileHandle->m_Owner.m_ProcID = (UINT64)GetCurrentProcessId(); //m_pFileHandle->m_Owner.m_busID = (const char*)getLocalEbusId(); //m_pFileHandle->m_Owner.m_MachineID = (const char*)getLocalMachineId(); m_FileFlags = flags; if (m_strDevicePath.length() <= 0) { //允许透传消息 m_SyncMode = ACTION_SYNC; } if (pPath) { string busId; std::cout << "Start getBusIdFromFilePath()" << endl; if (getBusIdFromFilePath(pPath, busId) == false) { (*m_pFilePath) = ""; //return false; } //m_pFileHandle->m_Dev.m_busID = busId; m_pFileHandle->GetTarget(false)[CCOS_PACK_TARGET_BUSID] = busId.c_str(); (*m_pFilePath) = pPath; } //ClientSysIF *p = (ClientSysIF*)m_pSysIF; //p->Clear(); //if (////mLog::gLogger == nullptr) //{ // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)"; // LogHost = ((string)getLogRootpath()).c_str(); // if (LogHost.length() <= 1) // { // char szName[256]; // sprintf(szName, "/LogicClient_%08d", GetCurrentProcessId()); // LogHost = szName; // } // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicClient"); // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str()); // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1); // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str()); // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicClient"); // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__); //} //else //{ // string strRoot = ((string)getLogRootpath()).c_str(); // if (strRoot.length() > 1 && strRoot != LogHost) // { // string strLogPath = GetProcessDirectory() + R"(\Conf\Log4CPP.Config.xml)"; // LogHost = strRoot; // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogFileName"), "LogicClient"); // //Log4CPP::GlobalContext::Map::Set("LogHost", LogHost.c_str()); // Log4CPP::ThreadContext::Map::Set(ECOM::Utility::Hash("LogHost"), LogHost.c_str() + 1); // auto rc = Log4CPP::LogManager::LoadConfigFile(strLogPath.c_str()); // ////mLog::gLogger = Log4CPP::LogManager::GetLogger("LogicClient"); // ////mLog::FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__); // } //} //string version; //if (GetVersion(version, hMyModule)) // ////mLog::FINFO("\n===============log begin : version:{$} ===================\n", version.c_str()); //else ////mLog::FWARN("\n{$}=============== LogicClient log begin : version:1.0.0.0 ===================\n", m_strClientName); return true; } bool LogicClient::GetFilePath(ResDataObject &path) { if (IsClosed() == false) { path = (*m_pFilePath).c_str(); return true; } return false; } RET_STATUS LogicClient::ProcessActionResponse(ResDataObject &packet, ResDataObject &OutputParam) { RET_STATUS ret = RET_FAILED; if (PacketAnalizer::GetPacketRetCode(&packet, ret)) { //get error or warning Msg here string errInfo; PacketAnalizer::GetPacketMsgInfo(&packet, errInfo); if (errInfo.size() > 0) { string actionname = PacketAnalizer::GetPacketKey(&packet); (*m_pLastMsgInfo) = actionname + ":" + errInfo; } if (ret >= RET_SUCCEED) { if (PacketAnalizer::GetPacketContext(&packet, OutputParam)) { //succeed //printf("Thread:%d,Action OK\n", GetCurrentThreadId()); return ret; } } if (ret == RET_INVALID) { //DisconnectCDI(); } } return ret; } RET_STATUS LogicClient::ProcessResource(ResDataObject &packet) { ResDataObject resParam; RET_STATUS ret = RET_FAILED; //got match response if (PacketAnalizer::GetPacketRetCode(&packet, ret)) { //get error or warning Msg here PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo)); if (ret >= RET_SUCCEED) { //whole device resource if (PacketAnalizer::GetPacketContext(&packet, m_DeviceResource)) { ////mLog::FDEBUG("Update Device {$} Resource [{$}]", m_strDevicePath, m_DeviceResource.encode()); return ret; } } } ////mLog::FERROR("Device Open Result Error", packet.encode()); //put some log here ////mLog::FINFO("m_DeviceResource.clear() [{$}] after Device Open Result Error", m_strDevicePath); m_DeviceResource.clear(); return RET_FAILED; } RET_STATUS LogicClient::ProcessClose(ResDataObject &packet) { //we have disconnect here PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo)); //make the object status as CLOSED //DisconnectCDI(); //clear the info //m_pPacketReceivedQue->Lock(); m_pPacketWaitQue->Clear(); m_pPacketReceivedQue->Clear(); //m_DeviceResource.clear(); //m_pPacketReceivedQue->UnLock(); //m_pPacketNotifyQue->Lock(); m_pPacketNotifyQue->Clear(); //m_pPacketNotifyQue->UnLock(); return RET_INVALID; } RET_STATUS LogicClient::ProcessOpenResponse(ResDataObject &packet) { ResDataObject resParam; RET_STATUS ret = RET_FAILED; //got match response if (PacketAnalizer::GetPacketRetCode(&packet, ret)) { //get error or warning Msg here PacketAnalizer::GetPacketMsgInfo(&packet, (*m_pLastMsgInfo)); if (ret >= RET_SUCCEED) { if (PacketAnalizer::GetPacketHandle(&packet, resParam)) { if (m_pFileHandle == NULL) return RET_FAILED; //got file handle if (m_pFileHandle->SetResDataObject(resParam)) { //succeed here //Get Device Resource if (ProcessResource(packet) >= RET_SUCCEED) { string res = packet.encode(); ////mLog::FDEBUG("OpenResponse ok {$}", res.substr(0, 16384)); //////mLog::FDEBUG("OpenResponse ok\n"); if (m_bNeedNotify && m_ClosedFlag) { const char* realPath = NULL; if (m_strDevicePath[0] == '/') realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/" else realPath = m_strDevicePath.c_str(); string notifyTopic = realPath; notifyTopic += "/Notify/#"; SubscribeTopic(m_pMqttConn, notifyTopic.c_str()); } if (m_openCallbackFunc != nullptr) m_openCallbackFunc(this, m_strDevicePath.c_str(), 1); m_OwnerThreadId = (DWORD)GetCurrentThreadId(); std::cout << "TID [" << m_OwnerThreadId << "] LogicClient::ProcessOpenResponse OpenResponse ok." << m_strClientName << endl; ////mLog::FDEBUG("LogicClient::ProcessOpenResponse {$} OpenResponse ok. {$}", m_OwnerThreadId, m_strClientName); m_hDeviceOpenOK->SetEvent(); SetThreadOnTheRun(true); //线程可以正常运转 m_ClosedFlag = false; return ret; } } } } } ////mLog::FERROR("Thread:{$} ,OpenResponse ng ",GetCurrentThreadId()); //printf("Thread:%d,OpenResponse ng\n", GetCurrentThreadId()); //put some log here return RET_FAILED; } RET_STATUS LogicClient::CloseDevice() { RET_STATUS ret = RET_FAILED; const char* realPath = NULL; if (m_strDevicePath[0] == '/') realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/" else realPath = m_strDevicePath.c_str(); ////mLog::FDEBUG("{$} CloseDevice {$} and use default topic {$}", m_strClientName, realPath, m_strDefaultTopic); if (Lock(1000) == false) { ////mLog::FERROR("CloseDev Lock Timeout for Dev: {$} ", m_strClientName); //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath); return RET_TIMEOUT; } if (m_ClosedFlag) { UnSubscribe(m_pMqttConn, m_strDefaultTopic.c_str()); if (m_bNeedNotify) { string notifyTopic = realPath; notifyTopic += "/Notify/#"; UnSubscribe(m_pMqttConn, notifyTopic.c_str()); } } UnLock(); m_hDeviceOpenOK->ResetEvent(); m_hDeviceCloseOK->SetEvent(); //标志 //m_ClosedFlag = true; ////make close req ResDataObject req; PacketAnalizer::MakeCloseRequest(req, (*m_pFileHandle)); if ((m_FileFlags & CCOS_FILE_FLAGS::ABSCRACT_ONLINE) == CCOS_FILE_FLAGS::ABSCRACT_ONLINE) { if (m_strWS.length() > 0) { //需要显式 上设备上线 ResDataObject resContext; resContext.add("Offline", m_strWS.c_str()); PacketAnalizer::UpdatePacketContext(req, resContext); PublishAction(&req, m_pFilePath->c_str(), m_pMqttConn); } } ////send it ////ret = pLogicDeivce->CmdFromLogicDev(&req); ////UnLock(); //DWORD dwret = WaitForSingleObject(m_hCloseOK, 1000); //if (dwret != WAIT_OBJECT_0) //{ // //printf("Thread:%d,Send Close OK\n", GetCurrentThreadId()); //} //else //{ // printf("Thread:%d,Send Close Failed\n", GetCurrentThreadId()); //} return ret; } RET_STATUS LogicClient::OpenDevice(bool bAsync) { RET_STATUS ret = RET_FAILED; ResDataObject req, res, resParam; const char* realPath = NULL; if (m_strDevicePath.length() <= 0) { //支持不打开设备的客户端 m_OwnerThreadId = (DWORD)GetCurrentThreadId(); std::cout << "TID [" << m_OwnerThreadId << "] LogicClient::ProcessOpenResponse OpenResponse ok." << m_strClientName << endl; ////mLog::FINFO("Open null device {$} OpenResponse ok. {$}", m_OwnerThreadId, m_strClientName); m_hDeviceOpenOK->SetEvent(); SetThreadOnTheRun(true); //线程可以正常运转 m_ClosedFlag = false; return RET_SUCCEED; } if (m_strDevicePath[0] == '/') realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/" else realPath = m_strDevicePath.c_str(); if (Lock(1000) == false) { ////mLog::FERROR("OpenDev Lock Timeout for Dev: {$} ", m_strClientName); //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath); return RET_TIMEOUT; } if (m_ClosedFlag) { SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str()); ////mLog::FDEBUG("{$} OpenDevice {$} and use default topic {$} with Async", m_strClientName, realPath, m_strDefaultTopic, bAsync); } else { ////mLog::FDEBUG("{$} OpenDevice {$} and use default topic {$} with Async and No ReSubscribe", m_strClientName, realPath, m_strDefaultTopic, bAsync); } DWORD reqidx = PacketAnalizer::MakeOpenRequest(req, (*m_pFileHandle), realPath); PacketAnalizer::UpdateOpenRequest(req, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)m_pMqttConn); PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str()); PacketAnalizer::UpdatePacketTopic(&req, realPath, m_strClientName.c_str()); if ((m_FileFlags & CCOS_FILE_FLAGS::ABSCRACT_ONLINE) == CCOS_FILE_FLAGS::ABSCRACT_ONLINE) { if (m_strWS.length() > 0) { //需要显式 上设备上线 ResDataObject resContext; PacketAnalizer::GetPacketContext(&req, resContext); resContext.add("Online", m_strWS.c_str()); PacketAnalizer::UpdatePacketContext(req, resContext); } } int result = PublishAction(&req, realPath, m_pMqttConn); UnLock(); if (result < 0) { ////mLog::FERROR("Publish Open Packet Failed . try Reopen. Frist UnSubscribe {$}", m_strDefaultTopic); //UnSubscribe(m_pMqttConn, m_strDefaultTopic.c_str()); //SetEvent(m_hReOpen); return RET_FAILED; } //UnLock(); if (!bAsync) { ////mLog::FINFO("try Wait for Open ok.{$}", m_strClientName); std::cout << "try Wait for Open ok." <Wait(4000)) { std::cout << "Open ok. done" << m_strClientName << endl; ////mLog::FINFO("Open ok. done{$}", m_strClientName); if (m_openCallbackFunc != nullptr) m_openCallbackFunc(this, realPath, 1); return RET_SUCCEED; } } else { ////mLog::FINFO("No Wait for Open ok and try Reopen.{$}", m_strClientName); //SetEvent(m_hReOpen); return RET_FAILED; } return ret; } bool LogicClient::OnStartThread() { RET_STATUS ret = RET_FAILED; const char* realPath = NULL; std::cout << "OnStartThreadL:m_strDevicePath: " << m_strDevicePath << std::endl; if (m_strDevicePath[0] == '/') realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/" else realPath = m_strDevicePath.c_str(); //if (Lock(10000) == false) //{ // //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath); // return false; //} //GPRINTA_DEBUG("Thread:%d,Open Entry", GetCurrentThreadId()); //make connection to CDI //ConnectCDI(); ResDataObject req, res, resParam; //make open req //HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); // 新增调试输出 if (m_pMqttConn) { std::cout << "m_pMqttConn Memory address: " << m_pMqttConn << std::endl; // 解包元组内容并输出 void* mqttClient = std::get<0>(*m_pMqttConn); mqtt_topic_list* topicList = std::get<1>(*m_pMqttConn); void* msgHook = std::get<2>(*m_pMqttConn); const char* clientId = std::get<3>(*m_pMqttConn); ccos_mqtt_callback callback = std::get<4>(*m_pMqttConn); pthread_t threadId = std::get<5>(*m_pMqttConn); CcosLock* connLock = std::get<6>(*m_pMqttConn); mqtt_msg_list* msgList = std::get<7>(*m_pMqttConn); sem_t* semaphore = std::get<8>(*m_pMqttConn); std::cout << "MQTT Client Handle: " << mqttClient << std::endl; std::cout << "MQTT Topic List: " << topicList << std::endl; std::cout << "Message Hook ID: " << msgHook << std::endl; std::cout << "Client ID: " << (clientId ? clientId : "null") << std::endl; std::cout << "Thread ID: " << threadId << std::endl; std::cout << "Connection Lock: " << connLock << std::endl; std::cout << "Message List: " << msgList << std::endl; std::cout << "Semaphore Handle: " << semaphore << std::endl; } else { std::cout << "m_pMqttConn is nullptr" << std::endl; } //HANDLE hOpened = CreateEvent(NULL, FALSE, FALSE, NULL); if (m_pMqttConn == nullptr) { ////mLog::FINFO("LogicClient::Open {$} try open path:[{$}]", m_strClientName, realPath ); std::cout << CurrentDateTime() << " LogicClient::Open [" << m_strClientName << "] try open path: " << realPath << endl << endl; m_pMqttConn = NewConnection(m_strClientName.c_str(), [&ret, realPath, this](ResDataObject* rsp, const char* topic, void* conn_void) { //m_pMqttConn = CreateConnection std::cout << "OnStartThread::NewConnnect callback in "<< std::endl; if (m_pMqttConn == NULL) return; //回调函数不再加锁,要确保每一步都是ok的 /* if (Lock(1000) == false) { ////mLog::FERROR("callback Lock Timeout for Dev: {$} ", m_strClientName); //GPRINTA_ERROR("OpenDev Lock Timeout for Dev:%s", pPath); return ; }*/ ccos_mqtt_connection* conn = (ccos_mqtt_connection*)conn_void; if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_RES) { //check cmd string path = realPath; if ((PacketAnalizer::GetPacketCmd(rsp) == PACKET_CMD_OPEN) && (PacketAnalizer::GetPacketKey(rsp).substr(0, path.length()) == path || PacketAnalizer::GetPacketKey(rsp) == realPath)) { ////mLog::FINFO("{$} LogicClient::Open {$} OK ", m_strClientName, realPath); std::cout << "\n\n-----------" << CurrentDateTime() << "LogicClient::Open OK [" << m_strClientName << "] try open path:" << realPath << endl << endl; if (Lock(2000)) { ret = ProcessOpenResponse(*rsp); UnLock(); } else { ////mLog::FERROR("{$} Process Open Result with Lock Client failed ", m_strClientName); } } else { ////mLog::FINFO("Unkown Packet: cmd {$} packet key: {$}", (int)PacketAnalizer::GetPacketCmd(rsp), PacketAnalizer::GetPacketKey(rsp) ); std::cout << m_strClientName << CurrentDateTime() << "Try Conn callback get unkown packet pointer [" << (UINT64)this << endl; std::cout << "Unkown Packet: cmd [" << PacketAnalizer::GetPacketCmd(rsp) << "] packet key: " << PacketAnalizer::GetPacketKey(rsp) << "" << endl; if (m_pPacketArrivedCallbackFunc != nullptr) { m_pPacketArrivedCallbackFunc(rsp, topic, conn); } else { //m_pPacketReceivedQue->Lock(); m_pPacketReceivedQue->InQueue(*rsp); //SetEvent(m_NotifyEvent);//notify to user m_ResponseEvent->SetEvent(); //m_pPacketReceivedQue->UnLock(); } } } else { //got other one //printf("Thread:%d,in open.got other packet\n", GetCurrentThreadId()); //GPRINTA_DEBUG("in open.got other packet\n\n"); //check priority one => CLOSE if (PacketAnalizer::GetPacketCmd((ResDataObject*)rsp) == PACKET_CMD_CLOSE) { std::cout << CurrentDateTime() << m_strClientName << "Got Close Paket ..." << endl; //we have disconnect here //GPRINTA_DEBUG("in open.got Close packet\n\n"); ret = ProcessClose(*rsp); } else { std::cout << "*****-----***** " << CurrentDateTime() << m_strClientName << " Received msg " << rsp->encode() << endl; if (m_pPacketArrivedCallbackFunc != nullptr) m_pPacketArrivedCallbackFunc(rsp, topic, conn); else { if (PacketAnalizer::GetPacketType(rsp) == PACKET_TYPE_NOTIFY ) { PACKET_CMD cmd = PACKET_CMD_NONE; PACKET_TYPE rType; string key; rType = PacketAnalizer::GetPacketType(rsp); cmd = PacketAnalizer::GetPacketCmd(rsp); if (rType == PACKET_TYPE_NOTIFY /*&& m_pActionName->size() <= 0*/ && cmd == PACKET_CMD_UPDATE) { //只有在非Action期间,更新,如果有遗漏,再说 2024.9.2 cmd = PacketAnalizer::GetPacketCmd(rsp); key = PacketAnalizer::GetPacketKey(rsp); //2.notify UpdateDeviceResource(cmd, *rsp); } else { ////mLog::FWARN("{$} Got unexpected.. Packet {$}", m_strClientName, rsp->encode()); } //m_pPacketNotifyQue->Lock(); m_pPacketNotifyQue->InQueue(*rsp); //SetEvent(m_NotifyEvent);//notify to user //m_pPacketNotifyQue->UnLock(); m_NotifyEvent->SetEvent(); } else PacketArrived(rsp); } } } //UnLock(); }); } if (m_pMqttConn == nullptr) { ////mLog::FERROR("LogicClient {$} Connect Mqtt client failed with open device : {$}", m_strClientName, realPath); std::cout << "XXXXXX XXXXXX [" << m_strClientName << "] LogicClient Connect Mqtt client failed with open device [" << realPath << "]" << endl; //UnLock(); if (m_openCallbackFunc != nullptr) m_openCallbackFunc(this,realPath, 0); return false; } //ret = RET_SUCCEED; //SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str()); //PacketAnalizer::UpdateOpenRequest(req, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)m_pMqttConn); //PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str()); //PacketAnalizer::UpdatePacketTopic(&req, realPath, m_strClientName.c_str()); //PublishAction(&req, realPath, m_pMqttConn); ////UnLock(); //DWORD dwret = WaitForSingleObject(m_hOpenOK, 4000); //if (dwret == WAIT_OBJECT_0) //{ // if (m_openCallbackFunc != nullptr) // m_openCallbackFunc(this, realPath, 1); // return true; //} if (OpenDevice() == RET_SUCCEED) return true; ////mLog::FINFO("LogicClient {$} Open Device {$} failed ", m_strClientName, realPath); std::cout << "XXXXXX XXXXXX [" << m_strClientName << "]LogicClient Open Device [" << realPath << "] failed " << endl; if (m_openCallbackFunc != nullptr) m_openCallbackFunc(this, realPath, 0); CloseConnection(m_pMqttConn); m_pMqttConn = nullptr; return false; } bool LogicClient::OnEndThread() { std::cout << "Thread End..." << endl; ////mLog::FINFO("End Client Thread"); //CloseDevice(); //ResDataObject req; ////make close req //PacketAnalizer::MakeCloseRequest(req, (*m_pFileHandle)); ////send it ////ret = pLogicDeivce->CmdFromLogicDev(&req); //PublishAction(&req, m_pFilePath->c_str(), m_pMqttConn); ////UnLock(); //DWORD dwret = WaitForSingleObject(m_hCloseOK, 1000); //if (dwret != WAIT_OBJECT_0) //{ // //printf("Thread:%d,Send Close OK\n", GetCurrentThreadId()); // //} //else //{ // printf("Thread:%d,Send Close Failed\n", GetCurrentThreadId()); //} //std::cout << CurrentDateTime() << "[" << m_strClientName << "] try Close Mqtt " << endl; //CloseConnection(m_pMqttConn); m_pMqttConn = nullptr; return true; } bool LogicClient::Exec(void) { //ResDataObject req; //if (ReadCmd(req)) //{ // return true; //} std::vector> hWait = { m_ExitFlag, m_hReOpen, }; DWORD dwRet = LinuxEvent::WaitForMultipleEvents(hWait, 10);//WaitForSingleObject(m_ExitFlag, 100); if (dwRet == WAIT_OBJECT_0) { std::cout << m_strClientName << " MQTT Prepare Exit " << endl; return false; } else if (dwRet == WAIT_OBJECT_0 + 1) { ////mLog::FDEBUG(" {$} try Real ReOpen", m_strClientName); //CloseDevice(); m_hDeviceOpenOK->ResetEvent(); OpenDevice(false); ////mLog::FDEBUG(" {$} try ReOpen over", m_strClientName); } usleep(1000); return true; } int LogicClient::ReOpenDevice() { ////mLog::FINFO(" {$} set to ReOpen",m_strClientName ); m_hDeviceOpenOK->ResetEvent(); m_hReOpen->SetEvent(); return RET_SUCCEED; } int LogicClient::Open(const char *pPath, int flags, const char* pszWS, DWORD timeout, logic_client_open_callback calback) { std::cout << CurrentDateTime() << "[" << m_strClientName << "] try Open " << pPath << endl; if (Lock(timeout) == false) { std::cout << "FERROR::OpenDev Lock Timeout for Dev[flag: " << flags << " ]: " << pPath << endl; ////mLog::FERROR("OpenDev Lock Timeout for Dev[flag: {$} ]: {$} ", flags, pPath); //GPRINTA_ERROR("OpenDev Lock Timeout for Dev[flag:%d]:%s", flags, pPath); return RET_TIMEOUT; } m_strWS = pszWS; if (InitClient(pPath, flags) == false) { UnLock(); std::cout << "FERROR::InitClient Failed for Dev [flag: " << flags << " ]: " << pPath << endl; ////mLog::FERROR("InitClient Failed for Dev [flag: {$} ]: {$} ", flags, pPath); //GPRINTA_ERROR("InitClient Failed for Dev[flag:%d]:%s", flags, pPath); return RET_FAILED; } std::cout <<"IsClosed" << endl; if (IsClosed() == false) { std::cout << "LogicClient::Open " << m_strClientName << " try is Already Opend: " << m_strDevicePath << endl; ////mLog::FINFO("LogicClient::Open {$} try is Already Opend: ", m_strClientName, m_strDevicePath); std::cout << CurrentDateTime() << " LogicClient::Open [" << m_strClientName << "] try is Already Opend: " << m_strDevicePath << endl; if (Close() == RET_PENDING) { UnLock(); std::cout << "OpenDev Close Method is onGoing.. for Dev [flag: " << flags << " ]: " << pPath << endl; ////mLog::FERROR("OpenDev Close Method is onGoing.. for Dev [flag: {$} ]: {$} ", flags, pPath); //GPRINTA_ERROR("OpenDev Close Method is onGoing.. for Dev[flag:%d]:%s", flags, pPath); return RET_PENDING; } } m_openCallbackFunc = calback; std::cout << "StartThread" << endl; StartThread(false, true); std::cout << "UnLock" << endl; UnLock(); if (calback == nullptr) { if (WaitTheThreadOnTheRun(timeout)) return RET_SUCCEED; std::cout << "WaitTheThreadOnTheRun Timeout " << timeout << " for Open " << pPath << endl; ////mLog::FERROR("WaitTheThreadOnTheRun Timeout {$} for Open {$}", timeout, pPath); //GPRINTA_ERROR("WaitTheThreadOnTheRun Timeout %i for Open %s", timeout, pPath); return RET_FAILED; } return RET_SUCCEED; } int LogicClient::Close() { RET_STATUS ret = RET_SUCCEED; ////mLog::FINFO(" LogicClient::Close {$} try Close path:",m_strClientName , m_strDevicePath); std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] try Close path: " << m_strDevicePath << endl; if (Lock(1000) == false) { ////mLog::FINFO("close wait timeout"); std::cout << "close wait timeout " << endl; //return RET_PENDING; } if (IsClosed() == false) { CloseConnection(m_pMqttConn); } StopThread(); //printf("Thread:%d,Close Entry\n", GetCurrentThreadId()); UnLock(); ////mLog::FINFO(" LogicClient::Close {$} CloseConnection : {$}", m_strClientName, m_strDevicePath); std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] CloseConnection : " << m_strDevicePath << endl; //Disconnect //DisconnectCDI(); //底层MQTT连接并不关闭,除非显式关闭,或者对象被释放 m_hDeviceOpenOK->ResetEvent(); m_pMqttConn = nullptr; m_ClosedFlag = true; ////mLog::FINFO(" LogicClient::Close {$} Close Over.:", m_strClientName, m_strDevicePath); std::cout << CurrentDateTime() << " LogicClient::Close [" << m_strClientName << "] Close Over.: " << m_strDevicePath << endl; return ret; } const char *LogicClient::GetLastErrorInfo() { return m_pLastMsgInfo->c_str(); } bool LogicClient::IsClosed() { return m_ClosedFlag; } //bool LogicClient::IsOpenOK() //{ // if (m_ClosedFlag) // return false; // DWORD dwRet = WaitForSingleObject(m_hDeviceOpenOK, 0); // if (dwRet == WAIT_OBJECT_0) // { // return true; // } // else // { // return false; // } //} /* bool LogicClient::IsAllReqFinished() { ClientSysIF *p = (ClientSysIF*)m_pSysIF; bool ret = false; ret = p->IsAllRequestFinished(); return (ret); } bool LogicClient::SetRequestSyncMode(ACTION_SYNC_MODE Sync) { if (IsAllReqFinished()) { m_SyncMode = Sync; return true; } return false; } ACTION_SYNC_MODE LogicClient::GetRequestSyncMode() { return m_SyncMode; }*/ RET_STATUS LogicClient::UpdateDeviceResource(DWORD timeout) { RET_STATUS ret = RET_FAILED; Lock(); ResDataObject req, res; try { ret = (RET_STATUS)Action("UpdateDeviceResource", req, res, timeout); if (ret >= RET_SUCCEED) { ////mLog::FINFO("m_DeviceResource.clear() [{$}]", m_strDevicePath); //update m_DeviceResource.clear(); m_DeviceResource = res; ////mLog::FINFO("m_DeviceResource clear [{$}] .set [{$}]", m_strDevicePath, res.encode()); } } catch (...) { ret = RET_FAILED; } UnLock(); return ret; } bool LogicClient::HasAction(const char* pszAction) { if (string(pszAction) == "UpdateDeviceResource") return true; int actid = m_DeviceResource.GetFirstOf("Action"); if (actid < 0) { ////mLog::FINFO("Device [{$}] has no Action Items for Action [{$}] in [{$}]", m_strDevicePath, pszAction, m_DeviceResource.encode()); return false; } ResDataObject actions = m_DeviceResource[actid]; if (actions.GetFirstOf(pszAction) >= 0) return true; ////mLog::FINFO("Device [{$}] Action no Items for Action [{$}] in [{$}]", m_strDevicePath, pszAction, actions.encode()); return false; } bool LogicClient::HasProperty(const char* pszProperty) { int atrid = m_DeviceResource.GetFirstOf("Attribute"); if (atrid < 0) return false; ResDataObject actions = m_DeviceResource[atrid]; if (actions.GetFirstOf(pszProperty) >= 0) return true; return false; } bool LogicClient::GetProperty(const char* pszProperty, ResDataObject& resRet) { int atrid = m_DeviceResource.GetFirstOf("Attribute"); if (atrid < 0) return false; ResDataObject propertys = m_DeviceResource[atrid]; if (propertys.GetFirstOf(pszProperty) >= 0) { resRet = propertys[pszProperty]; return true; } return false; } RET_STATUS LogicClient::GetDeviceType(GUID &DevType) { RET_STATUS ret = RET_FAILED; Lock(); try { if (IsClosed() == false) { if (string_2_guid((const char *)m_DeviceResource["DeviceType"], DevType)) { ret = RET_SUCCEED; } } } catch (...) { ret = RET_FAILED; } UnLock(); return ret; } RET_STATUS LogicClient::GetDeviceResource(ResDataObject PARAM_OUT *pDeviceResource) { RET_STATUS ret = RET_FAILED; bool blocked = Lock(); try { if (IsClosed() == false) { (*pDeviceResource) = m_DeviceResource; string res = m_DeviceResource.encode(); ////mLog::FINFO("GetDeviceResource ok. {$}", res.substr(0, 16384)); ret = RET_SUCCEED; UnLock(); return ret; } } catch (...) { ret = RET_FAILED; } UnLock(); ////mLog::FINFO("{$} GetDeviceResource failed with closedflag [{$}]",m_strClientName, m_ClosedFlag?1:0); return ret; } void LogicClient::PacketArrived(ResDataObject *pResponse) { //two kind packet. //1.response //1.1 open response //1.2 close response //no Locks!!!!!! //assert(0); //check priority one => CLOSE //if (PacketAnalizer::GetPacketCmd(pResponse) == PACKET_CMD_CLOSE) //{ // //we have disconnect here // printf("Thread:%d,Got Close Notify.Passive Close\n", GetCurrentThreadId()); // ProcessClose(*pResponse); // SetEvent(m_NotifyEvent);//notify to user // SetEvent(m_ResponseEvent);//notify to user // return; //} //1.3 update response //1.4 action response //1.5 data ?? //2.notify //2.1 data notify //m_pPacketReceivedQue->Lock(); m_pPacketReceivedQue->InQueue(*pResponse); //for test //GPRINTA_DEBUG("Client:%d,Thread %d:Got Packet--JG:%s",this, GetCurrentThreadId(), pResponse->encode()); if (m_ReqIdx != 0) { if (m_ReqIdx == PacketAnalizer::GetPacketIdx(pResponse)) { if (PacketAnalizer::GetPacketType(pResponse) == PACKET_TYPE_RES) { //check cmd if ((PacketAnalizer::GetPacketCmd(pResponse) == PACKET_CMD_EXE) && (PacketAnalizer::GetPacketKey(pResponse) == (*m_pActionName))) { //got match response //for test //GPRINTA_DEBUG("Client:%d,Got Response--JG",this); m_ResponseEvent->SetEvent(); //m_pPacketReceivedQue->UnLock(); return; } } ////mLog::FERROR(" {$} Action Resp Error {$}", m_strClientName, pResponse->encode()); std::cout << "[" << m_strClientName << "] Action Resp Error " << pResponse->encode() << endl; } else { ////mLog::FDEBUG(" {$} When Actioned.. Get Notify {$} ", m_strClientName, pResponse->encode()); std::cout << "[" << m_strClientName << "] When Actioned.. Get Notify " << pResponse->encode() << endl; } } else { ////mLog::FDEBUG(" {$} Get Notify {$}", m_strClientName, pResponse->encode()); std::cout << "[" << m_strClientName << "] Get Notify " << pResponse->encode() << endl; } m_ResponseEvent->SetEvent();//notify to user //m_pPacketReceivedQue->UnLock(); } int LogicClient::Action_Trans(ResDataObject* pOrgReq, DWORD timeout) { RET_STATUS ret = RET_FAILED; //ResDataObject req; if (Lock(timeout) == false) { return RET_TIMEOUT; } //printf("Thread:%d,Action Entry\n", GetCurrentThreadId()); if (m_ClosedFlag) { UnLock(); return ret; } if (IsThreadSafe() == false) { UnLock(); return RET_THREAD_INVALID; } //must be in SYNC mode if (m_SyncMode != ACTION_SYNC) { UnLock(); return ret; } //make packet request //m_ReqIdx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode); //(*m_pActionName) = pActionName; ////mLog::FDEBUG(" {$} at {$} Action_Trans Req: {$} ", m_strClientName, m_pFilePath->c_str(), pOrgReq->encode()); PacketAnalizer::UpdateContextTopic(pOrgReq, m_strDefaultTopic.c_str()); string actionName = PacketAnalizer::GetPacketKey(pOrgReq); PacketAnalizer::UpdatePacketTransaction(*pOrgReq, m_strCurTransaction); //send packet //ret = pLogicDeivce->CmdFromLogicDev(&req); int nret = PublishAction(pOrgReq, PacketAnalizer::GetActionTopic(m_pFilePath->c_str(), actionName.c_str()).c_str(), m_pMqttConn); if (nret > 0) { ret = RET_SUCCEED; } else { m_ReqIdx = 0; (*m_pActionName) = ""; } UnLock(); return ret; } int LogicClient::Action_Req(const char *pActionName, ResDataObject &ReqParams, DWORD timeout, const char* pszDevicePath) { RET_STATUS ret = RET_FAILED; ResDataObject req; if (Lock(timeout) == false) { ////mLog::FERROR("try Lock failed for :Action[{$}]", pActionName); return RET_TIMEOUT; } //printf("Thread:%d,Action Entry\n", GetCurrentThreadId()); if (m_ClosedFlag) { ////mLog::FERROR("Clinet {$} aready closed. :Action[{$}]", m_strClientName, pActionName); UnLock(); return ret; } if (IsThreadSafe() == false) { ////mLog::FERROR("IsThreadSafe() == false ThreadName {$}. :Action[{$}]", m_strName, pActionName); UnLock(); return RET_THREAD_INVALID; } if (pszDevicePath == nullptr && !HasAction(pActionName)) { ////mLog::FERROR("pszDevicePath == nullptr && !HasAction(pActionName) ThreadName {$}. :Action[{$}] with devPath[{$}]", m_strName, pActionName, pszDevicePath); return RET_SUCCEED; } //must be in SYNC mode if (m_SyncMode != ACTION_SYNC) { ////mLog::FERROR("m_SyncMode != ACTION_SYNC {$} for :Action[{$}]", m_strClientName, pActionName); UnLock(); return ret; } if (m_ReqIdx != 0) { ////mLog::FERROR(" {$} try call more than once Action_Req.Ignore first ActionReq Idx[{$}]:Action[{$}]", m_strClientName, m_ReqIdx, m_pActionName->c_str()); //GPRINTA_ERROR("try call more than once Action_Req.Ignore first ActionReq Idx[%u]:Action[%s] ",m_ReqIdx,m_pActionName->c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; } ////mLog::FDEBUG(" {$} at {$} Action_Req {$} Req: {$} ", m_strClientName, m_pFilePath->c_str() , pActionName, ReqParams.encode()); //make packet request m_ReqIdx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode); (*m_pActionName) = pActionName; PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str()); PacketAnalizer::UpdatePacketTransaction(req, m_strCurTransaction); //send packet //ret = pLogicDeivce->CmdFromLogicDev(&req); int nret = PublishAction(&req, PacketAnalizer::GetActionTopic(pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName).c_str(), m_pMqttConn); if (nret > 0) { ret = RET_SUCCEED; } else { m_ReqIdx = 0; (*m_pActionName) = ""; } UnLock(); return ret; } int LogicClient::Action_Res(const char *pActionName, ResDataObject &ResParams, DWORD timeout) { RET_STATUS ret = RET_FAILED; ResDataObject res; if (Lock(timeout) == false) { return RET_TIMEOUT; } if (m_ClosedFlag) { //for test //GPRINTA_DEBUG("Client:%d,Action_Res Closed ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; UnLock(); return RET_INVALID; } if (IsThreadSafe() == false) { //for test //GPRINTA_DEBUG("Client:%d,Action_Res ThreadUnSafe ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; UnLock(); return RET_THREAD_INVALID; } //must be in SYNC mode if (m_SyncMode != ACTION_SYNC) { //for test //GPRINTA_DEBUG("Client:%d,Action_Res No Sync ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; UnLock(); return ret; } if (m_ReqIdx == 0) { //for test //GPRINTA_DEBUG("Client:%d,Action_Res ReqIdx == 0 ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); (*m_pActionName) = ""; UnLock(); return ret; } //ReqIdx must not zero while (m_pPacketReceivedQue->WaitForInQue(timeout) == WAIT_OBJECT_0) { //got response if (m_pPacketReceivedQue->DeQueue(res)) { //update notify evt IsDataArrived(); if ((m_ReqIdx == PacketAnalizer::GetPacketIdx(&res)) && (PacketAnalizer::GetPacketType(&res) == PACKET_TYPE_RES)) { //check cmd if ((PacketAnalizer::GetPacketCmd(&res) != PACKET_CMD_EXE) || (PacketAnalizer::GetPacketKey(&res) != pActionName)) { //wrong packet continue; } //got match response ret = ProcessActionResponse(res, ResParams); //for test //GPRINTA_DEBUG("Client:%d,Action_Res Finished ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; UnLock(); return ret; } else { //got other one //check priority one => CLOSE if (PacketAnalizer::GetPacketCmd(&res) == PACKET_CMD_CLOSE) { //we have disconnect here ret = ProcessClose(res); //for test //GPRINTA_DEBUG("Client:%d,Action_Res Got Closed ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); m_ReqIdx = 0; (*m_pActionName) = ""; UnLock(); return ret; } ////mLog::FERROR("{$} got device[{$}] what packet {$}",m_strClientName, m_strDevicePath, res.encode()); m_pPacketWaitQue->InQueue(res); IsDataArrived(); } } } ret = RET_TIMEOUT; //for test //GPRINTA_DEBUG("Client:%d,Action_Res Got Timeout ReqIdx:%d,Name:%s",this, m_ReqIdx, (*m_pActionName).c_str()); //m_ReqIdx = 0; //(*m_pActionName) = ""; UnLock(); return ret; } std::shared_ptr LogicClient::GetResponseHandle() { return m_ResponseEvent; } void LogicClient::SubScribeTopic(const char* pszTopic, bool bSubscribe) { if (bSubscribe) SubscribeTopic(m_pMqttConn, pszTopic); else UnSubscribe(m_pMqttConn, pszTopic); } int LogicClient::Action(const char *pActionName, ResDataObject &ReqParams, ResDataObject &ResParams, DWORD timeout, const char* pszDevicePath) { RET_STATUS ret = RET_FAILED; ResDataObject req, res; if (Lock(timeout) == false) { ////mLog::FERROR("{$} try Action {$} LockTimeout", m_strClientName, pActionName); return RET_TIMEOUT; } //printf("Thread:%d,Action Entry\n", GetCurrentThreadId()); if (m_ClosedFlag) { ////mLog::FERROR("{$} try Action {$} Aready Closed", m_strClientName, pActionName); UnLock(); return ret; } if (IsThreadSafe() == false) { ////mLog::FERROR("{$} try Action {$} IsThreadSafe false", m_strClientName, pActionName); UnLock(); return RET_THREAD_INVALID; } if (pszDevicePath == nullptr && !HasAction(pActionName)) { ////mLog::FERROR("{$} try Action {$} with pszDevicePath == nullptr && !HasAction(pActionName)", m_strClientName, pActionName); UnLock(); return RET_SUCCEED; } ////mLog::FDEBUG(" {$} at {$} Action {$} Req: {$} ", m_strClientName, pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName, ReqParams.encode()); //make packet request DWORD reqidx = PacketAnalizer::MakeActionRequest(req, (*m_pFileHandle), pActionName, ReqParams, m_SyncMode); PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str()); PacketAnalizer::UpdatePacketTransaction(req, m_strCurTransaction); UnLock(); //send packet int ActionAndResp(ccos_mqtt_connection* hConnection,const char* pAction, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime ) //ret = pLogicDeivce->CmdFromLog icDev(&req); int nret = ActionAndRespWithConnDefalt(m_pMqttConn, pActionName,req, &ReqParams, PacketAnalizer::GetActionTopic(pszDevicePath==nullptr?m_pFilePath->c_str():pszDevicePath,pActionName).c_str(), res, timeout); if (nret > 0) { ret = RET_SUCCEED; DWORD retId = PacketAnalizer::GetPacketIdx(&res); PACKET_TYPE tpRes = PacketAnalizer::GetPacketType(&res); if ((reqidx == retId) && (tpRes == PACKET_TYPE_RES)) { //check cmd if ((PacketAnalizer::GetPacketCmd(&res) == PACKET_CMD_EXE)) { ret = ProcessActionResponse(res, ResParams); //UnLock(); return ret; } } else { ////mLog::FINFO("{$} What Resp ? [{$}]", m_strClientName, PacketAnalizer::GetPacketKey(&res)); std::cout << "What Resp ?" << PacketAnalizer::GetPacketKey(&res) << endl; } } ////mLog::FERROR("Failed={$} : {$} at {$} Action {$} Req: {$} ", (int)ret, m_strClientName, pszDevicePath == nullptr ? m_pFilePath->c_str() : pszDevicePath, pActionName, ReqParams.encode()); //else //{ // ReOpenDevice(); //} return ret; } bool LogicClient::IsThreadSafe() { return true; //return (m_OwnerThreadId == GetCurrentThreadId()); } std::shared_ptr LogicClient::GetNotifyHandle() { return m_NotifyEvent; } void LogicClient::ClearQueue_Internal() { //m_pPacketReceivedQue->Lock(); m_pPacketReceivedQue->Clear(); m_pPacketWaitQue->Clear(); m_NotifyEvent->ResetEvent(); //m_pPacketReceivedQue->UnLock(); } bool LogicClient::DeQueuePacket_Internal(ResDataObject &Packet) { bool ret = false; //m_pPacketReceivedQue->Lock(); //wait que first if (m_pPacketWaitQue->DeQueue(Packet)) { ret = true; } else if (m_pPacketReceivedQue->DeQueue(Packet)) { ret = true; } else { ret = false; } //set event if (m_pPacketWaitQue->size() > 0 || m_pPacketReceivedQue->size() > 0) { m_NotifyEvent->SetEvent(); } else { m_NotifyEvent->ResetEvent(); } //m_pPacketReceivedQue->UnLock(); return ret; } void LogicClient::CleanupQueue() { if (Lock(1000) == false) { //printf("Can't Lock Que for cleanup\n"); return; } ClearQueue_Internal(); UnLock(); } bool LogicClient::IsDataArrived() { bool ret = false; //the packet queue must be encapsulated //m_pPacketReceivedQue->Lock(); if (m_pPacketWaitQue->size() > 0 || m_pPacketReceivedQue->size() > 0) { ////mLog::FDEBUG("{$} have packet to process ..m_pPacketWaitQue {$} .m_pPacketReceivedQue {$}.", m_strClientName, m_pPacketWaitQue->size(), m_pPacketReceivedQue->size()); ret = true; m_ResponseEvent->SetEvent(); } else { m_ResponseEvent->ResetEvent(); } //m_pPacketReceivedQue->UnLock(); //m_pPacketNotifyQue->Lock(); if (m_pPacketNotifyQue->size() > 0 ) { ////mLog::FDEBUG("{$} have notify to process ..m_pPacketNotifyQue {$} .", m_strClientName, m_pPacketNotifyQue->size()); ret = true; m_NotifyEvent->SetEvent(); } else { m_NotifyEvent->ResetEvent(); } //m_pPacketNotifyQue->UnLock(); return ret; } /* * PACKET_CMD CommonLogicClient::ReadCmd(ResDataObject &CmdObject) { PACKET_CMD ret = PACKET_CMD_NONE; while (IsDataArrived()) { ret = LogicClient::ReadCmd(CmdObject); if (ret != PACKET_CMD_NONE) { return ret; } } return ret; } */ /// /// 子类CommonLogicClient ReadCMD判断 需要检查逻辑,进行兼容性修改 /// /// /// PACKET_CMD LogicClient::ReadCmd(ResDataObject &CmdObject) { PACKET_CMD cmd = PACKET_CMD_NONE; //check connection if (IsClosed() == true) { cmd = PACKET_CMD_CLOSE; ClearQueue_Internal(); return cmd; } if(IsThreadSafe() == false) { ClearQueue_Internal(); return cmd; } if (Lock(100) == false) { std::cout << "LogicClient::ReadCmd " << m_strClientName << "] Lock timeout Receved Que has [" << m_pPacketReceivedQue->size() << "] Object" << endl; return cmd; } //read receive que second RET_STATUS ret = RET_NOSUPPORT; PACKET_TYPE rType; ResDataObject rMsg; string key; //read wait que first while (DeQueuePacket_Internal(rMsg)) { rType = PacketAnalizer::GetPacketType(&rMsg); cmd = PacketAnalizer::GetPacketCmd(&rMsg); key = PacketAnalizer::GetPacketKey(&rMsg); std::cout << "ReadCmd [" << m_strClientName << "] Type: " << rType << "cmd: " << cmd << " Key: " << key << endl; ////mLog::FINFO("{$} ReadCmd Type: {$} cmd: {$} Key: {$} ", m_strClientName, (int)rType, (int)cmd, key); if (cmd > PACKET_CMD_NONE && cmd < PACKET_CMD_MAX) { if (rType == PACKET_TYPE_RES) { //1.response CmdObject = rMsg; //[open] need care if (cmd == PACKET_CMD_OPEN) { ret = ProcessOpenResponse(rMsg); if (ret >= RET_SUCCEED) { UnLock(); return cmd; } } } else if (rType == PACKET_TYPE_NOTIFY) { //2.notify UpdateDeviceResource(cmd, rMsg); CmdObject = rMsg; UnLock(); return cmd; } else { //ignore it cmd = PACKET_CMD_NONE; } } rMsg.clear(); } UnLock(); return cmd; } PACKET_CMD LogicClient::ReadNotify(ResDataObject& CmdObject) { CmdObject.clear(); //m_pPacketNotifyQue->Lock(); if (m_pPacketNotifyQue->DeQueue(CmdObject)) { //m_pPacketNotifyQue->UnLock(); return PacketAnalizer::GetPacketCmd(&CmdObject); } m_NotifyEvent->ResetEvent(); if(m_pPacketReceivedQue->DeQueue(CmdObject)) { return PacketAnalizer::GetPacketCmd(&CmdObject); } m_ResponseEvent->ResetEvent(); //m_pPacketNotifyQue->UnLock(); return PACKET_CMD_NONE; } RET_STATUS SYSTEM_CALL LogicClient::WaitForState(const char* state, DWORD timeout) { PACKET_CMD cmd; ResDataObject data; ResDataObject context; string strKey, value; DWORD firstTick = GetTickCount(); std::shared_ptr notifyhandle = GetNotifyHandle(); while (true) { while (IsDataArrived()) { data.clear(); cmd = ReadCmd(data); strKey = PacketAnalizer::GetPacketKey(&data); PacketAnalizer::GetPacketContext(&data, context); value = (const char *)context; if (strKey == "CurrentState") { if (value == string(state)) return RET_SUCCEED; } else if (strKey == string(state)) { return RET_SUCCEED; } } if ((GetTickCount() - firstTick) < timeout) { if (notifyhandle->Wait(timeout)) { continue; } } break; } return RET_FAILED; } RET_STATUS LogicClient::UpdateDeviceResource(int cmdType,ResDataObject &packet) { RET_STATUS ret = RET_SUCCEED; Lock(); try { string strKey = PacketAnalizer::GetPacketKey(&packet); ResDataObject ResContext; PacketAnalizer::GetPacketContext(&packet, ResContext); if (cmdType == PACKET_CMD_UPDATE) { if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0) { m_DeviceResource[ATTRIBUTE].update(strKey.c_str(), ResContext); } } else if (cmdType == PACKET_CMD_ADD) { if (strKey=="ErrorList") { /*pLogicDeivce->*/GetDeviceResource(&m_DeviceResource); } else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) < 0) { m_DeviceResource[ATTRIBUTE].add(strKey.c_str(), ResContext); } else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0 && (string)ResContext != "" &&m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str()) < 0) { m_DeviceResource[ATTRIBUTE][strKey.c_str()].add(((string)ResContext).c_str(), ""); } } else if (cmdType == PACKET_CMD_DEL) { if (strKey == "ErrorList") { /*pLogicDeivce->*/GetDeviceResource(&m_DeviceResource); } else if (m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str()) >= 0 ) { if ((string)ResContext != "" && m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str()) >= 0) { m_DeviceResource[ATTRIBUTE][strKey.c_str()].eraseOneOf(((string)ResContext).c_str(), m_DeviceResource[ATTRIBUTE][strKey.c_str()].GetFirstOf(((string)ResContext).c_str())); } else { m_DeviceResource[ATTRIBUTE].eraseOneOf(strKey.c_str(), m_DeviceResource[ATTRIBUTE].GetFirstOf(strKey.c_str())); } } } } catch (...) { ret = RET_FAILED; } UnLock(); return ret; }