123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- #pragma once
- #include <thread>
- #include <queue>
- #include <mutex>
- #include <condition_variable>
- #include <functional>
- #include <atomic>
- #include <memory>
- #include "LogicDevice.h"
- /**
- * 轻量级异步MQTT消息处理器
- * 专门解决msgarrivd函数中用户回调阻塞导致的消息丢失问题
- */
- class AsyncMsgHandler {
- public:
- struct MessageTask {
- ccos_mqtt_callback callback;
- ResDataObject req;
- std::string topic;
- ccos_mqtt_connection* connection;
- MessageTask(ccos_mqtt_callback cb, const ResDataObject& r, const std::string& t, ccos_mqtt_connection* conn)
- : callback(cb), req(r), topic(t), connection(conn) {}
- };
- private:
- std::queue<std::unique_ptr<MessageTask>> m_messageQueue;
- std::mutex m_queueMutex;
- std::condition_variable m_condition;
- std::atomic<bool> m_running{true};
- std::thread m_workerThread;
- // 工作线程函数
- void workerThreadFunc();
- public:
- static AsyncMsgHandler& getInstance();
- AsyncMsgHandler();
- ~AsyncMsgHandler();
- // 异步处理消息
- void handleMessageAsync(ccos_mqtt_callback callback, const ResDataObject& req,
- const std::string& topic, ccos_mqtt_connection* connection);
- // 获取队列状态
- size_t getQueueSize();
- // 禁用拷贝构造和赋值
- AsyncMsgHandler(const AsyncMsgHandler&) = delete;
- AsyncMsgHandler& operator=(const AsyncMsgHandler&) = delete;
- };
|