Pārlūkot izejas kodu

为MQTT加入延时,防止启动时因为前一个消息还在QS2的握手协议中,后一个消息就到了,导致后一个消息被吞了!

lwk 15 stundas atpakaļ
vecāks
revīzija
8140bc7b08

+ 23 - 11
LogicClient/LogicClient.cpp

@@ -3,7 +3,7 @@
 #include "LogicClient.h"
 #include "common_api.h"
 #include "PacketAnalizer.h"
-#include "LogLocalHelper.h" 
+#include "LogLocalHelper.h"
 #include "Log4CPP.h"
 #define ATTRIBUTE ("Attribute")
 #include <iostream>
@@ -476,11 +476,16 @@ RET_STATUS LogicClient::ProcessOpenResponse(ResDataObject &packet)
 							notifyTopic += "/Notify/#";
 							SubscribeTopic(m_pMqttConn, notifyTopic.c_str());
 
-							// 等待订阅生效:确保SUBACK已处理,避免后续消息丢失
-							// 100ms在某些情况下不够,增加到200ms提高稳定性
-							const int notifySubDelayMs = 200;
-							FINFO("[LogicClient::ProcessOpenResponse] {$} - Waiting {$}ms for Notify subscription", m_strClientName, notifySubDelayMs);
-							usleep(notifySubDelayMs * 1000);
+							// 等待订阅完成:轮询检查SUBACK是否收到,确保Notify订阅真正生效
+							const DWORD notifyTimeoutMs = 5000;
+							FINFO("[LogicClient::ProcessOpenResponse] {$} - Waiting for Notify subscription to complete (timeout: {$}ms)", m_strClientName, notifyTimeoutMs);
+							if (!WaitForSubscriptionComplete(m_pMqttConn, notifyTimeoutMs)) {
+								FERROR("[LogicClient::ProcessOpenResponse] {$} - Notify subscription did not complete within timeout", m_strClientName);
+							}
+
+							// SUBACK收到后,等待Broker完成内部路由表更新
+							// 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
+							WaitForBrokerRouting(m_strClientName.c_str());
 						}
 
 						if (m_openCallbackFunc != nullptr)
@@ -603,11 +608,18 @@ RET_STATUS LogicClient::OpenDevice(bool bAsync)
 		FINFO("[LogicClient::OpenDevice-Info] {$} - Subscribing to default MQTT topic: {$}", m_strClientName, m_strDefaultTopic);
 		SubscribeTopic(m_pMqttConn, m_strDefaultTopic.c_str());
 
-		// 等待订阅生效:已在InnerConnect中等待CONNACK,此处额外等待200ms确保SUBACK处理
-		// 100ms在某些情况下不够,增加到200ms提高稳定性
-		const int delayMs = 200;
-		FINFO("[LogicClient::OpenDevice-Info] {$} - Waiting {$}ms to ensure MQTT subscription completes", m_strClientName, delayMs);
-		usleep(delayMs * 1000);
+		// 等待订阅完成:轮询检查SUBACK是否收到,而不是使用固定延时
+		// 这样可以确保订阅真正生效后才发送消息,避免时序问题导致消息丢失
+		const DWORD timeoutMs = 5000; // 5秒超时
+		FINFO("[LogicClient::OpenDevice-Info] {$} - Waiting for subscription to complete (timeout: {$}ms)", m_strClientName, timeoutMs);
+		if (!WaitForSubscriptionComplete(m_pMqttConn, timeoutMs)) {
+			FERROR("[LogicClient::OpenDevice-Error] {$} - Subscription did not complete within timeout", m_strClientName);
+			// 即使超时也继续,但记录错误
+		}
+
+		// SUBACK收到后,等待Broker完成内部路由表更新
+		// 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
+		WaitForBrokerRouting(m_strClientName.c_str());
 	}
 
 	// 输出调试信息

+ 117 - 14
LogicDevice--mqtt/LogicDevice.cpp

@@ -251,6 +251,19 @@ void LogicDevice::SubscribeSelf() {
 
 		//订阅
 	}
+
+	// 等待初始订阅完成(EBusRoot、CCOSDevice、Abstract等)
+	FINFO("{$} Waiting for initial subscriptions to complete...", m_strClientID);
+	const DWORD initialSubTimeoutMs = 5000;
+	if (!WaitForSubscriptionComplete(m_pMqttConntion, initialSubTimeoutMs)) {
+		FERROR("{$} Some initial subscriptions did not complete within timeout", m_strClientID);
+	}
+
+	// SUBACK收到后,等待Broker完成路由表更新
+	// 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
+	WaitForBrokerRouting(m_strClientID.c_str());
+
+	// 订阅Actions(内部也会等待SUBACK + Broker延时)
 	SubscribeActions();
 }
 
@@ -1922,12 +1935,19 @@ void  LogicDevice::SubscribeActions()
 	//FINFO("{$} Subscribe Action {$} return {$}  topic num {$}", m_strClientID, pszAction, ret, pTopicList->size());
 	//*/
 
-	// 等待订阅生效:给MQTT Broker足够时间处理订阅并返回SUBACK
-	// 这样可以确保后续的消息能够被正确路由
-	// 必须在所有订阅完成后等待,包括SubscribeModule中的订阅
-	FINFO("{$} Waiting for subscriptions to take effect...", m_strClientID);
-	usleep(500 * 1000); // 等待500ms,确保SUBACK已处理
-	FINFO("{$} Subscriptions ready", m_strClientID);
+	// 等待所有订阅完成:轮询检查ack_list,确保所有SUBACK都已收到
+	// 这比固定延时更可靠,能确保订阅真正生效后才继续
+	FINFO("{$} Waiting for all subscriptions to complete...", m_strClientID);
+	const DWORD subscribeTimeoutMs = 10000; // 10秒超时(可能有多个订阅)
+	if (!WaitForSubscriptionComplete(m_pMqttConntion, subscribeTimeoutMs)) {
+		FERROR("{$} Some subscriptions did not complete within timeout", m_strClientID);
+	} else {
+		FINFO("{$} All subscriptions ready", m_strClientID);
+	}
+
+	// SUBACK收到后,等待Broker完成内部路由表更新
+	// 延时可通过环境变量MQTT_BROKER_ROUTING_DELAY_MS配置,默认200ms
+	WaitForBrokerRouting(m_strClientID.c_str());
 
 }
 /*
@@ -2557,8 +2577,9 @@ mqtt_client_t* InnerConnect(ccos_mqtt_connection* connection) {
 			FINFO("[SUCCESS] Connected in {$}ms after {$} attempts", total_time, attempt_count);
 
 			// 等待CONNACK处理完成:mqtt_connect()是异步的,返回0只表示CONNECT包已发送
-			// 需要等待MQTT Broker返回CONNACK并完成处理,否则后续的订阅可能无效
-			const int connackDelayMs = 500;
+			// 需要给MQTT内部线程一点时间处理CONNACK(通常几十毫秒内完成)
+			// 实际的订阅完成由WaitForSubscriptionComplete()保证,所以这里只需要短暂等待
+			const int connackDelayMs = 100;
 			FINFO("[MQTT] Waiting {$}ms for CONNACK processing...", connackDelayMs);
 			usleep(connackDelayMs * 1000);
 			FINFO("[MQTT] CONNACK wait complete, connection ready for subscriptions");
@@ -2974,10 +2995,9 @@ LOGICDEVICE_API int  SubscribeTopic(ccos_mqtt_connection* hConnection, const cha
 			{
 				//std::get<MQTT_CLT_ID>(*hConnection) = pMqttClient;
 				//resubscribe_topic(pMqttClient, pMqttClient->mqtt_conn_context);
-				//FWARN("mqtt_connect ret {$}  do mqtt resubscribe {$}", rc, std::get<CLINET_ID_ID>(*hConnection));
+				FINFO("Reconnected successfully, connection ready for subscriptions");
 			}
-			
-			usleep(2000 * 1000);
+			// 移除2秒延时:InnerConnect已等待CONNACK,WaitForSubscriptionComplete会确保订阅完成
 		}
 	} while (rc != MQTT_SUCCESS_ERROR && GetTickCount() - dwTick < 100);
 
@@ -2986,6 +3006,84 @@ LOGICDEVICE_API int  SubscribeTopic(ccos_mqtt_connection* hConnection, const cha
 	return 0;
 }
 
+// 等待订阅完成(轮询检查ack_list,确保SUBACK已收到)
+LOGICDEVICE_API bool WaitForSubscriptionComplete(ccos_mqtt_connection* hConnection, DWORD timeoutMs)
+{
+	if (hConnection == nullptr) {
+		FERROR("WaitForSubscriptionComplete: hConnection is nullptr");
+		return false;
+	}
+
+	mqtt_client_t* pMqttClient = (mqtt_client_t*)std::get<MQTT_CLT_ID>(*hConnection);
+	if (pMqttClient == nullptr) {
+		FERROR("WaitForSubscriptionComplete: pMqttClient is nullptr");
+		return false;
+	}
+
+	const std::string clientId = std::get<CLINET_ID_ID>(*hConnection);
+	FINFO("[WaitForSubscriptionComplete] {$} - Waiting for all subscriptions to complete (timeout: {$}ms)",
+		clientId, timeoutMs);
+
+	DWORD startTime = GetTickCount();
+	const DWORD pollIntervalMs = 10; // 每10ms轮询一次
+	int lastAckCount = -1;
+
+	while (GetTickCount() - startTime < timeoutMs) {
+		// 检查ack_handler_list中是否还有待处理的SUBACK
+		// mqtt_ack_handler_number记录了ack_list中的条目数量
+		int currentAckCount = pMqttClient->mqtt_ack_handler_number;
+
+		// 如果ack数量变化,输出日志
+		if (currentAckCount != lastAckCount) {
+			FINFO("[WaitForSubscriptionComplete] {$} - Pending ack count: {$}", clientId, currentAckCount);
+			lastAckCount = currentAckCount;
+		}
+
+		// 如果没有待处理的ack,说明所有订阅都已完成
+		if (currentAckCount == 0) {
+			DWORD elapsed = GetTickCount() - startTime;
+			FINFO("[WaitForSubscriptionComplete] {$} - All subscriptions completed in {$}ms",
+				clientId, elapsed);
+			return true;
+		}
+
+		// 短暂休眠后继续轮询
+		usleep(pollIntervalMs * 1000);
+	}
+
+	// 超时
+	int finalAckCount = pMqttClient->mqtt_ack_handler_number;
+	FERROR("[WaitForSubscriptionComplete] {$} - Timeout after {$}ms, still have {$} pending acks",
+		clientId, timeoutMs, finalAckCount);
+	return false;
+}
+
+// 等待Broker完成路由表更新 - 可配置延时
+// 通过环境变量 MQTT_BROKER_ROUTING_DELAY_MS 配置,默认200ms
+LOGICDEVICE_API DWORD WaitForBrokerRouting(const char* clientId)
+{
+	static int brokerDelayMs = -1; // 静态变量,只读取一次环境变量
+
+	if (brokerDelayMs == -1) {
+		const char* envDelay = getenv("MQTT_BROKER_ROUTING_DELAY_MS");
+		if (envDelay != nullptr && atoi(envDelay) > 0) {
+			brokerDelayMs = atoi(envDelay);
+			FINFO("[WaitForBrokerRouting] Using custom delay from environment: {$}ms", brokerDelayMs);
+		} else {
+			brokerDelayMs = 500; // 默认500ms
+			FINFO("[WaitForBrokerRouting] Using default delay: {$}ms (set MQTT_BROKER_ROUTING_DELAY_MS to customize)", brokerDelayMs);
+		}
+	}
+
+	if (clientId != nullptr) {
+		FINFO("[WaitForBrokerRouting] {$} - Waiting {$}ms for Broker to process subscription routing",
+			clientId, brokerDelayMs);
+	}
+
+	usleep(brokerDelayMs * 1000);
+	return brokerDelayMs;
+}
+
 //主题订阅取消
 LOGICDEVICE_API int  UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic)
 {
@@ -3337,10 +3435,16 @@ LOGICDEVICE_API int  ActionAndRespWithConnection(ccos_mqtt_connection* hConnecti
 /// <param name="hEvent"></param>
 /// <param name="dwWaitTime"></param>
 /// <returns></returns>
-LOGICDEVICE_API int  ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, 
+LOGICDEVICE_API int  ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext,
 	const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime)
 {
-	usleep(1000 * 1000);
+	// 等待之前的QoS消息握手完成,避免消息冲突导致丢失
+	// 这比固定延时更可靠:只在必要时等待,而不是盲目延时1秒
+	const std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
+	FINFO("[ActionAndRespWithConnDefalt] {$} - Waiting for previous QoS handshakes to complete before sending {$}", client_id, pAction);
+	if (!WaitForSubscriptionComplete(hConnection, 5000)) {
+		FWARN("[ActionAndRespWithConnDefalt] {$} - Some QoS handshakes still pending, proceeding anyway", client_id);
+	}
 
 	std::cout << CurrentDateTime() << std::get<CLINET_ID_ID>(*hConnection) << "\nAction : " << pAction << "  to " << pszTopic <<  "\n Action Body: " << " Context " << pContext->encode() << endl << endl; //<< req.encode()
 
@@ -3397,7 +3501,6 @@ LOGICDEVICE_API int  ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnecti
 	std::get<FILTER_HANDLE_ID>(*pfilter) = hEvent;
 	
 	std::get<FILTER_FUNC_ID>(*pfilter) = func;
-	std::string client_id = std::get<CLINET_ID_ID>(*hConnection);
 
 	PacketAnalizer::UpdatePacketTopic(&req, pszTopic, std::get<CLINET_ID_ID>(*hConnection).c_str());
 

+ 50 - 46
LogicDevice--mqtt/LogicDevice.h

@@ -14,13 +14,13 @@
 #include "MsgQueue.h"
 #include <list>
 
-#include <sys/time.h> // 添加Linux时间头文件
-#include <pthread.h>  // 添加Linux线程头文件
-#include <unistd.h>   // 添加unistd头文件
-#include <semaphore.h> // 添加信号量头文件
+#include <sys/time.h> // 娣诲姞Linux鏃堕棿澶存枃浠�
+#include <pthread.h>  // 娣诲姞Linux绾跨▼澶存枃浠�
+#include <unistd.h>   // 娣诲姞unistd澶存枃浠�
+#include <semaphore.h> // 娣诲姞淇″彿閲忓ご鏂囦欢
 
  
-// 添加 GUID 类型定义(如果未定义)
+// 娣诲姞 GUID 绫诲瀷瀹氫箟锛堝�鏋滄湭瀹氫箟锛�
 #ifndef GUID_DEFINED
 #define GUID_DEFINED
 typedef struct _GUID {
@@ -65,11 +65,11 @@ using ccos_mqtt_connection = std::tuple<
 	void*,                 // MSG_HOOK_ID (ccos_mqtt_msg_filter*)
 	std::string,           // CLINET_ID_ID
 	ccos_mqtt_callback,    // USER_MSG_CAKBCK_ID
-	pthread_t,             // CONNECTED_THREAD_ID (Linux使用pthread_t)
+	pthread_t,             // CONNECTED_THREAD_ID (Linux浣跨敤pthread_t)
 	CcosLock*,             // CONN_SEND_LOCK_ID 
 	mqtt_msg_list*,        // MSG_LIST_ID
-	sem_t* ,               // SEMAPHORE_HANDLE_ID (Linux使用sem_t*)
-	std::atomic<bool>      //线程运行状态标志
+	sem_t* ,               // SEMAPHORE_HANDLE_ID (Linux浣跨敤sem_t*)
+	std::atomic<bool>      //绾跨▼杩愯�鐘舵€佹爣蹇�
 >; 
 const int MQTT_CLT_ID = 0;
 const int MQTT_TIPIC_LIST_ID = 1;
@@ -87,7 +87,7 @@ const int THREAD_RUNNING_ID = 9;
 using ccos_mqtt_msg_filter_func = std::function<bool(ResDataObject*)>;
 using ccos_mqtt_msg_filter = std::tuple<
 	ccos_mqtt_msg_filter_func,  // FILTER_FUNC_ID
-	std::shared_ptr<LinuxEvent>,                // LinuxEvent (替代 HANDLE)
+	std::shared_ptr<LinuxEvent>,                // LinuxEvent (鏇夸唬 HANDLE)
 	ResDataObject*,             // FILTER_RES_OBJ_ID
 	ResDataObject*              // FILTER_RESPONS_OBJ_ID
 >;
@@ -99,7 +99,7 @@ const int FILTER_RESPONS_OBJ_ID = 3;
 static std::vector<std::string> m_subscribedTopics;
 static std::recursive_mutex g_subscribedTopicsMutex;
 /// <summary>
-/// 逻辑设备命令收发接口
+/// 閫昏緫璁惧�鍛戒护鏀跺彂鎺ュ彛
 /// </summary>
 class LOGICDEVICE_API LogicDeviceSysIF
 {
@@ -115,7 +115,7 @@ public:
 	//Command In and Out
 	//notify from lower layer
 	/// <summary>
-	/// notify from lower layer,设备上行命令,发送端为逻辑设备
+	/// notify from lower layer锛岃�澶囦笂琛屽懡浠わ紝鍙戦€佺�涓洪€昏緫璁惧�
 	/// </summary>
 	/// <param name="pCmd"></param>
 	/// <returns></returns>
@@ -123,7 +123,7 @@ public:
 
 	//notify to lower layer
 		/// <summary>
-	/// notify to lower layer,设备命令下行,接收端为逻辑设备
+	/// notify to lower layer锛岃�澶囧懡浠や笅琛岋紝鎺ユ敹绔�负閫昏緫璁惧�
 	/// </summary>
 	/// <param name="pCmd"></param>
 	/// <returns></returns>
@@ -147,37 +147,37 @@ protected:
 	ccos_topic_filter m_topicFilter;
 	DWORD  m_dwLastPacket;
 
-	// int	m_bAbstractOnline; //抽象设备是否在线,初始化启动是在线,可以通过Close下线
-	ResDataObject m_rsOnlineGroup; //在线设备组,在线状态,对应工作位
+	// int	m_bAbstractOnline; //鎶借薄璁惧�鏄�惁鍦ㄧ嚎锛屽垵濮嬪寲鍚�姩鏄�湪绾匡紝鍙�互閫氳繃Close涓嬬嚎
+	ResDataObject m_rsOnlineGroup; //鍦ㄧ嚎璁惧�缁�,鍦ㄧ嚎鐘舵€侊紝瀵瑰簲宸ヤ綔浣�
 
-	string m_strClientID;	//MQTT 的客户端连接的ID,一般用DEVICE的逻辑ID来表示 VendorID_ProductID_SerialID_LogicID;
-	std::vector<string> m_strSubTopics;		//要订阅的主题列表
+	string m_strClientID;	//MQTT 鐨勫�鎴风�杩炴帴鐨処D锛屼竴鑸�敤DEVICE鐨勯€昏緫ID鏉ヨ〃绀� VendorID_ProductID_SerialID_LogicID;
+	std::vector<string> m_strSubTopics;		//瑕佽�闃呯殑涓婚�鍒楄〃
 	bool m_bIsTreeRoot;
 	string m_strEBusRoot;
 
-	std::vector<LogicDevice*> m_subDevices; //按照原有Ebus组织形式的树
-	string m_strCCOSDevicePath; //设备完整CCOS路径 CCOS/DEVICE/Detector/RF/ecomdemo/demo/1234
-	string m_strAbstractPath;	//抽象设备路径:CCOS/DEVICE/Generator,可以被Open,可以执行Action和通知等各种CCOS消息
-	string m_strCCOSRoot;		//设备根   ,CCOS/DEVICE ,CCOS/Driver , CCOS/Host,只可以被Open
+	std::vector<LogicDevice*> m_subDevices; //鎸夌収鍘熸湁Ebus缁勭粐褰㈠紡鐨勬爲
+	string m_strCCOSDevicePath; //璁惧�瀹屾暣CCOS璺�緞 CCOS/DEVICE/Detector/RF/ecomdemo/demo/1234
+	string m_strAbstractPath;	//鎶借薄璁惧�璺�緞锛欳COS/DEVICE/Generator锛屽彲浠ヨ�Open锛屽彲浠ユ墽琛孉ction鍜岄€氱煡绛夊悇绉岰COS娑堟伅
+	string m_strCCOSRoot;		//璁惧�鏍�   锛孋COS/DEVICE 锛孋COS/Driver , CCOS/Host锛屽彧鍙�互琚玂pen
 
-	std::vector<LogicDevice*> m_subCcosDevices; //按照新的树形结构的树
-	LogicDevice* m_pParent; //父节点
-	string m_strDevicePath; //相对CCOS root 根节点的 当前设备的名称,用于构建主题URI,必须以/开头
+	std::vector<LogicDevice*> m_subCcosDevices; //鎸夌収鏂扮殑鏍戝舰缁撴瀯鐨勬爲
+	LogicDevice* m_pParent; //鐖惰妭鐐�
+	string m_strDevicePath; //鐩稿�CCOS root 鏍硅妭鐐圭殑 褰撳墠璁惧�鐨勫悕绉帮紝鐢ㄤ簬鏋勫缓涓婚�URI锛屽繀椤讳互/寮€澶�
 
 	ResDataObject m_Actions;
 	MsgQueue<ResDataObject>* m_pPacketReceivedQue;
-	MsgQueue<ResDataObject>* m_pPacketSendingQue;  //发送队列
+	MsgQueue<ResDataObject>* m_pPacketSendingQue;  //鍙戦€侀槦鍒�
 
 	string m_strCurTransaction;
 
 	//Logger *m_pLogger;
 	char *m_pDevInstance;
 	ResDataObject *m_pResErrorList;
-	std::shared_ptr<LinuxEvent> m_EvtNotify;//此事件的目的是当本地有事件发生,而这事件需要要往上传递,需要驱动线程主动下来GET.
-	sem_t m_SemphRequest; //请求到了
-	sem_t m_SemphPublish; //有包要发送
+	std::shared_ptr<LinuxEvent> m_EvtNotify;//姝や簨浠剁殑鐩�殑鏄�綋鏈�湴鏈変簨浠跺彂鐢燂紝鑰岃繖浜嬩欢闇€瑕佽�寰€涓婁紶閫掞紝闇€瑕侀┍鍔ㄧ嚎绋嬩富鍔ㄤ笅鏉�ET.
+	sem_t m_SemphRequest; //璇锋眰鍒颁簡
+	sem_t m_SemphPublish; //鏈夊寘瑕佸彂閫�
 
-	DriverDPC *m_pDrvDPC;//设备对象的HOST
+	DriverDPC *m_pDrvDPC;//璁惧�瀵硅薄鐨凥OST
 
 
 	RET_STATUS SystemLog(SYSLOGLEVEL Level, const char *pCode, const char* fmt, ...);
@@ -223,18 +223,18 @@ public:
 	DriverDPC *GetDrvDPC();
 
 
-	void SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot); //设置 设备URI根 路径 ,包括EBUS和CCOS两个
+	void SetClientRootID(const char* pszEBusRoot, const char* pszCCOSRoot); //璁剧疆 璁惧�URI鏍� 璺�緞 锛屽寘鎷珽BUS鍜孋COS涓や釜
 	virtual void OnSetClientID();
 	void SetTopicFilter(ccos_topic_filter filter);
 
-	virtual void SubscribeSelf(); //所有子类必须使用 SubscribeTopic 订阅自身topic及资源topic,如果不知道需要DPC代订阅
+	virtual void SubscribeSelf(); //鎵€鏈夊瓙绫诲繀椤讳娇鐢� SubscribeTopic 璁㈤槄鑷�韩topic鍙婅祫婧恡opic锛屽�鏋滀笉鐭ラ亾闇€瑕丏PC浠h�闃�
 	void SubScribeTopic(const char* pszTopic, bool bSubscribe = true);
-	void HelpSubscribeTopic(const char* pszTopic);  //代注册接口,注册在主MQTT连接上
+	void HelpSubscribeTopic(const char* pszTopic);  //浠f敞鍐屾帴鍙o紝娉ㄥ唽鍦ㄤ富MQTT杩炴帴涓�
 	ccos_mqtt_connection* CreateConnection(const char* pszClientID, ccos_mqtt_callback onmsg);
 
 	std::shared_ptr<LinuxEvent> GetEvtHandle();
 	void NotifyDrvThread();
-	virtual RET_STATUS SYSTEM_CALL EvtProcedure();//m_EvtNotify事件发生后,由上层驱动线程进行调入.
+	virtual RET_STATUS SYSTEM_CALL EvtProcedure();//m_EvtNotify浜嬩欢鍙戠敓鍚庯紝鐢变笂灞傞┍鍔ㄧ嚎绋嬭繘琛岃皟鍏�.
 
 	virtual int SYSTEM_CALL GetDevice_Thread_Priority();//return THREAD_PRIORITY_NONE
 
@@ -246,7 +246,7 @@ public:
 
 	//get device resource
 	/// <summary>
-	/// 获取设备资源 get device resource
+	/// 鑾峰彇璁惧�璧勬簮 get device resource
 	/// </summary>
 	/// <param name="pDeviceResource"></param>
 	/// <returns></returns>
@@ -256,7 +256,7 @@ public:
 	//normal sync routine,Request to device and response from device
 
 	/// <summary>
-	/// 设备的命令响应接口 normal sync routine,Request to device and response from device
+	/// 璁惧�鐨勫懡浠ゅ搷搴旀帴鍙� normal sync routine,Request to device and response from device
 	/// </summary>
 	/// <param name="pRequest"></param>
 	/// <param name="pResponse"></param>
@@ -268,25 +268,25 @@ public:
 
 	//notify to lower layer
 	/// <summary>
-	/// 发送给设备的命令 只是通知吗?notify to lower layer
+	/// 鍙戦€佺粰璁惧�鐨勫懡浠� 鍙�槸閫氱煡鍚楋紵notify to lower layer
 	/// </summary>
 	/// <param name="pCmd"></param>
 	/// <returns></returns>
 	virtual RET_STATUS SYSTEM_CALL CmdToLogicDev(ResDataObject PARAM_IN *pCmd) = 0;
 
-	//处理来自客户的请求
+	//澶勭悊鏉ヨ嚜瀹㈡埛鐨勮�姹�
 	virtual RET_STATUS ProcessSubscribeMsg(ResDataObject* pCmd);
 	virtual RET_STATUS ProcessSubscribeRequest(ResDataObject* pCmd);
 
 	virtual RET_STATUS ProcessRequest(ResDataObject* pCmd, PACKET_CMD cmd);
 	virtual RET_STATUS ProcessResponse(ResDataObject* pCmd, PACKET_CMD cmd);
 
-	//需要子类自己实现自己的订阅处理
+	//闇€瑕佸瓙绫昏嚜宸卞疄鐜拌嚜宸辩殑璁㈤槄澶勭悊
 	virtual RET_STATUS ProcessNotify(ResDataObject* pCmd, PACKET_CMD cmd);
 
 	//notify from lower layer
 	/// <summary>
-	/// 设备侧 来的 命令?只可能是通知吗?notify from lower layer
+	/// 璁惧�渚� 鏉ョ殑 鍛戒护锛熷彧鍙�兘鏄�€氱煡鍚楋紵notify from lower layer
 	/// </summary>
 	/// <param name="pCmd"></param>
 	/// <returns></returns>
@@ -318,26 +318,30 @@ public:
 
 
 
-//创建额外连接,需要提供回调函数
+//鍒涘缓棰濆�杩炴帴锛岄渶瑕佹彁渚涘洖璋冨嚱鏁�
 LOGICDEVICE_API ccos_mqtt_connection* NewConnection(const char* pszClientID, ccos_mqtt_callback onmsg, DWORD dwOpenTimeout = 10000, bool async = false);
 LOGICDEVICE_API void CloseConnection(ccos_mqtt_connection* hConnection);
 LOGICDEVICE_API void ResetConnection(ccos_mqtt_connection* hConnection);
-//主动订阅主题
+//涓诲姩璁㈤槄涓婚�
 LOGICDEVICE_API int  SubscribeTopic(ccos_mqtt_connection* hConnection, const char* pszTopic, bool isShare = false);
-//主题订阅取消
+//绛夊緟璁㈤槄瀹屾垚锛圫UBACK鏀跺埌锛�- 杩斿洖true琛ㄧず鎴愬姛锛宖alse琛ㄧず瓒呮椂
+LOGICDEVICE_API bool WaitForSubscriptionComplete(ccos_mqtt_connection* hConnection, DWORD timeoutMs = 5000);
+//绛夊緟Broker瀹屾垚璺�敱琛ㄦ洿鏂� - 鍙�€氳繃鐜��鍙橀噺MQTT_BROKER_ROUTING_DELAY_MS閰嶇疆锛岄粯璁�500ms
+LOGICDEVICE_API DWORD WaitForBrokerRouting(const char* clientId = nullptr);
+//涓婚�璁㈤槄鍙栨秷
 LOGICDEVICE_API int  UnSubscribe(ccos_mqtt_connection* hConnection, const char* pszTopic);
-//往指定主题发送CCOS协议包整包,使用临时创建连接,仅发送,不接收
+//寰€鎸囧畾涓婚�鍙戦€丆COS鍗忚�鍖呮暣鍖咃紝浣跨敤涓存椂鍒涘缓杩炴帴锛屼粎鍙戦€侊紝涓嶆帴鏀�
 LOGICDEVICE_API int  PublishMsg(ResDataObject* pCmd, const char* pszTopic, const char* pszSenderName, DWORD dwTimeout = 3);
-//往指定主题发送CCOS协议包整包,使用已创建的连接发送
+//寰€鎸囧畾涓婚�鍙戦€丆COS鍗忚�鍖呮暣鍖咃紝浣跨敤宸插垱寤虹殑杩炴帴鍙戦€�
 LOGICDEVICE_API int  PublishAction(ResDataObject* pAction, const char* pszTopic, ccos_mqtt_connection* hConnection = nullptr, DWORD dwTimeout = 3);
 
-//使用现有连接 往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,超时没收到应答返回失败,复用链接时须小心,该函数会
+//浣跨敤鐜版湁杩炴帴 寰€鎸囧畾涓婚�鍙戦€丄ction鍖咃紝鎼哄甫鍙傛暟锛屽苟鎸囧畾绛斿�鐨凾opic锛屽悓姝ョ瓑寰卹esp锛岃秴鏃舵病鏀跺埌搴旂瓟杩斿洖澶辫触锛屽�鐢ㄩ摼鎺ユ椂椤诲皬蹇冿紝璇ュ嚱鏁颁細
 //LOGICDEVICE_API int  ActionAndRespWithConnection(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
 //	DWORD dwWaitTime);
 
-//使用现有连接 往指定主题发送Action包,携带参数,复用连接订阅的默认Topic,通过提供的消息钩子 等待resp,超时没收到应答返回失败,
+//浣跨敤鐜版湁杩炴帴 寰€鎸囧畾涓婚�鍙戦€丄ction鍖咃紝鎼哄甫鍙傛暟锛屽�鐢ㄨ繛鎺ヨ�闃呯殑榛樿�Topic锛岄€氳繃鎻愪緵鐨勬秷鎭�挬瀛� 绛夊緟resp锛岃秴鏃舵病鏀跺埌搴旂瓟杩斿洖澶辫触锛�
 LOGICDEVICE_API int  ActionAndRespWithConnDefalt(ccos_mqtt_connection* hConnection, const char* pAction, ResDataObject& req, ResDataObject* pContext, const char* pszTopic, ResDataObject& resObj, DWORD dwWaitTime);
 
-//往指定主题发送Action包,携带参数,并指定答复的Topic,同步等待resp,临时创建连接,并等待应答,超时没收到应答返回失败
+//寰€鎸囧畾涓婚�鍙戦€丄ction鍖咃紝鎼哄甫鍙傛暟锛屽苟鎸囧畾绛斿�鐨凾opic锛屽悓姝ョ瓑寰卹esp锛屼复鏃跺垱寤鸿繛鎺ワ紝骞剁瓑寰呭簲绛旓紝瓒呮椂娌℃敹鍒板簲绛旇繑鍥炲け璐�
 LOGICDEVICE_API int  ActionAndResp(const char* pAction, ResDataObject* pContext, const char* pszTopic, const char* pszRespTopic, ResDataObject& resObj,
 	DWORD dwWaitTime, const char* pszSenderName);

+ 109 - 37
LogicDevice--mqtt/ReadMe.txt

@@ -1,54 +1,126 @@
-2023-1-4 V1.0.1.7
-更新模块:LogicDeviceX64.dll
-更新人员:张锋
+ 使用方法
 
-更新功能:
-1、在system日志接口中,增加base64编码;
-==========================================================================================
+  默认行为(无需配置)
 
-2022-9-19 V1.0.1.6
-更新模块:LogicDeviceX64.dll
-更新人员:姜旺
+  # 直接运行,使用默认200ms延时
+  ./your_program
 
-更新功能:
-1、1.设备错误消息添加/删除的通知增加 InstanceId 字段;
-==========================================================================================
+  日志输出:
+  [WaitForBrokerRouting] Using default delay: 200ms (set MQTT_BROKER_ROUTING_DELAY_MS to customize)
 
-2022-8-10 V1.0.1.5
-更新模块:LogicDeviceX64.dll
-更新人员:张锋
+  自定义延时
 
-更新功能:
-1、在进行多字节转宽字符时,使用UTF8转码;
+  Linux/Unix
 
-==========================================================================================
-========================================================================
-    动态链接库:LogicDevice 项目概述
-========================================================================
+  # 设置为100ms(适用于高性能Broker)
+  export MQTT_BROKER_ROUTING_DELAY_MS=100
+  ./your_program
 
-应用程序向导已为您创建了此 LogicDevice DLL。
+  # 设置为500ms(适用于嵌入式或低性能设备)
+  export MQTT_BROKER_ROUTING_DELAY_MS=500
+  ./your_program
 
-本文件概要介绍组成 LogicDevice 应用程序的每个文件的内容。
+  # 临时设置(仅当次运行)
+  MQTT_BROKER_ROUTING_DELAY_MS=50 ./your_program
 
+  Windows
 
-LogicDevice.vcxproj
-    这是使用应用程序向导生成的 VC++ 项目的主项目文件,其中包含生成该文件的 Visual C++ 的版本信息,以及有关使用应用程序向导选择的平台、配置和项目功能的信息。
+  REM 设置为100ms
+  set MQTT_BROKER_ROUTING_DELAY_MS=100
+  your_program.exe
 
-LogicDevice.vcxproj.filters
-    这是使用“应用程序向导”生成的 VC++ 项目筛选器文件。它包含有关项目文件与筛选器之间的关联信息。在 IDE 中,通过这种关联,在特定节点下以分组形式显示具有相似扩展名的文件。例如,“.cpp”文件与“源文件”筛选器关联。
+  REM 或在系统环境变量中永久设置
 
-LogicDevice.cpp
-    这是主 DLL 源文件。
+  配置建议
 
-/////////////////////////////////////////////////////////////////////////////
-其他标准文件:
+  根据场景选择延时
 
-StdAfx.h, StdAfx.cpp
-    这些文件用于生成名为 LogicDevice.pch 的预编译头 (PCH) 文件和名为 StdAfx.obj 的预编译类型文件。
+  | 场景        | 推荐值        | 说明              |
+  |-----------|------------|-----------------|
+  | 开发测试      | 50-100ms   | 快速迭代,降低等待时间     |
+  | 生产环境(高性能) | 100-150ms  | 现代服务器+高性能Broker |
+  | 生产环境(默认)  | 200ms      | 兼顾稳定性和性能        |
+  | 嵌入式设备     | 300-500ms  | 低性能设备或高负载       |
+  | 极端高负载     | 500-1000ms | 系统资源紧张时         |
 
-/////////////////////////////////////////////////////////////////////////////
-其他注释:
+  调优步骤
 
-应用程序向导使用“TODO:”注释来指示应添加或自定义的源代码部分。
+  1. 初始测试:使用默认200ms,观察是否有消息丢失
+  2. 逐步减小:如果稳定,尝试150ms → 100ms → 50ms
+  3. 压力测试:在最小延时下进行快速重启、并发测试
+  4. 确定最优:找到不丢消息的最小延时值
 
-/////////////////////////////////////////////////////////////////////////////
+  验证方法
+
+  # 1. 快速重启测试(验证稳定性)
+  for i in {1..10}; do
+      ./your_program &
+      PID=$!
+      sleep 2
+      kill $PID
+      sleep 1
+  done
+
+  # 2. 检查日志中是否有 "timeout" 或 "not received"
+  grep -i "timeout\|not received" /path/to/logfile
+
+  # 3. 如果有超时,增加延时;如果无超时,可尝试减小延时
+
+  技术原理
+
+  为什么需要这个延时?
+
+  MQTT订阅的两个阶段:
+  1. 客户端 → Broker: SUBSCRIBE
+  2. Broker → 客户端: SUBACK  ← WaitForSubscriptionComplete确认这一步
+  3. Broker内部更新路由表     ← WaitForBrokerRouting等待这一步
+  4. 订阅生效,可以接收消息   ✅
+
+  问题:步骤2和步骤3之间的时间间隔不确定,取决于:
+  - Broker的实现(Mosquitto/EMQ/HiveMQ等)
+  - 系统负载(CPU/内存/网络)
+  - 订阅数量(单个vs批量)
+
+  为什么不能主动确认?
+
+  MQTT协议限制:
+  - SUBACK只表示"Broker收到SUBSCRIBE"
+  - 协议没有"路由已生效"的确认机制
+  - 无法通过发送测试消息验证(会被当作普通消息处理)
+
+  为什么设计成可配置?
+
+  不同环境差异巨大:
+  - 云端高性能服务器:20-50ms即可
+  - 嵌入式设备(如你的OK3588):100-200ms
+  - 虚拟机或Docker:200-500ms
+
+  固定值无法适应所有场景,可配置提供了灵活性。
+
+  日志示例
+
+  默认配置
+
+  [WaitForBrokerRouting] Using default delay: 200ms (set MQTT_BROKER_ROUTING_DELAY_MS to customize)
+  [WaitForBrokerRouting] LogicClient_xxx - Waiting 200ms for Broker to process subscription routing
+
+  自定义配置
+
+  [WaitForBrokerRouting] Using custom delay from environment: 100ms
+  [WaitForBrokerRouting] LogicClient_xxx - Waiting 100ms for Broker to process subscription routing
+
+  故障排查
+
+  问题:仍然有消息丢失
+
+  可能原因:
+  1. 延时不够 → 增加MQTT_BROKER_ROUTING_DELAY_MS到300或500
+  2. Broker性能瓶颈 → 检查Broker日志和资源使用
+  3. 网络延迟 → 检查网络质量(ping、丢包率)
+
+  问题:启动太慢
+
+  解决方法:
+  1. 减小延时(如100ms)
+  2. 如果减小后丢消息,说明Broker性能不足
+  3. 优化Broker配置或升级硬件

+ 1 - 1
OemLayer/NewModelDPC/NewModelDPC.cpp

@@ -655,7 +655,7 @@ DWORD SYSTEM_CALL NewModelDPC::OnNotify(std::vector<std::shared_ptr<LinuxEvent>>
 				}
 			}
 			//此处从程序启动后会一直打印,重复调用work函数
-			FINFO("ImageSaveDev->work()");
+			//FINFO("ImageSaveDev->work()");
 			//((ImageSaveDev*)m_pImgSaveDev)->Work();
 		}
 	}