Forráskód Böngészése

refactor (1.22.0->1.22.1): 增强MQTT服务连接稳定性和事件管理

- 在 mqttService.ts 中优化连接配置,添加心跳和自动重连机制
- 在 mqttServiceForDevice.ts 中同步优化连接管理流程
- 完善事件监听器(reconnect、close、offline、disconnect)
- 增强日志记录系统,添加时间戳和状态图标
- 改进错误处理,将错误事件发送到应用层
- 实现优雅的连接关闭逻辑
- 添加连接状态管理,防止重复连接

改动文件:
- src/domain/mqttService.ts
- src/domain/mqttServiceForDevice.ts
- CHANGELOG.md
- package.json
dengdx 3 hete
szülő
commit
3e578b4cdb
4 módosított fájl, 268 hozzáadás és 18 törlés
  1. 54 0
      CHANGELOG.md
  2. 1 1
      package.json
  3. 108 9
      src/domain/mqttService.ts
  4. 105 8
      src/domain/mqttServiceForDevice.ts

+ 54 - 0
CHANGELOG.md

@@ -2,6 +2,60 @@
 
 本项目的所有重要变更都将记录在此文件中。
 
+## [1.22.1] - 2025-12-23 13:33
+
+### 变更 (Changed)
+- **MQTT服务连接稳定性和事件管理增强** - 系统性优化两个MQTT服务的连接管理和事件处理机制
+  - 增强MQTT连接配置,添加keepalive(120秒)和reconnectPeriod(5秒)参数
+  - 为每个客户端生成唯一ID,避免连接冲突
+  - 添加连接状态管理(isConnecting标志),防止重复连接调用
+  - 实现旧连接清理机制,调用removeAllListeners()防止内存泄漏
+  - 完善连接生命周期事件监听:reconnect、close、offline、disconnect
+  - 增强日志记录系统,添加ISO时间戳和状态emoji图标(✅🔄🚪📡⚠️❌)
+  - 改进错误处理,将错误事件发送到应用层(MQTT_ERROR、DEVICE_MQTT_ERROR)
+  - 实现优雅的连接关闭逻辑,支持错误处理和资源释放
+  - 为两个MQTT服务(mqttService和mqttServiceForDevice)同步实施相同优化
+
+**核心改进:**
+- 连接稳定性:通过心跳和自动重连机制减少连接中断
+- 内存管理:清理旧连接和事件监听器,避免内存泄漏
+- 状态可见性:完整的事件监听和日志记录,便于监控和调试
+- 错误恢复:优雅的错误处理和重连机制,提升系统健壮性
+- 资源管理:正确的连接关闭流程,确保资源释放
+
+**技术实现:**
+- 连接配置优化:
+  - 添加 keepalive: 120 和 reconnectPeriod: 5000
+  - 使用随机生成的唯一 clientId
+- 状态管理:
+  - 添加 isConnecting 标志防止重复调用
+  - mqttClient 类型改为 any 并初始化为 null
+- 连接生命周期:
+  - startListening 前清理旧连接
+  - connect 事件接收 connack 参数并发送成功事件
+  - 添加 reconnect、close、offline、disconnect 事件监听
+- 日志增强:
+  - 使用 ISO 时间戳函数 `() => new Date().toISOString()`
+  - 添加 emoji 图标标识不同状态
+  - 错误日志只记录第一行堆栈信息
+- 错误处理:
+  - 发送 MQTT_CONNECTED/DEVICE_MQTT_CONNECTED 事件
+  - 发送 MQTT_ERROR/DEVICE_MQTT_ERROR 事件到应用层
+  - error 事件重置 isConnecting 标志
+- 优雅关闭:
+  - 检查 mqttClient 是否存在
+  - 调用 removeAllListeners() 清理监听器
+  - 使用 end(false, callback) 优雅关闭
+  - catch 块中强制关闭 end(true)
+
+**改动文件:**
+- src/domain/mqttService.ts
+- src/domain/mqttServiceForDevice.ts
+- CHANGELOG.md
+- package.json (版本更新: 1.22.0 -> 1.22.1)
+
+---
+
 ## [1.22.0] - 2025-12-22 19:27
 
 ### 新增 (Added)

+ 1 - 1
package.json

@@ -1,6 +1,6 @@
 {
   "name": "zsis",
-  "version": "1.22.0",
+  "version": "1.22.1",
   "private": true,
   "description": "医学成像系统",
   "main": "main.js",

+ 108 - 9
src/domain/mqttService.ts

@@ -16,12 +16,14 @@ const MQTT_GLOBAL_TOPIC = 'MODULE/TASK/STATUS/GLOBAL';
 const options = {
   clean: true, // true: 清除会话, false: 保留会话
   connectTimeout: 4000, // 超时时间
-  // 认证信息
-  // clientId: 'mqttx_58db45cc7777',
   username: 'mqttx1112',
   password: '',
+  keepalive: 120,        // 心跳间隔 120 秒(更宽松,减少超时)
+  reconnectPeriod: 5000, // 断线后 5 秒自动重连
+  clientId: `mqtt_${Math.random().toString(16).substring(2, 10)}`, // 唯一客户端 ID
 };
-let mqttClient;
+let mqttClient: any = null;
+let isConnecting = false; // 连接状态标志,防止重复调用
 
 const handleMqttMessage = (message: MqttMessage) => {
   switch (message.status) {
@@ -58,12 +60,46 @@ const handleMqttMessage = (message: MqttMessage) => {
 
 const startListening = async () => {
   try {
+    // 防止重复调用
+    if (isConnecting) {
+      console.log('[mqttService] 连接正在进行中,忽略重复调用');
+      return;
+    }
+
     const MQTT_BROKER_URL = getMqttBrokerUrl();
-    console.log(`[mqttService] startListening with broker: ${MQTT_BROKER_URL}`);
+    const timestamp = () => new Date().toISOString();
+    
+    console.log(`[${timestamp()}] [mqttService] startListening with broker: ${MQTT_BROKER_URL}`);
+
+    // 清理旧连接
+    if (mqttClient) {
+      console.log(`[${timestamp()}] [mqttService] 检测到旧连接,正在清理...`);
+      try {
+        // 移除所有事件监听器,防止内存泄漏
+        mqttClient.removeAllListeners();
+        // 强制关闭连接
+        mqttClient.end(true);
+        mqttClient = null;
+        console.log(`[${timestamp()}] [mqttService] 旧连接已清理`);
+      } catch (cleanupError) {
+        console.error(`[${timestamp()}] [mqttService] 清理旧连接时出错:`, cleanupError);
+      }
+    }
 
+    isConnecting = true;
+    
     mqttClient = mqtt.connect(MQTT_BROKER_URL, options);
-    mqttClient.on('connect', () => {
-      console.log('[mqttService] Connected to MQTT broker');
+    mqttClient.on('connect', (connack) => {
+      isConnecting = false;
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttService] ✅ Connected to MQTT broker`, {
+        sessionPresent: connack.sessionPresent,
+        returnCode: connack.returnCode
+      });
+      
+      // 发送连接成功事件到应用层
+      emitter.emit('MQTT_CONNECTED');
+      
       mqttClient.subscribe(MQTT_TOPIC, (err) => {
         if (err) {
           console.error('[mqttService] Failed to subscribe to topic', err);
@@ -76,8 +112,48 @@ const startListening = async () => {
       });
     });
 
+    // 重连事件
+    mqttClient.on('reconnect', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttService] 🔄 正在尝试重连...`);
+      emitter.emit('MQTT_RECONNECTING');
+    });
+
+    // 连接关闭事件
+    mqttClient.on('close', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttService] 🚪 连接已关闭`);
+      emitter.emit('MQTT_DISCONNECTED');
+    });
+
+    // 离线事件
+    mqttClient.on('offline', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttService] 📡 客户端离线`);
+      emitter.emit('MQTT_OFFLINE');
+    });
+
+    // 断开连接事件
+    mqttClient.on('disconnect', (packet) => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttService] ⚠️ 主动断开连接`, packet);
+    });
+
+    // 错误事件
     mqttClient.on('error', (error) => {
-      console.error('[mqttService] 连接失败:', error);
+      isConnecting = false;
+      const timestamp = () => new Date().toISOString();
+      console.error(`[${timestamp()}] [mqttService] ❌ 连接失败:`, {
+        message: error.message,
+        code: error.code,
+        stack: error.stack?.split('\n')[0] // 只记录第一行堆栈
+      });
+      
+      // 发送错误事件到应用层
+      emitter.emit('MQTT_ERROR', {
+        message: error.message,
+        code: error.code
+      });
     });
 
     mqttClient.on('message', (topic, message) => {
@@ -100,7 +176,30 @@ const startListening = async () => {
 };
 
 const stopListening = () => {
-  mqttClient.end();
-  console.log('Stopped listening to MQTT broker');
+  const timestamp = () => new Date().toISOString();
+  
+  if (!mqttClient) {
+    console.log(`[${timestamp()}] [mqttService] 没有活跃的连接需要关闭`);
+    return;
+  }
+  
+  console.log(`[${timestamp()}] [mqttService] 正在停止监听...`);
+  
+  try {
+    // 移除所有事件监听器
+    mqttClient.removeAllListeners();
+    // 优雅关闭连接
+    mqttClient.end(false, () => {
+      console.log(`[${timestamp()}] [mqttService] ✅ 已停止监听 MQTT broker`);
+      mqttClient = null;
+      isConnecting = false;
+    });
+  } catch (error) {
+    console.error(`[${timestamp()}] [mqttService] 停止监听时出错:`, error);
+    // 强制关闭
+    mqttClient.end(true);
+    mqttClient = null;
+    isConnecting = false;
+  }
 };
 export { mqttClient, handleMqttMessage, startListening, stopListening };

+ 105 - 8
src/domain/mqttServiceForDevice.ts

@@ -102,9 +102,13 @@ const options = {
   connectTimeout: 4000,
   username: 'mqttx1112',
   password: '',
+  keepalive: 120,        // 心跳间隔 120 秒(更宽松,减少超时)
+  reconnectPeriod: 5000, // 断线后 5 秒自动重连
+  clientId: `device_mqtt_${Math.random().toString(16).substring(2, 10)}`, // 唯一客户端 ID
 };
 
-let mqttClient;
+let mqttClient: any = null;
+let isConnecting = false; // 连接状态标志,防止重复调用
 
 const handleMqttMessage = (message: MqttMessage) => {
   console.log(`[mqttServiceForDevice] 收到message.CONTEXT ${message.CONTEXT}`);
@@ -139,12 +143,46 @@ const handleMqttMessageFromDetector = (message: MqttMessage) => {
 
 const startListening = async () => {
   try {
+    // 防止重复调用
+    if (isConnecting) {
+      console.log('[mqttServiceForDevice] 连接正在进行中,忽略重复调用');
+      return;
+    }
+
     const MQTT_BROKER_URL = getMqttBrokerUrl();
-    console.log(`[mqttServiceForDevice] startListening with broker: ${MQTT_BROKER_URL}`);
+    const timestamp = () => new Date().toISOString();
+    
+    console.log(`[${timestamp()}] [mqttServiceForDevice] startListening with broker: ${MQTT_BROKER_URL}`);
+
+    // 清理旧连接
+    if (mqttClient) {
+      console.log(`[${timestamp()}] [mqttServiceForDevice] 检测到旧连接,正在清理...`);
+      try {
+        // 移除所有事件监听器,防止内存泄漏
+        mqttClient.removeAllListeners();
+        // 强制关闭连接
+        mqttClient.end(true);
+        mqttClient = null;
+        console.log(`[${timestamp()}] [mqttServiceForDevice] 旧连接已清理`);
+      } catch (cleanupError) {
+        console.error(`[${timestamp()}] [mqttServiceForDevice] 清理旧连接时出错:`, cleanupError);
+      }
+    }
 
+    isConnecting = true;
+    
     mqttClient = mqtt.connect(MQTT_BROKER_URL, options);
-    mqttClient.on('connect', () => {
-      console.log(`[mqttServiceForDevice] Connected to MQTT broker 目标broker 是: ${MQTT_BROKER_URL}`);
+    mqttClient.on('connect', (connack) => {
+      isConnecting = false;
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ Connected to MQTT broker: ${MQTT_BROKER_URL}`, {
+        sessionPresent: connack.sessionPresent,
+        returnCode: connack.returnCode
+      });
+      
+      // 发送连接成功事件到应用层
+      emitter.emit('DEVICE_MQTT_CONNECTED');
+      
       mqttClient.subscribe(MQTT_TOPIC, (err) => {
         if (err) {
           console.error(
@@ -221,11 +259,47 @@ const startListening = async () => {
       });
     });
 
+    // 重连事件
+    mqttClient.on('reconnect', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttServiceForDevice] 🔄 正在尝试重连...`);
+      emitter.emit('DEVICE_MQTT_RECONNECTING');
+    });
+
+    // 连接关闭事件
+    mqttClient.on('close', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttServiceForDevice] 🚪 连接已关闭`);
+      emitter.emit('DEVICE_MQTT_DISCONNECTED');
+    });
+
+    // 离线事件
+    mqttClient.on('offline', () => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttServiceForDevice] 📡 客户端离线`);
+      emitter.emit('DEVICE_MQTT_OFFLINE');
+    });
+
+    // 断开连接事件
+    mqttClient.on('disconnect', (packet) => {
+      const timestamp = () => new Date().toISOString();
+      console.log(`[${timestamp()}] [mqttServiceForDevice] ⚠️ 主动断开连接`, packet);
+    });
+
+    // 错误事件
     mqttClient.on('error', (error) => {
-      console.error('[mqttServiceForDevice] 设备mqtt出现错误:', {
+      isConnecting = false;
+      const timestamp = () => new Date().toISOString();
+      console.error(`[${timestamp()}] [mqttServiceForDevice] ❌ 设备mqtt出现错误:`, {
         message: error.message,
         code: error.code,
-        stack: error.stack
+        stack: error.stack?.split('\n')[0] // 只记录第一行堆栈
+      });
+      
+      // 发送错误事件到应用层
+      emitter.emit('DEVICE_MQTT_ERROR', {
+        message: error.message,
+        code: error.code
       });
     });
 
@@ -282,8 +356,31 @@ const startListening = async () => {
 };
 
 const stopListening = () => {
-  mqttClient.end();
-  console.log('[mqttServiceForDevice] Stopped listening to MQTT broker');
+  const timestamp = () => new Date().toISOString();
+  
+  if (!mqttClient) {
+    console.log(`[${timestamp()}] [mqttServiceForDevice] 没有活跃的连接需要关闭`);
+    return;
+  }
+  
+  console.log(`[${timestamp()}] [mqttServiceForDevice] 正在停止监听...`);
+  
+  try {
+    // 移除所有事件监听器
+    mqttClient.removeAllListeners();
+    // 优雅关闭连接
+    mqttClient.end(false, () => {
+      console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ 已停止监听 MQTT broker`);
+      mqttClient = null;
+      isConnecting = false;
+    });
+  } catch (error) {
+    console.error(`[${timestamp()}] [mqttServiceForDevice] 停止监听时出错:`, error);
+    // 强制关闭
+    mqttClient.end(true);
+    mqttClient = null;
+    isConnecting = false;
+  }
 };
 
 export { startListening, stopListening };