mqttServiceForDevice.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. import mqtt from 'mqtt';
  2. import emitter from '../utils/eventEmitter';
  3. import { getMqttBrokerUrl } from '@/API/config';
  /** Identifier quadruple shared by the `OWNERID` and `DEVID` parts of a message handle. */
  interface MqttEndpointId {
    EBUSID: string;
    MACHINEID: string;
    PROCID: string;
    ADDR: string;
  }

  /**
   * Shape of a device notification whose payload (`CONTEXT`) is a plain string.
   *
   * Every field is typed as a string because that is how this module reads
   * them; only `CONTEXT` is actually inspected by the handlers in this file.
   */
  interface MqttMessage {
    IDX: string;
    TYPE: string;
    CMD: string;
    KEY: string;
    /** Notification payload, e.g. a state code or a new parameter value. */
    CONTEXT: string;
    Ver: string;
    Transaction: string;
    /** Routing information attached to the message. */
    HANDLE: {
      ROUTE: string;
      FLAGS: string;
      LANG: string;
      HANDLEID: string;
      OWNERID: MqttEndpointId;
      DEVID: MqttEndpointId;
    };
    TOPIC: string;
    PUBLISH: string;
  }
  // --- Detector notification topics ---
  const MQTT_TOPIC = 'CCOS/DEVICE/Detector/Notify/XrayON';
  const MQTT_TOPIC_DETECTOR = 'CCOS/DEVICE/Detector/Notify/DetectorStatus'; // detector status

  // --- Generator notification topics ---
  const MQTT_TOPIC_2 = 'CCOS/DEVICE/Generator/Notify/GENERATORSYNCSTATE'; // generator sync state
  const MQTT_KV = 'CCOS/DEVICE/Generator/Notify/KV'; // KV value
  const MQTT_MAS = 'CCOS/DEVICE/Generator/Notify/MAS';
  const MQTT_MA = 'CCOS/DEVICE/Generator/Notify/MA';
  const MQTT_MS = 'CCOS/DEVICE/Generator/Notify/MS';
  const MQTT_TECHMODE = 'CCOS/DEVICE/Generator/Notify/TECHMODE';
  const MQTT_ERRORLIST_NOTIFY = 'CCOS/DEVICE/Generator/Notify/ErrorList';
  /** One entry of an ErrorList notification payload. */
  interface DeviceErrorEntry {
    CodeID: string;
    /** '0' means error, '1' means warning. */
    Type: string;
    Level: string;
    /** Field name is spelled this way in the payload this module reads. */
    Resouceinfo: string;
    Description: string;
    CreationTime: string;
    AppId: string;
    InstanceId: string;
  }

  /**
   * Shape of an ErrorList notification. It has the same envelope fields as the
   * other device messages in this file, but `CONTEXT` is a map of error
   * entries rather than a string.
   */
  interface ErrorListMessage {
    IDX: string;
    TYPE: string;
    CMD: string;
    KEY: string;
    /** Error entries keyed by string; every entry is emitted individually. */
    CONTEXT: Record<string, DeviceErrorEntry>;
    Ver: string;
    Transaction: string;
    /** Routing information attached to the message. */
    HANDLE: {
      ROUTE: string;
      FLAGS: string;
      LANG: string;
      HANDLEID: string;
      OWNERID: {
        EBUSID: string;
        MACHINEID: string;
        PROCID: string;
        ADDR: string;
      };
      DEVID: {
        EBUSID: string;
        MACHINEID: string;
        PROCID: string;
        ADDR: string;
      };
    };
    TOPIC: string;
    PUBLISH: string;
  }
  /**
   * Handles an ErrorList notification: logs the payload and emits one
   * `DEVICE_ERROR` event per entry found in `message.CONTEXT`.
   *
   * The message comes straight from `JSON.parse`, so its shape is not
   * guaranteed. A missing or non-object `CONTEXT`, and entries that are not
   * objects, are skipped instead of throwing or emitting garbage.
   *
   * @param message Parsed ErrorList notification.
   */
  const handleErrorListMessage = (message: ErrorListMessage) => {
    const context: unknown = message?.CONTEXT;
    console.log(
      `[mqttServiceForDevice] 收到ErrorList消息: ${JSON.stringify(context)}`
    );

    // Object.keys(undefined) throws, and a string CONTEXT would be iterated
    // character by character, so anything that is not an object is rejected.
    if (typeof context !== 'object' || context === null) {
      console.warn(
        '[mqttServiceForDevice] ErrorList message has no usable CONTEXT, ignoring'
      );
      return;
    }

    for (const error of Object.values(message.CONTEXT)) {
      if (typeof error !== 'object' || error === null) {
        continue;
      }
      console.log(
        `Error CodeID: ${error.CodeID}, Level: ${error.Level}, Resource Info: ${error.Resouceinfo}`
      );
      // Forward each entry to the application layer.
      emitter.emit('DEVICE_ERROR', error);
    }
  };
  // Random hexadecimal suffix (at most 8 characters) used to build the client ID.
  const clientIdSuffix = Math.random().toString(16).slice(2, 10);

  /** Connection options passed to `mqtt.connect` by `startListening`. */
  const options = {
    clean: true,
    connectTimeout: 4000,
    username: 'mqttx1112',
    password: '',
    keepalive: 120, // heartbeat interval of 120 seconds (more lenient, fewer timeouts)
    reconnectPeriod: 5000, // reconnect automatically 5 seconds after a disconnect
    clientId: `device_mqtt_${clientIdSuffix}`, // unique client ID
  };

  /** Currently active MQTT client, or `null` when there is none. */
  let mqttClient: any = null;

  /** Connection-in-progress flag, used to ignore repeated `startListening` calls. */
  let isConnecting = false;
  /**
   * Translates the state code carried in `message.CONTEXT` into application
   * events on the shared emitter. Codes other than '0'..'4' emit nothing.
   *
   * @param message Parsed notification whose `CONTEXT` holds the state code.
   */
  const handleMqttMessage = (message: MqttMessage) => {
    const stateCode = message.CONTEXT;
    console.log(`[mqttServiceForDevice] 收到message.CONTEXT ${stateCode}`);

    switch (stateCode) {
      case '0':
        emitter.emit('GENERATOR_RAD_OFF');
        break;
      case '1':
        // Code '1' raises two events, in this order.
        emitter.emit('ACQUISITION_STARTED');
        emitter.emit('GENERATOR_RAD_PREPARE');
        break;
      case '2':
        emitter.emit('GENERATOR_RAD_READY');
        break;
      case '3':
        emitter.emit('ACQUISITION_STARTED');
        break;
      case '4':
        emitter.emit('GENERATOR_STATUS_STANDBY'); // generator ready
        break;
      default:
        break;
    }
  };
  /**
   * Maps the detector status carried in `message.CONTEXT` to exactly one
   * application event on the shared emitter.
   *
   * @param message Parsed detector status notification.
   */
  const handleMqttMessageFromDetector = (message: MqttMessage) => {
    const detectorStatus = message.CONTEXT;
    console.log(
      `[mqttServiceForDevice] 收到来自于Detector 的 message.CONTEXT ${detectorStatus}`
    );

    // Any status other than '4' or '5' is treated as a detector error.
    let eventName = 'DETECTOR_ACQUISITION_ERROR';
    if (detectorStatus === '4') {
      // Status 4: the detector reports that it is ready.
      eventName = 'DETECTOR_ACQUISITION_STARTED';
    } else if (detectorStatus === '5') {
      // Status 5: the detector reports that it is acquiring.
      eventName = 'DETECTOR_ACQUISITION_INPROGRESS';
    }

    emitter.emit(eventName);
  };
  /**
   * Subscribes `client` to every device topic this module handles.
   * A failed subscription is logged and does not affect the others.
   */
  const subscribeToDeviceTopics = (client: any): void => {
    // Each topic is paired with the message logged if its subscription fails.
    const subscriptions: Array<[string, string]> = [
      [MQTT_TOPIC, '[mqttServiceForDevice] Failed to subscribe to topic'],
      [MQTT_TOPIC_2, '[mqttServiceForDevice] Failed to subscribe to topic'],
      [
        MQTT_TOPIC_DETECTOR,
        '[mqttServiceForDevice] Failed to subscribe to topic MQTT_TOPIC_DETECTOR',
      ],
      [MQTT_KV, '[mqttServiceForDevice] Failed to subscribe to topic MQTT_KV'],
      [MQTT_MAS, '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MAS'],
      [MQTT_MA, '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MA'],
      [MQTT_MS, '[mqttServiceForDevice] Failed to subscribe to topic MQTT_MS'],
      [
        MQTT_TECHMODE,
        '[mqttServiceForDevice] Failed to subscribe to topic MQTT_TECHMODE',
      ],
      [
        MQTT_ERRORLIST_NOTIFY,
        '[mqttServiceForDevice] Failed to subscribe to topic MQTT_ERRORLIST_NOTIFY',
      ],
    ];

    for (const [topic, failureMessage] of subscriptions) {
      client.subscribe(topic, (err: Error | null) => {
        if (err) {
          console.error(failureMessage, err);
        }
      });
    }
  };

  /**
   * Parses one incoming payload and routes it to the handler or emitter event
   * that belongs to its topic. Payloads that are not JSON objects are logged
   * and dropped instead of throwing inside the MQTT event handler.
   */
  const dispatchDeviceMessage = (
    topic: string,
    payload: { toString(): string }
  ): void => {
    const raw = payload.toString();
    // Log the raw text: interpolating the parsed object prints "[object Object]".
    console.log(`【接收设备mqtt消息】 topic是 ${topic} , message : ${raw}`);

    let parsed: any;
    try {
      parsed = JSON.parse(raw);
    } catch (parseError) {
      console.error(
        `[mqttServiceForDevice] Ignoring non-JSON payload on topic ${topic}:`,
        parseError
      );
      return;
    }
    if (typeof parsed !== 'object' || parsed === null) {
      console.error(
        `[mqttServiceForDevice] Ignoring non-object payload on topic ${topic}`
      );
      return;
    }

    // Logs a new parameter value and forwards it under the given event name.
    const forwardValue = (label: string, eventName: string): void => {
      console.log(`从设备发来新的${label} ${parsed.CONTEXT}`);
      emitter.emit(eventName, parsed.CONTEXT);
    };

    switch (topic) {
      case MQTT_TOPIC:
      case MQTT_TOPIC_2:
        handleMqttMessage(parsed);
        break;
      case MQTT_TOPIC_DETECTOR:
        handleMqttMessageFromDetector(parsed);
        break;
      case MQTT_KV:
        forwardValue('KV值', 'NEW_KV');
        break;
      case MQTT_MAS:
        forwardValue('MAS值', 'NEW_MAS');
        break;
      case MQTT_MA:
        forwardValue('MA值', 'NEW_MA');
        break;
      case MQTT_MS:
        forwardValue('MS值', 'NEW_MS');
        break;
      case MQTT_TECHMODE:
        forwardValue('曝光模式值', 'NEW_EXPOSURE_MODE');
        break;
      case MQTT_ERRORLIST_NOTIFY:
        handleErrorListMessage(parsed);
        break;
      default:
        break;
    }
  };

  /**
   * Connects to the MQTT broker returned by `getMqttBrokerUrl`, subscribes to
   * the device topics and forwards incoming notifications to the shared emitter.
   *
   * - A call made while a connection attempt is already in progress is ignored.
   * - An existing client is stripped of its listeners and force-closed first.
   * - Connection state is reported through the `DEVICE_MQTT_CONNECTED`,
   *   `DEVICE_MQTT_RECONNECTING`, `DEVICE_MQTT_DISCONNECTED`,
   *   `DEVICE_MQTT_OFFLINE` and `DEVICE_MQTT_ERROR` events.
   *
   * Failures are logged and never thrown to the caller.
   */
  const startListening = async () => {
    const timestamp = () => new Date().toISOString();

    // Guard against repeated calls.
    if (isConnecting) {
      console.log('[mqttServiceForDevice] 连接正在进行中,忽略重复调用');
      return;
    }

    try {
      const MQTT_BROKER_URL = getMqttBrokerUrl();
      console.log(`[${timestamp()}] [mqttServiceForDevice] startListening with broker: ${MQTT_BROKER_URL}`);

      // Clean up the previous connection, if any.
      if (mqttClient) {
        console.log(`[${timestamp()}] [mqttServiceForDevice] 检测到旧连接,正在清理...`);
        try {
          // Remove all listeners so the old client cannot emit stale events.
          mqttClient.removeAllListeners();
          // Force-close the connection.
          mqttClient.end(true);
          mqttClient = null;
          console.log(`[${timestamp()}] [mqttServiceForDevice] 旧连接已清理`);
        } catch (cleanupError) {
          console.error(`[${timestamp()}] [mqttServiceForDevice] 清理旧连接时出错:`, cleanupError);
        }
      }

      isConnecting = true;
      // Keep a local reference: callbacks must act on the client that raised
      // them, not on whatever the module-level variable points to later.
      const client: any = mqtt.connect(MQTT_BROKER_URL, options);
      mqttClient = client;

      client.on('connect', (connack: { sessionPresent?: boolean; returnCode?: number }) => {
        isConnecting = false;
        console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ Connected to MQTT broker: ${MQTT_BROKER_URL}`, {
          sessionPresent: connack.sessionPresent,
          returnCode: connack.returnCode
        });
        // Tell the application layer that the connection is up.
        emitter.emit('DEVICE_MQTT_CONNECTED');
        subscribeToDeviceTopics(client);
      });

      // Reconnect attempt.
      client.on('reconnect', () => {
        console.log(`[${timestamp()}] [mqttServiceForDevice] 🔄 正在尝试重连...`);
        emitter.emit('DEVICE_MQTT_RECONNECTING');
      });

      // Connection closed.
      client.on('close', () => {
        console.log(`[${timestamp()}] [mqttServiceForDevice] 🚪 连接已关闭`);
        emitter.emit('DEVICE_MQTT_DISCONNECTED');
      });

      // Client went offline.
      client.on('offline', () => {
        console.log(`[${timestamp()}] [mqttServiceForDevice] 📡 客户端离线`);
        emitter.emit('DEVICE_MQTT_OFFLINE');
      });

      // Disconnect packet received.
      client.on('disconnect', (packet: unknown) => {
        console.log(`[${timestamp()}] [mqttServiceForDevice] ⚠️ 主动断开连接`, packet);
      });

      // Client error.
      client.on('error', (error: Error & { code?: string | number }) => {
        isConnecting = false;
        console.error(`[${timestamp()}] [mqttServiceForDevice] ❌ 设备mqtt出现错误:`, {
          message: error.message,
          code: error.code,
          stack: error.stack?.split('\n')[0] // first stack line only
        });
        // Forward the error to the application layer.
        emitter.emit('DEVICE_MQTT_ERROR', {
          message: error.message,
          code: error.code
        });
      });

      client.on('message', (topic: string, payload: { toString(): string }) => {
        try {
          dispatchDeviceMessage(topic, payload);
        } catch (dispatchError) {
          // A failing handler or emitter listener must not escape the MQTT event handler.
          console.error(
            `[mqttServiceForDevice] Failed to handle message on topic ${topic}:`,
            dispatchError
          );
        }
      });
    } catch (error) {
      // Without this reset a synchronous failure (for example an invalid broker
      // URL) would leave the guard set and block every later call.
      isConnecting = false;
      console.error('[mqttServiceForDevice] Failed to start listening:', error);
    }
  };
  /**
   * Closes the active MQTT connection, if there is one.
   *
   * The module state (`mqttClient`, `isConnecting`) is reset synchronously,
   * before the graceful close completes. This way a `startListening` call made
   * right after `stopListening` is neither ignored nor has its new client
   * overwritten when the old client's close callback fires later.
   *
   * Errors are logged and never thrown to the caller.
   */
  const stopListening = () => {
    const timestamp = () => new Date().toISOString();

    if (!mqttClient) {
      console.log(`[${timestamp()}] [mqttServiceForDevice] 没有活跃的连接需要关闭`);
      return;
    }

    console.log(`[${timestamp()}] [mqttServiceForDevice] 正在停止监听...`);

    // Detach the client from module state first; everything below works on the
    // local reference only.
    const client = mqttClient;
    mqttClient = null;
    isConnecting = false;

    try {
      // Remove all event listeners.
      client.removeAllListeners();
      // An emitter with no 'error' listener throws when 'error' is emitted,
      // so keep a no-op listener while the connection winds down.
      client.on('error', () => undefined);
      // Close the connection gracefully.
      client.end(false, () => {
        console.log(`[${timestamp()}] [mqttServiceForDevice] ✅ 已停止监听 MQTT broker`);
      });
    } catch (error) {
      console.error(`[${timestamp()}] [mqttServiceForDevice] 停止监听时出错:`, error);
      // Fall back to a forced close; this may fail as well and must not throw.
      try {
        client.end(true);
      } catch (forceError) {
        console.error(
          `[${timestamp()}] [mqttServiceForDevice] Forced close failed as well:`,
          forceError
        );
      }
    }
  };
  357. export { startListening, stopListening };