import mqtt from 'mqtt';
import emitter from '../utils/eventEmitter';
import { getMqttBrokerUrl } from '@/API/config';

/** Four-part identifier used for the OWNERID and DEVID fields of a message handle. */
interface MqttEndpointId {
  EBUSID: string;
  MACHINEID: string;
  PROCID: string;
  ADDR: string;
}

/** Shape of the HANDLE field present on every device message. */
interface MqttHandle {
  ROUTE: string;
  FLAGS: string;
  LANG: string;
  HANDLEID: string;
  OWNERID: MqttEndpointId;
  DEVID: MqttEndpointId;
}

/**
 * Common envelope of the device notifications handled by this module.
 * Only the type of CONTEXT differs between message kinds.
 */
interface MqttEnvelope<TContext> {
  IDX: string;
  TYPE: string;
  CMD: string;
  KEY: string;
  CONTEXT: TContext;
  Ver: string;
  Transaction: string;
  HANDLE: MqttHandle;
  TOPIC: string;
  PUBLISH: string;
}

/** Notification whose CONTEXT is a plain string (status code or parameter value). */
type MqttMessage = MqttEnvelope<string>;

/** One entry of an ErrorList notification. */
interface DeviceErrorEntry {
  CodeID: string;
  Type: string; // 0 means error, 1 means warning
  Level: string;
  Resouceinfo: string;
  Description: string;
  CreationTime: string;
  AppId: string;
  InstanceId: string;
}

/** ErrorList notification: CONTEXT maps a key to one error entry. */
type ErrorListMessage = MqttEnvelope<Record<string, DeviceErrorEntry>>;

const MQTT_TOPIC_2 = 'CCOS/DEVICE/Generator/Notify/GENERATORSYNCSTATE'; // generator
const MQTT_TOPIC = 'CCOS/DEVICE/Detector/Notify/XrayON';
const MQTT_TOPIC_DETECTOR = 'CCOS/DEVICE/Detector/Notify/DetectorStatus'; // detector
const MQTT_KV = 'CCOS/DEVICE/Generator/Notify/KV'; // KV value
const MQTT_MAS = 'CCOS/DEVICE/Generator/Notify/MAS';
const MQTT_MA = 'CCOS/DEVICE/Generator/Notify/MA';
const MQTT_MS = 'CCOS/DEVICE/Generator/Notify/MS';
const MQTT_TECHMODE = 'CCOS/DEVICE/Generator/Notify/TECHMODE';
const MQTT_ERRORLIST_NOTIFY = 'CCOS/DEVICE/Generator/Notify/ErrorList';

/**
 * Every topic subscribed to after a successful connect. `logSuffix` is appended
 * to the subscribe-failure log line so the emitted text stays the same as the
 * historical per-topic messages (the first two never named their topic).
 */
const SUBSCRIPTIONS: ReadonlyArray<{ topic: string; logSuffix: string }> = [
  { topic: MQTT_TOPIC, logSuffix: '' },
  { topic: MQTT_TOPIC_2, logSuffix: '' },
  { topic: MQTT_TOPIC_DETECTOR, logSuffix: ' MQTT_TOPIC_DETECTOR' },
  { topic: MQTT_KV, logSuffix: ' MQTT_KV' },
  { topic: MQTT_MAS, logSuffix: ' MQTT_MAS' },
  { topic: MQTT_MA, logSuffix: ' MQTT_MA' },
  { topic: MQTT_MS, logSuffix: ' MQTT_MS' },
  { topic: MQTT_TECHMODE, logSuffix: ' MQTT_TECHMODE' },
  { topic: MQTT_ERRORLIST_NOTIFY, logSuffix: ' MQTT_ERRORLIST_NOTIFY' },
];

/** ISO-8601 timestamp used as a prefix for log lines. */
const timestamp = (): string => new Date().toISOString();

/**
 * Logs an ErrorList notification and emits one `DEVICE_ERROR` event per entry.
 *
 * A CONTEXT that is missing or not an object is treated as an empty list
 * instead of throwing (or iterating over the characters of a string).
 */
const handleErrorListMessage = (message: ErrorListMessage) => {
  console.log(
    `[mqttServiceForDevice] 收到ErrorList消息: ${JSON.stringify(message.CONTEXT)}`
  );

  const context: unknown = message.CONTEXT;
  if (typeof context !== 'object' || context === null) {
    return;
  }

  for (const error of Object.values(message.CONTEXT)) {
    if (error == null) {
      continue;
    }
    console.log(
      `Error CodeID: ${error.CodeID}, Level: ${error.Level}, Resource Info: ${error.Resouceinfo}`
    );
    // Emit the new event with the error information
    emitter.emit('DEVICE_ERROR', error);
  }
};

// NOTE(review): the username is hard-coded and the password is empty; confirm
// that the broker is meant to accept this and consider moving it to config.
const options = {
  clean: true,
  connectTimeout: 4000,
  username: 'mqttx1112',
  password: '',
  keepalive: 120, // heartbeat interval of 120 s (more lenient, fewer timeouts)
  reconnectPeriod: 5000, // reconnect automatically 5 s after a drop
  clientId: `device_mqtt_${Math.random().toString(16).substring(2, 10)}`, // unique client id
};

// Kept as `any`: the handlers below read fields (e.g. `error.code`) that are
// not guaranteed by the library's typings.
let mqttClient: any = null;
let isConnecting = false; // connection-in-progress flag, prevents duplicate calls

/**
 * Translates a status notification (used for both MQTT_TOPIC and MQTT_TOPIC_2)
 * into application events, keyed on the string in CONTEXT. Unknown values are
 * ignored.
 */
const handleMqttMessage = (message: MqttMessage) => {
  console.log(`[mqttServiceForDevice] 收到message.CONTEXT ${message.CONTEXT}`);
  switch (message.CONTEXT) {
    case '0':
      emitter.emit('GENERATOR_RAD_OFF');
      break;
    case '1':
      emitter.emit('ACQUISITION_STARTED');
      emitter.emit('GENERATOR_RAD_PREPARE');
      break;
    case '2':
      emitter.emit('GENERATOR_RAD_READY');
      break;
    case '3':
      emitter.emit('ACQUISITION_STARTED');
      break;
    case '4':
      emitter.emit('GENERATOR_STATUS_STANDBY'); // generator ready
      break;
    default:
      break;
  }
};

/**
 * Translates a detector status notification into application events.
 * Any CONTEXT other than '4' or '5' is reported as a detector error.
 */
const handleMqttMessageFromDetector = (message: MqttMessage) => {
  console.log(
    `[mqttServiceForDevice] 收到来自于Detector 的 message.CONTEXT ${message.CONTEXT}`
  );
  if (message.CONTEXT === '4') {
    // the detector status is reported as 4 (ready)
    emitter.emit('DETECTOR_ACQUISITION_STARTED');
  } else if (message.CONTEXT === '5') {
    // the detector status is first reported as 5 (acquiring)
    emitter.emit('DETECTOR_ACQUISITION_INPROGRESS');
  } else {
    // detector error
    emitter.emit('DETECTOR_ACQUISITION_ERROR');
  }
};

/**
 * Parses one incoming payload and routes it to the handler for its topic.
 *
 * The payload is parsed exactly once, and parsing plus dispatch run inside a
 * try/catch so a malformed payload or a throwing handler is logged instead of
 * propagating out of the client's 'message' callback.
 */
const handleIncomingMessage = (
  topic: string,
  payload: { toString(): string }
): void => {
  const rawText = payload.toString();
  // Log the raw text: interpolating the parsed object would print "[object Object]".
  console.log(`【接收设备mqtt消息】 topic是 ${topic} , message : ${rawText}`);

  try {
    const parsed: unknown = JSON.parse(rawText);
    if (typeof parsed !== 'object' || parsed === null) {
      console.error(
        `[mqttServiceForDevice] Ignoring non-object payload on topic ${topic}`
      );
      return;
    }

    if (topic === MQTT_ERRORLIST_NOTIFY) {
      handleErrorListMessage(parsed as ErrorListMessage);
      return;
    }

    // Every remaining topic carries a string CONTEXT.
    const message = parsed as MqttMessage;
    switch (topic) {
      case MQTT_TOPIC:
      case MQTT_TOPIC_2:
        handleMqttMessage(message);
        break;
      case MQTT_TOPIC_DETECTOR:
        handleMqttMessageFromDetector(message);
        break;
      case MQTT_KV:
        console.log(`从设备发来新的KV值 ${message.CONTEXT}`);
        emitter.emit('NEW_KV', message.CONTEXT);
        break;
      case MQTT_MAS:
        console.log(`从设备发来新的MAS值 ${message.CONTEXT}`);
        emitter.emit('NEW_MAS', message.CONTEXT);
        break;
      case MQTT_MA:
        console.log(`从设备发来新的MA值 ${message.CONTEXT}`);
        emitter.emit('NEW_MA', message.CONTEXT);
        break;
      case MQTT_MS:
        console.log(`从设备发来新的MS值 ${message.CONTEXT}`);
        emitter.emit('NEW_MS', message.CONTEXT);
        break;
      case MQTT_TECHMODE:
        console.log(`从设备发来新的曝光模式值 ${message.CONTEXT}`);
        emitter.emit('NEW_EXPOSURE_MODE', message.CONTEXT);
        break;
      default:
        break;
    }
  } catch (handlingError) {
    console.error(
      `[mqttServiceForDevice] Failed to handle message on topic ${topic}:`,
      handlingError
    );
  }
};

/**
 * Removes every listener from a client that is about to be closed, then
 * installs a no-op 'error' listener: an 'error' event emitted with no
 * listener attached is thrown by the emitter.
 */
const detachListeners = (client: any): void => {
  client.removeAllListeners();
  client.on('error', () => {
    // Intentionally ignored: this client is being torn down.
  });
};

/** Subscribes `client` to every entry of SUBSCRIPTIONS, logging failures per topic. */
const subscribeToDeviceTopics = (client: any): void => {
  for (const { topic, logSuffix } of SUBSCRIPTIONS) {
    client.subscribe(topic, (err: unknown) => {
      if (err) {
        console.error(
          `[mqttServiceForDevice] Failed to subscribe to topic${logSuffix}`,
          err
        );
      }
    });
  }
};

/** Attaches the connection-lifecycle and message listeners to a new client. */
const registerClientHandlers = (client: any, brokerUrl: string): void => {
  client.on(
    'connect',
    (connack: { sessionPresent?: boolean; returnCode?: number }) => {
      isConnecting = false;
      console.log(
        `[${timestamp()}] [mqttServiceForDevice] ✅ Connected to MQTT broker: ${brokerUrl}`,
        {
          sessionPresent: connack.sessionPresent,
          returnCode: connack.returnCode,
        }
      );

      // Notify the application layer that the connection succeeded.
      emitter.emit('DEVICE_MQTT_CONNECTED');

      subscribeToDeviceTopics(client);
    }
  );

  // Reconnect attempt.
  client.on('reconnect', () => {
    console.log(`[${timestamp()}] [mqttServiceForDevice] 🔄 正在尝试重连...`);
    emitter.emit('DEVICE_MQTT_RECONNECTING');
  });

  // Connection closed.
  client.on('close', () => {
    console.log(`[${timestamp()}] [mqttServiceForDevice] 🚪 连接已关闭`);
    emitter.emit('DEVICE_MQTT_DISCONNECTED');
  });

  // Client went offline.
  client.on('offline', () => {
    console.log(`[${timestamp()}] [mqttServiceForDevice] 📡 客户端离线`);
    emitter.emit('DEVICE_MQTT_OFFLINE');
  });

  // Disconnect event.
  client.on('disconnect', (packet: unknown) => {
    console.log(`[${timestamp()}] [mqttServiceForDevice] ⚠️ 主动断开连接`, packet);
  });

  // Error event.
  client.on('error', (error: Error & { code?: string | number }) => {
    isConnecting = false;
    console.error(`[${timestamp()}] [mqttServiceForDevice] ❌ 设备mqtt出现错误:`, {
      message: error.message,
      code: error.code,
      stack: error.stack?.split('\n')[0], // only the first stack line
    });

    // Forward the error to the application layer.
    emitter.emit('DEVICE_MQTT_ERROR', {
      message: error.message,
      code: error.code,
    });
  });

  client.on('message', handleIncomingMessage);
};

/**
 * Connects to the broker returned by `getMqttBrokerUrl()` and starts
 * translating device notifications into application events.
 *
 * A call made while a previous connection attempt is still pending is
 * ignored. An existing client is force-closed before the new one is created.
 * Failures are logged, never thrown.
 */
const startListening = async (): Promise<void> => {
  // Prevent duplicate calls.
  if (isConnecting) {
    console.log('[mqttServiceForDevice] 连接正在进行中,忽略重复调用');
    return;
  }

  try {
    const brokerUrl = getMqttBrokerUrl();
    console.log(
      `[${timestamp()}] [mqttServiceForDevice] startListening with broker: ${brokerUrl}`
    );

    // Clean up the previous connection.
    if (mqttClient) {
      console.log(`[${timestamp()}] [mqttServiceForDevice] 检测到旧连接,正在清理...`);
      const staleClient = mqttClient;
      mqttClient = null;
      try {
        // Drop all listeners to avoid leaks, then force-close.
        detachListeners(staleClient);
        staleClient.end(true);
        console.log(`[${timestamp()}] [mqttServiceForDevice] 旧连接已清理`);
      } catch (cleanupError) {
        console.error(
          `[${timestamp()}] [mqttServiceForDevice] 清理旧连接时出错:`,
          cleanupError
        );
      }
    }

    isConnecting = true;
    const client = mqtt.connect(brokerUrl, options);
    mqttClient = client;
    registerClientHandlers(client, brokerUrl);
  } catch (error) {
    // Reset the flag, otherwise a synchronous failure would block every later call.
    isConnecting = false;
    console.error('[mqttServiceForDevice] Failed to start listening:', error);
  }
};

/**
 * Gracefully closes the current client, if any.
 *
 * Module state is cleared immediately rather than in the `end` callback: a
 * `startListening()` issued before the callback fires would otherwise have
 * its new client nulled out by the stale callback.
 */
const stopListening = (): void => {
  if (!mqttClient) {
    console.log(`[${timestamp()}] [mqttServiceForDevice] 没有活跃的连接需要关闭`);
    return;
  }

  console.log(`[${timestamp()}] [mqttServiceForDevice] 正在停止监听...`);

  const client = mqttClient;
  mqttClient = null;
  isConnecting = false;

  try {
    // Remove all event listeners.
    detachListeners(client);

    // Close the connection gracefully.
    client.end(false, () => {
      console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ 已停止监听 MQTT broker`);
    });
  } catch (error) {
    console.error(`[${timestamp()}] [mqttServiceForDevice] 停止监听时出错:`, error);
    // Force-close; guarded because the client may already be in a bad state.
    try {
      client.end(true);
    } catch (forceError) {
      console.error(
        `[${timestamp()}] [mqttServiceForDevice] Forced close failed:`,
        forceError
      );
    }
  }
};

export { startListening, stopListening };