#include "AsyncMsgHandler.h"
#include "ResDataObject.h" // assumed to be the header that declares ResDataObject

#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

/// Returns the process-wide handler, constructed on first use.
AsyncMsgHandler& AsyncMsgHandler::getInstance()
{
    static AsyncMsgHandler instance;
    return instance;
}

/// Marks the handler as running and starts the single worker thread that
/// drains the message queue.
AsyncMsgHandler::AsyncMsgHandler()
    : m_running(true)
{
    // Start the worker thread.
    m_workerThread = std::thread(&AsyncMsgHandler::workerThreadFunc, this);
    std::cout << "[AsyncMsgHandler] 异步消息处理器已启动" << std::endl;
}

/// Asks the worker thread to stop and waits for it to exit.
/// Tasks still sitting in the queue at that point are not executed.
AsyncMsgHandler::~AsyncMsgHandler()
{
    // The stop flag is part of the condition-variable predicate, so it has to
    // be changed while holding the queue mutex. Otherwise the worker could test
    // the predicate, miss the notification below, and then block forever,
    // which would make join() hang.
    {
        std::lock_guard<std::mutex> lock(m_queueMutex);
        m_running = false;
    }
    m_condition.notify_all();

    if (m_workerThread.joinable()) {
        m_workerThread.join();
    }
    std::cout << "[AsyncMsgHandler] 异步消息处理器已关闭" << std::endl;
}

/// Queues a message so that `callback` is invoked later on the worker thread.
///
/// @param callback   Function to invoke; the call is ignored when null.
/// @param req        Request payload; handed to the task constructor.
/// @param topic      Topic string; handed to the task constructor.
/// @param connection Connection passed through to the callback; the call is
///                   ignored when null.
void AsyncMsgHandler::handleMessageAsync(ccos_mqtt_callback callback,
                                         const ResDataObject& req,
                                         const std::string& topic,
                                         ccos_mqtt_connection* connection)
{
    if (!callback || !connection) {
        return;
    }

    // The task type is declared in the header; derive it from the queue's
    // element type so this file does not depend on its spelling.
    using TaskPtr = decltype(m_messageQueue)::value_type;
    using Task = TaskPtr::element_type;

    // Build the task before taking the lock to keep the critical section short.
    auto task = std::make_unique<Task>(callback, req, topic, connection);

    // Enqueue.
    {
        std::lock_guard<std::mutex> lock(m_queueMutex);
        m_messageQueue.push(std::move(task));
    }

    // Wake the worker thread.
    m_condition.notify_one();
}

/// Worker loop: waits for a task, pops it, and runs its callback outside the
/// lock. Exits as soon as the stop flag is observed; exceptions thrown by a
/// callback are logged and do not terminate the thread.
void AsyncMsgHandler::workerThreadFunc()
{
    for (;;) {
        decltype(m_messageQueue)::value_type task;

        // Fetch the next task. m_running is only read while the mutex is held.
        {
            std::unique_lock<std::mutex> lock(m_queueMutex);
            m_condition.wait(lock, [this] {
                return !m_messageQueue.empty() || !m_running;
            });

            if (!m_running) {
                break;
            }

            if (!m_messageQueue.empty()) {
                task = std::move(m_messageQueue.front());
                m_messageQueue.pop();
            }
        }

        // Run the task without holding the lock so producers are not blocked.
        if (task && task->callback) {
            try {
                // Invoke the user callback. The cast target assumes the
                // callback takes a non-const ResDataObject* -- TODO confirm
                // against the ccos_mqtt_callback declaration.
                task->callback(const_cast<ResDataObject*>(&task->req),
                               task->topic.c_str(),
                               task->connection);
            } catch (const std::exception& e) {
                std::cout << "[AsyncMsgHandler] 消息处理异常: " << e.what()
                          << ", Topic: " << task->topic << std::endl;
            } catch (...) {
                std::cout << "[AsyncMsgHandler] 消息处理未知异常, Topic: "
                          << task->topic << std::endl;
            }
        }
    }

    std::cout << "[AsyncMsgHandler] 工作线程已退出" << std::endl;
}

/// Returns the number of tasks currently waiting in the queue.
size_t AsyncMsgHandler::getQueueSize()
{
    std::lock_guard<std::mutex> lock(m_queueMutex);
    return m_messageQueue.size();
}