Parcourir la source

为MQTT中增加延时防止消息丢失!

lwk il y a 1 jour
Parent
commit
49579bea24
2 fichiers modifiés avec 106 ajouts et 55 suppressions
  1. 53 47
      LogicClient/LogicClient.cpp
  2. 53 8
      LogicDevice--mqtt/LogicDevice.cpp

+ 53 - 47
LogicClient/LogicClient.cpp

@@ -16,7 +16,7 @@
 //Log4CPP::Logger* gLogger = nullptr;
 string LogHost = "";
 
-// Linux下获取进程ID
+// Linux涓嬭幏鍙栬繘绋婭D
 inline DWORD GetCurrentProcessId() {
 	return getpid();
 }
@@ -32,24 +32,24 @@ inline string CurrentDateTime2()
 
 inline std::string CurrentDateTime()
 {
-	// 获取当前时间点
+	// 鑾峰彇褰撳墠鏃堕棿鐐�
 	auto now = std::chrono::system_clock::now();
-	// 将时间长度转换为微秒数
+	// 灏嗘椂闂撮暱搴﹁浆鎹�负寰��鏁�
 	auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
-	// 再转成tm格式
+	// 鍐嶈浆鎴恡m鏍煎紡
 	auto now_time_t = std::chrono::system_clock::to_time_t(now);
 	auto now_tm = std::localtime(&now_time_t);
 
-	// 可以直接输出到标准输出
+	// 鍙�互鐩存帴杈撳嚭鍒版爣鍑嗚緭鍑�
 	// std::cout << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S.") << std::setfill('0') << std::setw(6) << now_us.count() % 1000000 << std::endl;
 
-	// 格式化字符,年月日时分秒
+	// 鏍煎紡鍖栧瓧绗︼紝骞存湀鏃ユ椂鍒嗙�
 	std::string now_time_str;
 	char buf[64];
 	std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", now_tm);
 	now_time_str += buf;
 
-	// 格式化微秒
+	// 鏍煎紡鍖栧井绉�
 	//snprintf(buf, sizeof(buf), ".%06lld ", now_us.count() % 1000000);
 	snprintf(buf, sizeof(buf), ".%06ld ", now_us.count() % 1000000);
 
@@ -121,23 +121,6 @@ LogicClient::LogicClient(string szClientName, string szTransaction, string szTyp
 	m_openCallbackFunc = nullptr;
 
 	SetName(m_strClientName.c_str());
-
-
-	string strLogPath = GetProcessDirectory() + R"(/Conf/log_config.xml)";
-	string LogHost = (string)getRootpath();
-	std::string moduleName = "LogicClient";
-	bool ret = initLogModule(
-		LogHost,       // 主机名(用于日志路径中的{host}占位符)
-		moduleName,        // 唯一模块名
-		strLogPath,  // 配置文件路径
-		true           // 是否输出到控制台(可选)
-	);
-	if (!ret) {
-		std::cerr << "Log init failed!" << std::endl;
-	}
-	// 绑定当前动态库的模块名(调用自身实现的接口)
-	ClientSetLocalModuleName(moduleName);
-	FINFO("Code Build datetime [{$} {$}]", __DATE__, __TIME__);
 }
 
 
@@ -150,7 +133,7 @@ LogicClient::~LogicClient(void)
 	}
 
 	if (m_Base_Thread != 0) {  
-		StopThread(3000);  // 再次尝试停止线程
+		StopThread(3000);  // 鍐嶆�灏濊瘯鍋滄�绾跨▼
 	}
 
 	//delete m_pSysIF;
@@ -306,7 +289,7 @@ bool LogicClient::InitClient(const char *pPath, int flags)
 
 	if (m_strDevicePath.length() <= 0)
 	{
-		//允许透传消息
+		//鍏佽�閫忎紶娑堟伅
 		m_SyncMode = ACTION_SYNC;
 	}
 
@@ -326,6 +309,22 @@ bool LogicClient::InitClient(const char *pPath, int flags)
 
 	}
 
+	string strLogPath = GetProcessDirectory() + R"(/Conf/log_config.xml)";
+	string LogHost = (string)getRootpath();
+	std::string moduleName = "LogicClient";
+	bool ret = initLogModule(
+		LogHost,       // 涓绘満鍚嶏紙鐢ㄤ簬鏃ュ織璺�緞涓�殑{host}鍗犱綅绗︼級
+		moduleName,        // 鍞�竴妯″潡鍚�
+		strLogPath,  // 閰嶇疆鏂囦欢璺�緞
+		true           // 鏄�惁杈撳嚭鍒版帶鍒跺彴锛堝彲閫夛級
+	);
+	if (!ret) {
+		std::cerr << "Log init failed!" << std::endl;
+	}
+	// 缁戝畾褰撳墠鍔ㄦ€佸簱鐨勬ā鍧楀悕锛堣皟鐢ㄨ嚜韬�疄鐜扮殑鎺ュ彛锛�
+	ClientSetLocalModuleName(moduleName);
+	FINFO("Code Build datetime [{$} {$}] LogHost:{$}", __DATE__, __TIME__, LogHost);
+
 	FWARN("\n{$}=============== LogicClient log begin : version:1.0.0.0 ===================\n", m_strClientName);
 
 	return true;
@@ -470,12 +469,18 @@ RET_STATUS LogicClient::ProcessOpenResponse(ResDataObject &packet)
 						if (m_bNeedNotify && m_ClosedFlag) {
 							const char* realPath = NULL;
 							if (m_strDevicePath[0] == '/')
-								realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
+								realPath = ((char*)m_strDevicePath.c_str()) + 1; //鍘绘帀棣栧瓧绗�"/"
 							else
 								realPath = m_strDevicePath.c_str();
 							string notifyTopic = realPath;
 							notifyTopic += "/Notify/#";
 							SubscribeTopic(m_pMqttConn, notifyTopic.c_str());
+
+							// 绛夊緟璁㈤槄鐢熸晥锛氱‘淇漇UBACK宸插�鐞�,閬垮厤鍚庣画娑堟伅涓㈠け
+							// 100ms鍦ㄦ煇浜涙儏鍐典笅涓嶅�,澧炲姞鍒�200ms鎻愰珮绋冲畾鎬�
+							const int notifySubDelayMs = 200;
+							FINFO("[LogicClient::ProcessOpenResponse] {$} - Waiting {$}ms for Notify subscription", m_strClientName, notifySubDelayMs);
+							usleep(notifySubDelayMs * 1000);
 						}
 
 						if (m_openCallbackFunc != nullptr)
@@ -488,7 +493,7 @@ RET_STATUS LogicClient::ProcessOpenResponse(ResDataObject &packet)
 						m_hDeviceOpenOK->SetEvent();
 						
 
-						SetThreadOnTheRun(true); //线程可以正常运转
+						SetThreadOnTheRun(true); //绾跨▼鍙�互姝e父杩愯浆
 						m_ClosedFlag = false;
 						return ret;
 					}
@@ -507,7 +512,7 @@ RET_STATUS LogicClient::CloseDevice()
 	RET_STATUS ret = RET_FAILED;
 	const char* realPath = NULL;
 	if (m_strDevicePath[0] == '/')
-		realPath = ((char*)m_strDevicePath.c_str()) + 1; //去掉首字符"/"
+		realPath = ((char*)m_strDevicePath.c_str()) + 1; //鍘绘帀棣栧瓧绗�"/"
 	else
 		realPath = m_strDevicePath.c_str();
 
@@ -535,7 +540,7 @@ RET_STATUS LogicClient::CloseDevice()
 
 	m_hDeviceOpenOK->ResetEvent();
 	m_hDeviceCloseOK->SetEvent();
-	//标志
+	//鏍囧織
 	//m_ClosedFlag = true;
 
 ////make close req
@@ -546,7 +551,7 @@ RET_STATUS LogicClient::CloseDevice()
 	{
 		if (m_strWS.length() > 0)
 		{
-			//需要显式 上设备上线
+			//闇€瑕佹樉寮� 涓婅�澶囦笂绾�
 			ResDataObject resContext;
 			resContext.add("Offline", m_strWS.c_str());
 			PacketAnalizer::UpdatePacketContext(req, resContext);
@@ -595,29 +600,30 @@ RET_STATUS LogicClient::OpenDevice(bool bAsync)
 
 	if (m_ClosedFlag)
 	{
-		std::cout << "[LogicClient::OpenDevice-Info] " << m_strClientName << " - Subscribing to default MQTT topic: " << m_strDefaultTopic << std::endl;
+		FINFO("[LogicClient::OpenDevice-Info] {$} - Subscribing to default MQTT topic: {$}", m_strClientName, m_strDefaultTopic);
 		SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str());
 
-		// 等待订阅完成
-		const int delayMs = 206;
-		std::cout << "[LogicClient::OpenDevice-Info] " << m_strClientName << " - Waiting " << delayMs << "ms to ensure MQTT subscription completes" << std::endl;
+		// 绛夊緟璁㈤槄鐢熸晥锛氬凡鍦↖nnerConnect涓�瓑寰匔ONNACK,姝ゅ�棰濆�绛夊緟200ms纭�繚SUBACK澶勭悊
+		// 100ms鍦ㄦ煇浜涙儏鍐典笅涓嶅�,澧炲姞鍒�200ms鎻愰珮绋冲畾鎬�
+		const int delayMs = 200;
+		FINFO("[LogicClient::OpenDevice-Info] {$} - Waiting {$}ms to ensure MQTT subscription completes", m_strClientName, delayMs);
 		usleep(delayMs * 1000);
 	}
 
-	// 输出调试信息
+	// 杈撳嚭璋冭瘯淇℃伅
 	std::cout << "[LogicClient::OpenDevice-Debug] " << m_strClientName << " - OpenDevice (Path: " << realPath
 		<< ", Default Topic: " << m_strDefaultTopic
 		<< ", Async Mode: " << (bAsync ? "Enabled" : "Disabled")
 		<< (m_ClosedFlag ? "" : ", MQTT Topic: No Re-Subscription Required") << std::endl;
 
-	// 构建并发送请求
+	// 鏋勫缓骞跺彂閫佽�姹�
 	ResDataObject req;
 	DWORD reqidx = PacketAnalizer::MakeOpenRequest(req, (*m_pFileHandle), realPath);
 	PacketAnalizer::UpdateOpenRequest(req, getLocalMachineId(), getLocalEbusId(), GetCurrentProcessId(), (UINT64)m_pMqttConn);
 	PacketAnalizer::UpdateContextTopic(&req, m_strDefaultTopic.c_str());
 	PacketAnalizer::UpdatePacketTopic(&req, realPath, m_strClientName.c_str());
 
-	// 处理在线标志
+	// 澶勭悊鍦ㄧ嚎鏍囧織
 	if ((m_FileFlags & CCOS_FILE_FLAGS::ABSCRACT_ONLINE) == CCOS_FILE_FLAGS::ABSCRACT_ONLINE && !m_strWS.empty())
 	{
 		ResDataObject resContext;
@@ -626,7 +632,7 @@ RET_STATUS LogicClient::OpenDevice(bool bAsync)
 		PacketAnalizer::UpdatePacketContext(req, resContext);
 	}
 
-	// 发布请求
+	// 鍙戝竷璇锋眰
 	int publishResult = PublishAction(&req, realPath, m_pMqttConn);
 
 	UnLock();
@@ -668,9 +674,9 @@ bool LogicClient::OnStartThread()
 
 	std::cout << "OnStartThreadL:m_strDevicePath: " << m_strDevicePath << std::endl;
 	if (!m_strDevicePath.empty() && m_strDevicePath[0] == '/')
-		realPath = m_strDevicePath.substr(1); // string直接截取,安全无指针问题
+		realPath = m_strDevicePath.substr(1); // string鐩存帴鎴�彇锛屽畨鍏ㄦ棤鎸囬拡闂��
 	else
-		realPath = m_strDevicePath; // 直接赋值string,值拷贝安全
+		realPath = m_strDevicePath; // 鐩存帴璧嬪€約tring锛屽€兼嫹璐濆畨鍏�
 
 	//if (Lock(10000) == false)
 	//{
@@ -685,11 +691,11 @@ bool LogicClient::OnStartThread()
 	//make open req
 	//HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
 
-	// 新增调试输出
+	// 鏂板�璋冭瘯杈撳嚭
 	if (m_pMqttConn) {
 		std::cout << "m_pMqttConn Memory address: " << m_pMqttConn << std::endl;
 
-		// 解包元组内容并输出
+		// 瑙e寘鍏冪粍鍐呭�骞惰緭鍑�
 		void* mqttClient = std::get<0>(*m_pMqttConn);
 		mqtt_topic_list* topicList = std::get<1>(*m_pMqttConn);
 		void* msgHook = std::get<2>(*m_pMqttConn);
@@ -724,7 +730,7 @@ bool LogicClient::OnStartThread()
 			if (m_pMqttConn == NULL)
 				return;
 
-			//回调函数不再加锁,要确保每一步都是ok的
+			//鍥炶皟鍑芥暟涓嶅啀鍔犻攣锛岃�纭�繚姣忎竴姝ラ兘鏄痮k鐨�
 			/*
 			if (Lock(1000) == false)
 			{
@@ -803,7 +809,7 @@ bool LogicClient::OnStartThread()
 
 							if (rType == PACKET_TYPE_NOTIFY /*&& m_pActionName->size() <= 0*/ && cmd == PACKET_CMD_UPDATE)
 							{
-								//只有在非Action期间,更新,如果有遗漏,再说 2024.9.2
+								//鍙�湁鍦ㄩ潪Action鏈熼棿锛屾洿鏂帮紝濡傛灉鏈夐仐婕忥紝鍐嶈� 2024.9.2
 								cmd = PacketAnalizer::GetPacketCmd(rsp);
 								key = PacketAnalizer::GetPacketKey(rsp);
 								//2.notify
@@ -1047,7 +1053,7 @@ int LogicClient::Close()
 	//Disconnect
 	//DisconnectCDI();
 
-	//底层MQTT连接并不关闭,除非显式关闭,或者对象被释放
+	//搴曞眰MQTT杩炴帴骞朵笉鍏抽棴锛岄櫎闈炴樉寮忓叧闂�紝鎴栬€呭�璞¤�閲婃斁
 	m_hDeviceOpenOK->ResetEvent();
 	m_pMqttConn = nullptr;
 	m_ClosedFlag = true;
@@ -1805,7 +1811,7 @@ bool LogicClient::IsDataArrived()
 */
 
 /// <summary>
-/// 子类CommonLogicClient ReadCMD判断 需要检查逻辑,进行兼容性修改
+/// 瀛愮被CommonLogicClient ReadCMD鍒ゆ柇 闇€瑕佹�鏌ラ€昏緫锛岃繘琛屽吋瀹规€т慨鏀�
 /// </summary>
 /// <param name="CmdObject"></param>
 /// <returns></returns>

+ 53 - 8
LogicDevice--mqtt/LogicDevice.cpp

@@ -379,7 +379,9 @@ void SYSTEM_CALL LogicDevice::CompleteInit()
 
 	if (m_pMqttConntion != nullptr)
 	{
-		StartThread(); //启动Request线程
+		// 使用异步启动避免阻塞:Sync=false表示不等待线程启动完成
+		// 如果Sync=true,会等待300秒直到线程设置m_RunFlag,可能导致启动卡顿
+		StartThread(false); //启动Request线程(异步)
 
 		SubscribeSelf();
 
@@ -1920,7 +1922,12 @@ void  LogicDevice::SubscribeActions()
 	//FINFO("{$} Subscribe Action {$} return {$}  topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
 	//*/
 
-	
+	// 等待订阅生效:给MQTT Broker足够时间处理订阅并返回SUBACK
+	// 这样可以确保后续的消息能够被正确路由
+	// 必须在所有订阅完成后等待,包括SubscribeModule中的订阅
+	FINFO("{$} Waiting for subscriptions to take effect...", m_strClientID);
+	usleep(500 * 1000); // 等待500ms,确保SUBACK已处理
+	FINFO("{$} Subscriptions ready", m_strClientID);
 
 }
 /*
@@ -2201,6 +2208,7 @@ void msgarrivd(void* client, message_data_t* message)
 
 	// 输出消息接收日志
 	std::cout << logPrefix << clientId << " Get Msg from " << topic << std::endl;
+	FINFO("[msgarrivd] {$} received message from topic: {$}, payload_len: {$}", clientId, topic, message->message->payloadlen);
 
 	// 校验消息数据
 	if (!message->message || !message->message->payload) {
@@ -2229,9 +2237,15 @@ void msgarrivd(void* client, message_data_t* message)
 	try {
 		std::string payload(static_cast<const char*>(message->message->payload), payloadLen);
 		req.decode(payload.c_str());
+		FINFO("[msgarrivd] {$} decoded message successfully, KEY: {$}, TYPE: {$}, CMD: {$}",
+			clientId,
+			PacketAnalizer::GetPacketKey(&req),
+			(int)PacketAnalizer::GetPacketType(&req),
+			(int)PacketAnalizer::GetPacketCmd(&req));
 	}
 	catch (...) {
 		pLock->Thread_UnLock();
+		FERROR("[msgarrivd] {$} failed to decode message from topic: {$}", clientId, topic);
 		return;
 	}
 
@@ -2255,13 +2269,17 @@ void msgarrivd(void* client, message_data_t* message)
 
 			auto func = std::get<FILTER_FUNC_ID>(*filter);
 			std::cout << logPrefix << clientId << " Got Hook Process " << topic << std::endl;
+			FINFO("[msgarrivd] {$} processing hook for topic: {$}", clientId, topic);
 
 			auto* pRequests = std::get<FILTER_RES_OBJ_ID>(*filter);
 			if (!pRequests) {
 				pLock->Thread_UnLock();
+				FWARN("[msgarrivd] {$} pRequests is null", clientId);
 				return;
 			}
 
+			FINFO("[msgarrivd] {$} checking {$} pending requests in hook", clientId, pRequests->size());
+
 			// 遍历请求处理钩子
 			for (int x = 0; x < pRequests->size(); x++) {
 				std::cout << logPrefix << clientId << " Hook Process Function " << topic << std::endl;
@@ -2270,11 +2288,15 @@ void msgarrivd(void* client, message_data_t* message)
 				ResDataObject resObj = (*pRequests)[x];
 				auto hEvent = std::get<FILTER_HANDLE_ID>(*filter);
 
+				FINFO("[msgarrivd] {$} comparing: pending_key=[{$}] vs received_key=[{$}], msg_type={$}",
+					clientId, keyAction, PacketAnalizer::GetPacketKey(&req), (int)PacketAnalizer::GetPacketType(&req));
+
 				// 匹配钩子消息
 				if (PacketAnalizer::GetPacketType(&req) == PACKET_TYPE_RES &&
 					keyAction == PacketAnalizer::GetPacketKey(&req)) {
 
 					std::cout << logPrefix << clientId << " Got Hook Function Hooked.. " << topic << std::endl;
+					FINFO("[msgarrivd] {$} MATCHED! Hook triggered for key: {$}", clientId, keyAction);
 
 					// 打开事件(优先使用resObj,失败则用默认)
 					auto event = LinuxEvent::OpenEvent(std::string(resObj).c_str());
@@ -2282,26 +2304,42 @@ void msgarrivd(void* client, message_data_t* message)
 
 					// 设置响应对象
 					auto* pResponse = std::get<FILTER_RESPONS_OBJ_ID>(*filter);
-					if (pResponse) pResponse->add(keyAction.c_str(), req);
+					if (pResponse) {
+						pResponse->add(keyAction.c_str(), req);
+						FINFO("[msgarrivd] {$} added response to pResponse for key: {$}", clientId, keyAction);
+					} else {
+						FWARN("[msgarrivd] {$} pResponse is null, cannot store response", clientId);
+					}
 
 					// 触发事件
 					if (event) {
-						try { event->SetEvent(); }
-						catch (...) {}
+						try {
+							event->SetEvent();
+							FINFO("[msgarrivd] {$} event signaled successfully for key: {$}", clientId, keyAction);
+						}
+						catch (...) {
+							FERROR("[msgarrivd] {$} failed to signal event for key: {$}", clientId, keyAction);
+						}
+					} else {
+						FWARN("[msgarrivd] {$} event is null, cannot signal", clientId);
 					}
 
 					// 清理并退出
 					pRequests->eraseOneOf(keyAction.c_str(), 0);
+					FINFO("[msgarrivd] {$} erased request from pending list, key: {$}", clientId, keyAction);
 					pLock->Thread_UnLock();
 					return;
 				}
 			}
 		}
-		catch (...) {}
+		catch (...) {
+			FERROR("[msgarrivd] {$} exception in hook processing for topic: {$}", clientId, topic);
+		}
 	}
 
 	// 未命中钩子,继续处理
 	std::cout << logPrefix << clientId << " Go on Process " << topic << std::endl;
+	FINFO("[msgarrivd] {$} hook not matched, calling user callback for topic: {$}", clientId, topic);
 	pLock->Thread_UnLock();
 
 	// 调用用户回调
@@ -2516,14 +2554,21 @@ mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
 
 		if (connected) {
 			uint64_t total_time = GetTickCount() - start_time;
-			std::cout << "[SUCCESS] Connected in " << total_time << "ms after " << attempt_count << " attempts" << std::endl;
+			FINFO("[SUCCESS] Connected in {$}ms after {$} attempts", total_time, attempt_count);
+
+			// 等待CONNACK处理完成:mqtt_connect()是异步的,返回0只表示CONNECT包已发送
+			// 需要等待MQTT Broker返回CONNACK并完成处理,否则后续的订阅可能无效
+			const int connackDelayMs = 500;
+			FINFO("[MQTT] Waiting {$}ms for CONNACK processing...", connackDelayMs);
+			usleep(connackDelayMs * 1000);
+			FINFO("[MQTT] CONNACK wait complete, connection ready for subscriptions");
 
 			// 安全地设置连接上下文
 			try {
 				return client;
 			}
 			catch (const std::exception& e) {
-				std::cerr << "[ERROR] Failed to set connection context: " << e.what() << std::endl;
+				FERROR("[ERROR] Failed to set connection context: {$}", e.what());
 				return nullptr;
 			}
 		}