| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- import mqtt from 'mqtt';
- import emitter from '../utils/eventEmitter';
- import { getMqttBrokerUrl } from '@/API/config';
- interface MqttMessage {
- IDX: string;
- TYPE: string;
- CMD: string;
- KEY: string;
- CONTEXT: string;
- Ver: string;
- Transaction: string;
- HANDLE: {
- ROUTE: string;
- FLAGS: string;
- LANG: string;
- HANDLEID: string;
- OWNERID: {
- EBUSID: string;
- MACHINEID: string;
- PROCID: string;
- ADDR: string;
- };
- DEVID: {
- EBUSID: string;
- MACHINEID: string;
- PROCID: string;
- ADDR: string;
- };
- };
- TOPIC: string;
- PUBLISH: string;
- }
- const MQTT_TOPIC_2 = 'CCOS/DEVICE/Generator/Notify/GENERATORSYNCSTATE'; //发生器
- const MQTT_TOPIC = 'CCOS/DEVICE/Detector/Notify/XrayON';
- const MQTT_TOPIC_DETECTOR = 'CCOS/DEVICE/Detector/Notify/DetectorStatus'; //探测器
- const MQTT_KV = 'CCOS/DEVICE/Generator/Notify/KV'; //KV值
- const MQTT_MAS = 'CCOS/DEVICE/Generator/Notify/MAS';
- const MQTT_MA = 'CCOS/DEVICE/Generator/Notify/MA';
- const MQTT_MS = 'CCOS/DEVICE/Generator/Notify/MS';
- const MQTT_TECHMODE = 'CCOS/DEVICE/Generator/Notify/TECHMODE';
- const MQTT_ERRORLIST_NOTIFY = 'CCOS/DEVICE/Generator/Notify/ErrorList';
- interface ErrorListMessage {
- IDX: string;
- TYPE: string;
- CMD: string;
- KEY: string;
- CONTEXT: Record<
- string,
- {
- CodeID: string;
- Type: string; // 0 表示error , 1 表示 warning
- Level: string;
- Resouceinfo: string;
- Description: string;
- CreationTime: string;
- AppId: string;
- InstanceId: string;
- }
- >;
- Ver: string;
- Transaction: string;
- HANDLE: {
- ROUTE: string;
- FLAGS: string;
- LANG: string;
- HANDLEID: string;
- OWNERID: {
- EBUSID: string;
- MACHINEID: string;
- PROCID: string;
- ADDR: string;
- };
- DEVID: {
- EBUSID: string;
- MACHINEID: string;
- PROCID: string;
- ADDR: string;
- };
- };
- TOPIC: string;
- PUBLISH: string;
- }
- const handleErrorListMessage = (message: ErrorListMessage) => {
- console.log(
- `[mqttServiceForDevice] 收到ErrorList消息: ${JSON.stringify(message.CONTEXT)}`
- );
- Object.keys(message.CONTEXT).forEach((key) => {
- const error = message.CONTEXT[key];
- console.log(
- `Error CodeID: ${error.CodeID}, Level: ${error.Level}, Resource Info: ${error.Resouceinfo}`
- );
- // Emit the new event with the error information
- emitter.emit('DEVICE_ERROR', error);
- });
- };
- const options = {
- clean: true,
- connectTimeout: 4000,
- username: 'mqttx1112',
- password: '',
- keepalive: 120, // 心跳间隔 120 秒(更宽松,减少超时)
- reconnectPeriod: 5000, // 断线后 5 秒自动重连
- clientId: `device_mqtt_${Math.random().toString(16).substring(2, 10)}`, // 唯一客户端 ID
- };
- let mqttClient: any = null;
- let isConnecting = false; // 连接状态标志,防止重复调用
- const handleMqttMessage = (message: MqttMessage) => {
- console.log(`[mqttServiceForDevice] 收到message.CONTEXT ${message.CONTEXT}`);
- if (message.CONTEXT === '0') {
- emitter.emit('GENERATOR_RAD_OFF');
- } else if (message.CONTEXT === '1') {
- emitter.emit('ACQUISITION_STARTED');
- emitter.emit('GENERATOR_RAD_PREPARE');
- } else if (message.CONTEXT === '2') {
- emitter.emit('GENERATOR_RAD_READY');
- } else if (message.CONTEXT === '3') {
- emitter.emit('ACQUISITION_STARTED');
- } else if (message.CONTEXT === '4') {
- emitter.emit('GENERATOR_STATUS_STANDBY'); //发生器ready
- }
- };
- const handleMqttMessageFromDetector = (message: MqttMessage) => {
- console.log(
- `[mqttServiceForDevice] 收到来自于Detector 的 message.CONTEXT ${message.CONTEXT}`
- );
- if (message.CONTEXT === '4') {
- //探测器状态会被通知为4(表示准备就绪)
- emitter.emit('DETECTOR_ACQUISITION_STARTED');
- } else if (message.CONTEXT === '5') {
- //探测器状态首先被通知为5(表示正在采集)
- emitter.emit('DETECTOR_ACQUISITION_INPROGRESS');
- } else {
- //探测器出错
- emitter.emit('DETECTOR_ACQUISITION_ERROR');
- }
- };
- const startListening = async () => {
- try {
- // 防止重复调用
- if (isConnecting) {
- console.log('[mqttServiceForDevice] 连接正在进行中,忽略重复调用');
- return;
- }
- const MQTT_BROKER_URL = getMqttBrokerUrl();
- const timestamp = () => new Date().toISOString();
-
- console.log(`[${timestamp()}] [mqttServiceForDevice] startListening with broker: ${MQTT_BROKER_URL}`);
- // 清理旧连接
- if (mqttClient) {
- console.log(`[${timestamp()}] [mqttServiceForDevice] 检测到旧连接,正在清理...`);
- try {
- // 移除所有事件监听器,防止内存泄漏
- mqttClient.removeAllListeners();
- // 强制关闭连接
- mqttClient.end(true);
- mqttClient = null;
- console.log(`[${timestamp()}] [mqttServiceForDevice] 旧连接已清理`);
- } catch (cleanupError) {
- console.error(`[${timestamp()}] [mqttServiceForDevice] 清理旧连接时出错:`, cleanupError);
- }
- }
- isConnecting = true;
-
- mqttClient = mqtt.connect(MQTT_BROKER_URL, options);
- mqttClient.on('connect', (connack) => {
- isConnecting = false;
- const timestamp = () => new Date().toISOString();
- console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ Connected to MQTT broker: ${MQTT_BROKER_URL}`, {
- sessionPresent: connack.sessionPresent,
- returnCode: connack.returnCode
- });
-
- // 发送连接成功事件到应用层
- emitter.emit('DEVICE_MQTT_CONNECTED');
-
- mqttClient.subscribe(MQTT_TOPIC, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_TOPIC_2, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_TOPIC_DETECTOR, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_TOPIC_DETECTOR',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_KV, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_KV',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_MAS, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MAS',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_MA, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MA',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_MS, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MS',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_TECHMODE, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_TECHMODE',
- err
- );
- }
- });
- mqttClient.subscribe(MQTT_ERRORLIST_NOTIFY, (err) => {
- if (err) {
- console.error(
- '[mqttServiceForDevice] Failed to subscribe to topic MQTT_ERRORLIST_NOTIFY',
- err
- );
- }
- });
- });
- // 重连事件
- mqttClient.on('reconnect', () => {
- const timestamp = () => new Date().toISOString();
- console.log(`[${timestamp()}] [mqttServiceForDevice] 🔄 正在尝试重连...`);
- emitter.emit('DEVICE_MQTT_RECONNECTING');
- });
- // 连接关闭事件
- mqttClient.on('close', () => {
- const timestamp = () => new Date().toISOString();
- console.log(`[${timestamp()}] [mqttServiceForDevice] 🚪 连接已关闭`);
- emitter.emit('DEVICE_MQTT_DISCONNECTED');
- });
- // 离线事件
- mqttClient.on('offline', () => {
- const timestamp = () => new Date().toISOString();
- console.log(`[${timestamp()}] [mqttServiceForDevice] 📡 客户端离线`);
- emitter.emit('DEVICE_MQTT_OFFLINE');
- });
- // 断开连接事件
- mqttClient.on('disconnect', (packet) => {
- const timestamp = () => new Date().toISOString();
- console.log(`[${timestamp()}] [mqttServiceForDevice] ⚠️ 主动断开连接`, packet);
- });
- // 错误事件
- mqttClient.on('error', (error) => {
- isConnecting = false;
- const timestamp = () => new Date().toISOString();
- console.error(`[${timestamp()}] [mqttServiceForDevice] ❌ 设备mqtt出现错误:`, {
- message: error.message,
- code: error.code,
- stack: error.stack?.split('\n')[0] // 只记录第一行堆栈
- });
-
- // 发送错误事件到应用层
- emitter.emit('DEVICE_MQTT_ERROR', {
- message: error.message,
- code: error.code
- });
- });
- mqttClient.on('message', (topic, message) => {
- console.log(
- `【接收设备mqtt消息】 topic是 ${topic} , message : ${JSON.parse(message.toString())}`
- );
- if (topic === MQTT_TOPIC) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- handleMqttMessage(parsedMessage);
- }
- if (topic === MQTT_TOPIC_2) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- handleMqttMessage(parsedMessage);
- }
- if (topic === MQTT_TOPIC_DETECTOR) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- handleMqttMessageFromDetector(parsedMessage);
- }
- if (topic === MQTT_KV) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- console.log(`从设备发来新的KV值 ${parsedMessage.CONTEXT}`);
- emitter.emit('NEW_KV', parsedMessage.CONTEXT);
- }
- if (topic === MQTT_MAS) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- console.log(`从设备发来新的MAS值 ${parsedMessage.CONTEXT}`);
- emitter.emit('NEW_MAS', parsedMessage.CONTEXT);
- }
- if (topic === MQTT_MA) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- console.log(`从设备发来新的MA值 ${parsedMessage.CONTEXT}`);
- emitter.emit('NEW_MA', parsedMessage.CONTEXT);
- }
- if (topic === MQTT_MS) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- console.log(`从设备发来新的MS值 ${parsedMessage.CONTEXT}`);
- emitter.emit('NEW_MS', parsedMessage.CONTEXT);
- }
- if (topic === MQTT_TECHMODE) {
- const parsedMessage: MqttMessage = JSON.parse(message.toString());
- console.log(`从设备发来新的曝光模式值 ${parsedMessage.CONTEXT}`);
- emitter.emit('NEW_EXPOSURE_MODE', parsedMessage.CONTEXT);
- }
- if (topic === MQTT_ERRORLIST_NOTIFY) {
- const parsedMessage: ErrorListMessage = JSON.parse(message.toString());
- handleErrorListMessage(parsedMessage);
- }
- });
- } catch (error) {
- console.error('[mqttServiceForDevice] Failed to start listening:', error);
- }
- };
- const stopListening = () => {
- const timestamp = () => new Date().toISOString();
-
- if (!mqttClient) {
- console.log(`[${timestamp()}] [mqttServiceForDevice] 没有活跃的连接需要关闭`);
- return;
- }
-
- console.log(`[${timestamp()}] [mqttServiceForDevice] 正在停止监听...`);
-
- try {
- // 移除所有事件监听器
- mqttClient.removeAllListeners();
- // 优雅关闭连接
- mqttClient.end(false, () => {
- console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ 已停止监听 MQTT broker`);
- mqttClient = null;
- isConnecting = false;
- });
- } catch (error) {
- console.error(`[${timestamp()}] [mqttServiceForDevice] 停止监听时出错:`, error);
- // 强制关闭
- mqttClient.end(true);
- mqttClient = null;
- isConnecting = false;
- }
- };
- export { startListening, stopListening };
|