AsyncMsgHandler.cpp 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #include "AsyncMsgHandler.h"
  2. #include "ResDataObject.h" // 假设这是您的ResDataObject头文件
  3. #include <iostream>
  4. AsyncMsgHandler& AsyncMsgHandler::getInstance() {
  5. static AsyncMsgHandler instance;
  6. return instance;
  7. }
  8. AsyncMsgHandler::AsyncMsgHandler() : m_running(true) {
  9. // 启动工作线程
  10. m_workerThread = std::thread(&AsyncMsgHandler::workerThreadFunc, this);
  11. std::cout << "[AsyncMsgHandler] 异步消息处理器已启动" << std::endl;
  12. }
  13. AsyncMsgHandler::~AsyncMsgHandler() {
  14. // 停止工作线程
  15. m_running = false;
  16. m_condition.notify_all();
  17. if (m_workerThread.joinable()) {
  18. m_workerThread.join();
  19. }
  20. std::cout << "[AsyncMsgHandler] 异步消息处理器已关闭" << std::endl;
  21. }
  22. void AsyncMsgHandler::handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
  23. const std::string& topic, ccos_mqtt_connection* connection) {
  24. if (!callback || !connection) {
  25. return;
  26. }
  27. // 创建消息任务
  28. auto task = std::make_unique<MessageTask>(callback, req, topic, connection);
  29. // 加入队列
  30. {
  31. std::lock_guard<std::mutex> lock(m_queueMutex);
  32. m_messageQueue.push(std::move(task));
  33. }
  34. // 通知工作线程
  35. m_condition.notify_one();
  36. }
  37. void AsyncMsgHandler::workerThreadFunc() {
  38. while (m_running) {
  39. std::unique_ptr<MessageTask> task;
  40. // 从队列获取任务
  41. {
  42. std::unique_lock<std::mutex> lock(m_queueMutex);
  43. m_condition.wait(lock, [this] { return !m_messageQueue.empty() || !m_running; });
  44. if (!m_running) {
  45. break;
  46. }
  47. if (!m_messageQueue.empty()) {
  48. task = std::move(m_messageQueue.front());
  49. m_messageQueue.pop();
  50. }
  51. }
  52. // 执行任务
  53. if (task && task->callback) {
  54. try {
  55. // 调用用户回调函数
  56. task->callback(const_cast<ResDataObject*>(&task->req),
  57. task->topic.c_str(),
  58. task->connection);
  59. }
  60. catch (const std::exception& e) {
  61. std::cout << "[AsyncMsgHandler] 消息处理异常: " << e.what()
  62. << ", Topic: " << task->topic << std::endl;
  63. }
  64. catch (...) {
  65. std::cout << "[AsyncMsgHandler] 消息处理未知异常, Topic: "
  66. << task->topic << std::endl;
  67. }
  68. }
  69. }
  70. std::cout << "[AsyncMsgHandler] 工作线程已退出" << std::endl;
  71. }
  72. size_t AsyncMsgHandler::getQueueSize() {
  73. std::lock_guard<std::mutex> lock(m_queueMutex);
  74. return m_messageQueue.size();
  75. }