Procházet zdrojové kódy

添加休息队列接收消息!

lwk před 4 dny
rodič
revize
35b773df6a

+ 91 - 0
LogicDevice--mqtt/AsyncMsgHandler.cpp

@@ -0,0 +1,91 @@
+#include "AsyncMsgHandler.h"
+#include "ResDataObject.h"  // 假设这是您的ResDataObject头文件
+#include <iostream>
+
+AsyncMsgHandler& AsyncMsgHandler::getInstance() {
+    static AsyncMsgHandler instance;
+    return instance;
+}
+
+AsyncMsgHandler::AsyncMsgHandler() : m_running(true) {
+    // 启动工作线程
+    m_workerThread = std::thread(&AsyncMsgHandler::workerThreadFunc, this);
+    std::cout << "[AsyncMsgHandler] 异步消息处理器已启动" << std::endl;
+}
+
+AsyncMsgHandler::~AsyncMsgHandler() {
+    // 停止工作线程
+    m_running = false;
+    m_condition.notify_all();
+
+    if (m_workerThread.joinable()) {
+        m_workerThread.join();
+    }
+
+    std::cout << "[AsyncMsgHandler] 异步消息处理器已关闭" << std::endl;
+}
+
+void AsyncMsgHandler::handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
+                                       const std::string& topic, ccos_mqtt_connection* connection) {
+    if (!callback || !connection) {
+        return;
+    }
+
+    // 创建消息任务
+    auto task = std::make_unique<MessageTask>(callback, req, topic, connection);
+
+    // 加入队列
+    {
+        std::lock_guard<std::mutex> lock(m_queueMutex);
+        m_messageQueue.push(std::move(task));
+    }
+
+    // 通知工作线程
+    m_condition.notify_one();
+}
+
+void AsyncMsgHandler::workerThreadFunc() {
+    while (m_running) {
+        std::unique_ptr<MessageTask> task;
+
+        // 从队列获取任务
+        {
+            std::unique_lock<std::mutex> lock(m_queueMutex);
+            m_condition.wait(lock, [this] { return !m_messageQueue.empty() || !m_running; });
+
+            if (!m_running) {
+                break;
+            }
+
+            if (!m_messageQueue.empty()) {
+                task = std::move(m_messageQueue.front());
+                m_messageQueue.pop();
+            }
+        }
+
+        // 执行任务
+        if (task && task->callback) {
+            try {
+                // 调用用户回调函数
+                task->callback(const_cast<ResDataObject*>(&task->req),
+                             task->topic.c_str(),
+                             task->connection);
+            }
+            catch (const std::exception& e) {
+                std::cout << "[AsyncMsgHandler] 消息处理异常: " << e.what()
+                         << ", Topic: " << task->topic << std::endl;
+            }
+            catch (...) {
+                std::cout << "[AsyncMsgHandler] 消息处理未知异常, Topic: "
+                         << task->topic << std::endl;
+            }
+        }
+    }
+
+    std::cout << "[AsyncMsgHandler] 工作线程已退出" << std::endl;
+}
+
+size_t AsyncMsgHandler::getQueueSize() {
+    std::lock_guard<std::mutex> lock(m_queueMutex);
+    return m_messageQueue.size();
+}

+ 54 - 0
LogicDevice--mqtt/AsyncMsgHandler.h

@@ -0,0 +1,54 @@
+#pragma once
+
+#include <thread>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <functional>
+#include <atomic>
+#include <memory>
+#include "LogicDevice.h"
+
+/**
+ * 轻量级异步MQTT消息处理器
+ * 专门解决msgarrivd函数中用户回调阻塞导致的消息丢失问题
+ */
+class AsyncMsgHandler {
+public:
+    struct MessageTask {
+        ccos_mqtt_callback callback;
+        ResDataObject req;
+        std::string topic;
+        ccos_mqtt_connection* connection;
+
+        MessageTask(ccos_mqtt_callback cb, const ResDataObject& r, const std::string& t, ccos_mqtt_connection* conn)
+            : callback(cb), req(r), topic(t), connection(conn) {}
+    };
+
+private:
+    std::queue<std::unique_ptr<MessageTask>> m_messageQueue;
+    std::mutex m_queueMutex;
+    std::condition_variable m_condition;
+    std::atomic<bool> m_running{true};
+    std::thread m_workerThread;
+
+    // 工作线程函数
+    void workerThreadFunc();
+
+public:
+    static AsyncMsgHandler& getInstance();
+
+    AsyncMsgHandler();
+    ~AsyncMsgHandler();
+
+    // 异步处理消息
+    void handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
+                          const std::string& topic, ccos_mqtt_connection* connection);
+
+    // 获取队列状态
+    size_t getQueueSize();
+
+    // 禁用拷贝构造和赋值
+    AsyncMsgHandler(const AsyncMsgHandler&) = delete;
+    AsyncMsgHandler& operator=(const AsyncMsgHandler&) = delete;
+};

+ 2 - 0
LogicDevice--mqtt/CMakeLists.txt

@@ -64,6 +64,8 @@ set(SRC_FILES
     LogicDevice.h
     LogLocalHelper.cpp
     LogLocalHelper.h
+    AsyncMsgHandler.cpp
+    AsyncMsgHandler.h
 )
 
 # 创建库目标

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 146 - 133
LogicDevice--mqtt/LogicDevice.cpp


Některé soubory nejsou zobrazeny, neboť je v těchto rozdílových datech změněno mnoho souborů