Przeglądaj źródła

MQTT发送时检测和上一条消息的间隔小于50ms就进行延时达到50ms后再进行发送!

lwk 2 tygodni temu
rodzic
commit
908aa16133
1 zmienionych plików z 34 dodań i 21 usunięć
  1. 34 21
      LogicDevice--mqtt/LogicDevice.cpp

+ 34 - 21
LogicDevice--mqtt/LogicDevice.cpp

@@ -2375,15 +2375,10 @@ void msgarrivd(void* client, message_data_t* message)
 		// 使用异步处理避免阻塞MQTT接收流程
 		try {
 			AsyncMsgHandler::getInstance().handleMessageAsync(onmsg, req, topic, connection);
-			std::cout << logPrefix << clientId << " 消息已提交到异步处理队列: " << topic << std::endl;
 		}
 		catch (const std::exception& e) {
-			std::cout << logPrefix << clientId << " 异步消息处理提交失败: " << e.what()
-			         << ", Topic: " << topic << std::endl;
 		}
 		catch (...) {
-			std::cout << logPrefix << clientId << " 异步消息处理提交出现未知错误, Topic: "
-			         << topic << std::endl;
 		}
 	}
 	else {
@@ -2406,8 +2401,11 @@ void* MqttSendThreadFunc(void* pPara)
 	sem_t* hSemaphore = std::get<SEMAPHORE_HANDLE_ID>(*connection);
 	mqtt_topic_list* pTopicList = std::get<MQTT_TIPIC_LIST_ID>(*connection);
 
+	// 添加延时控制相关变量
+	static DWORD lastSendTime = 0;
+	const DWORD MIN_SEND_INTERVAL = 50; // 最小发送间隔50ms
 	// 线程启动日志
-	std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
+	//std::cout << "[" << client_id << "] The MQTT sending thread is started" << std::endl;
 
 	while (std::get<THREAD_RUNNING_ID>(*connection)) {
 		struct timespec ts;
@@ -2418,13 +2416,9 @@ void* MqttSendThreadFunc(void* pPara)
 		//std::cout << "[" << client_id << "] Wait for the message semaphore..." << std::endl;
 
 		if (sem_timedwait(hSemaphore, &ts) == 0) {
-			std::cout << "[" << client_id << "] Received the message semaphore and ready to process the message" << std::endl;
-			
 			pLock->Thread_Lock();
-			std::cout << "[" << client_id << "] Successfully obtained the sending lock" << std::endl;
 
 			if (pList->empty()) {
-				std::cout << "[" << client_id << "] The message list is empty, release the lock." << std::endl;
 				pLock->Thread_UnLock();
 				continue;
 			}
@@ -2433,19 +2427,38 @@ void* MqttSendThreadFunc(void* pPara)
 			pList->pop_front();
 			pLock->Thread_UnLock();
 
-			std::cout << "[" << client_id << "] Get a message from the message list, the number of remaining messages: " << pList->size() << std::endl;
-			std::string m_strSendMessage;
-			for (int x = 0; x < pMsg->size(); x++) {
-				const char* pTopic = pMsg->GetKey(x);
-				m_strSendMessage = (std::string)(*pMsg)[pTopic];
+			if (pMsg->size() > 0) {
+				const char* pTopic = pMsg->GetKey(0);
+				std::string m_strSendMessage = (std::string)(*pMsg)[pTopic];
+
+				// ========== 简化延时控制逻辑 ==========
+				DWORD now = GetTickCount();
+				DWORD elapsed = (lastSendTime == 0) ? MIN_SEND_INTERVAL + 1 : (now - lastSendTime);
+
+				// 如果上次发送在最小间隔内,需要延时
+				if (elapsed < MIN_SEND_INTERVAL) {
+					DWORD sleepMs = MIN_SEND_INTERVAL - elapsed;
+					FINFO("[MqttSendThreadFunc] {$} - Sleeping {$}ms to avoid QoS handshake conflict, elapsed: {$}ms",
+						client_id, sleepMs, elapsed);
+
+					if (sleepMs > 0) {
+						// 使用usleep进行毫秒级延时
+						usleep(sleepMs * 1000);
+						// 更新当前时间(因为经过了延时)
+						now = GetTickCount();
+					}
+				}
 
-				std::cout<< CurrentDateTime() << " [" << client_id << "] Prepare to publish messages to the topic: " << pTopic
-					<< ", Message length: " << m_strSendMessage.length() << "Byte" << std::endl;
-				PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id, pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
+				// 更新最后发送时间
+				lastSendTime = now;
+				// ========== 延时控制逻辑结束 ==========
+
+				// 发送消息
+				PublishActionWithoutLock(m_strSendMessage, pTopic, pConn, client_id,
+					pTopicList->size() <= 0 ? QOS2 : MQTT_QOS);
 			}
-			std::cout << "[" << client_id << "] The message processing is completed, and the message object has been released." << std::endl;
+
 			delete pMsg;
-			//std::cout << "[" << client_id << "] Release the sending lock" << std::endl;
 		}
 		else if (errno == ETIMEDOUT) {
 			//std::cout << "[" << client_id << "] The semaphore wait timed out, continue waiting." << std::endl;
@@ -2453,7 +2466,7 @@ void* MqttSendThreadFunc(void* pPara)
 		}
 		else {
 			// 其他错误情况
-			std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
+			//std::cout << "[" << client_id << "] Semaphore wait error, error code: " << errno << std::endl;
 			continue;
 		}
 	}