12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #include "AsyncMsgHandler.h"
- #include "ResDataObject.h" // 假设这是您的ResDataObject头文件
- #include <iostream>
- AsyncMsgHandler& AsyncMsgHandler::getInstance() {
- static AsyncMsgHandler instance;
- return instance;
- }
- AsyncMsgHandler::AsyncMsgHandler() : m_running(true) {
- // 启动工作线程
- m_workerThread = std::thread(&AsyncMsgHandler::workerThreadFunc, this);
- std::cout << "[AsyncMsgHandler] 异步消息处理器已启动" << std::endl;
- }
- AsyncMsgHandler::~AsyncMsgHandler() {
- // 停止工作线程
- m_running = false;
- m_condition.notify_all();
- if (m_workerThread.joinable()) {
- m_workerThread.join();
- }
- std::cout << "[AsyncMsgHandler] 异步消息处理器已关闭" << std::endl;
- }
- void AsyncMsgHandler::handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
- const std::string& topic, ccos_mqtt_connection* connection) {
- if (!callback || !connection) {
- return;
- }
- // 创建消息任务
- auto task = std::make_unique<MessageTask>(callback, req, topic, connection);
- // 加入队列
- {
- std::lock_guard<std::mutex> lock(m_queueMutex);
- m_messageQueue.push(std::move(task));
- }
- // 通知工作线程
- m_condition.notify_one();
- }
- void AsyncMsgHandler::workerThreadFunc() {
- while (m_running) {
- std::unique_ptr<MessageTask> task;
- // 从队列获取任务
- {
- std::unique_lock<std::mutex> lock(m_queueMutex);
- m_condition.wait(lock, [this] { return !m_messageQueue.empty() || !m_running; });
- if (!m_running) {
- break;
- }
- if (!m_messageQueue.empty()) {
- task = std::move(m_messageQueue.front());
- m_messageQueue.pop();
- }
- }
- // 执行任务
- if (task && task->callback) {
- try {
- // 调用用户回调函数
- task->callback(const_cast<ResDataObject*>(&task->req),
- task->topic.c_str(),
- task->connection);
- }
- catch (const std::exception& e) {
- std::cout << "[AsyncMsgHandler] 消息处理异常: " << e.what()
- << ", Topic: " << task->topic << std::endl;
- }
- catch (...) {
- std::cout << "[AsyncMsgHandler] 消息处理未知异常, Topic: "
- << task->topic << std::endl;
- }
- }
- }
- std::cout << "[AsyncMsgHandler] 工作线程已退出" << std::endl;
- }
- size_t AsyncMsgHandler::getQueueSize() {
- std::lock_guard<std::mutex> lock(m_queueMutex);
- return m_messageQueue.size();
- }
|