// AsyncMsgHandler.h (1.4 KB)
  1. #pragma once
  2. #include <thread>
  3. #include <queue>
  4. #include <mutex>
  5. #include <condition_variable>
  6. #include <functional>
  7. #include <atomic>
  8. #include <memory>
  9. #include "LogicDevice.h"
  10. /**
  11. * 轻量级异步MQTT消息处理器
  12. * 专门解决msgarrivd函数中用户回调阻塞导致的消息丢失问题
  13. */
  14. class AsyncMsgHandler {
  15. public:
  16. struct MessageTask {
  17. ccos_mqtt_callback callback;
  18. ResDataObject req;
  19. std::string topic;
  20. ccos_mqtt_connection* connection;
  21. MessageTask(ccos_mqtt_callback cb, const ResDataObject& r, const std::string& t, ccos_mqtt_connection* conn)
  22. : callback(cb), req(r), topic(t), connection(conn) {}
  23. };
  24. private:
  25. std::queue<std::unique_ptr<MessageTask>> m_messageQueue;
  26. std::mutex m_queueMutex;
  27. std::condition_variable m_condition;
  28. std::atomic<bool> m_running{true};
  29. std::thread m_workerThread;
  30. // 工作线程函数
  31. void workerThreadFunc();
  32. public:
  33. static AsyncMsgHandler& getInstance();
  34. AsyncMsgHandler();
  35. ~AsyncMsgHandler();
  36. // 异步处理消息
  37. void handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
  38. const std::string& topic, ccos_mqtt_connection* connection);
  39. // 获取队列状态
  40. size_t getQueueSize();
  41. // 禁用拷贝构造和赋值
  42. AsyncMsgHandler(const AsyncMsgHandler&) = delete;
  43. AsyncMsgHandler& operator=(const AsyncMsgHandler&) = delete;
  44. };