#pragma once

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

//namespace UCT = ::Utility::Concurrent;

namespace Utility { namespace Concurrent {

	/// Result of the blocking wait / wait_pop family of calls.
	enum class wait_result
	{
		enQuit		= -2,	// quit signal received; container state unknown
		enTimeOut	= -1,	// wait timed out; container state unknown
		enEmpty		= 0,	// notified, but the container is empty
		enOne		= 1,	// notified; the value returned by wait_pop is valid and the container is now empty
		enMore		= 2,	// notified; the value returned by wait_pop is valid and the container still holds values

	//	operator bool () const = delete;
	};

} }

namespace Utility { namespace Concurrent {

	//-----------------------------------------------------------------------------
	// ExclusiveBase
	//-----------------------------------------------------------------------------

	/// A sequence container guarded by a mutex, plus a condition variable that
	/// lets consumers block until an element is pushed or quit () is called.
	///
	/// @tparam tItem       element type stored in the container
	/// @tparam tContainer  sequence container of tItem (std::list, std::vector
	///                     and std::deque are used by the wrappers below)
	///
	/// Every member function that touches m_Chunk or m_bQuit takes m_Mutex first.
	/// Callbacks passed to wait / action / for_each / on_front / ... run while the
	/// mutex is held, so they must not call back into the same object.
	/// try_pop_all is the exception: it releases the lock before invoking the callback.
	template <typename tItem, typename tContainer>
	class ExclusiveBase
	{
	protected:
		tContainer					m_Chunk;
		mutable std::mutex			m_Mutex;
		std::condition_variable		m_Notify;
		bool						m_bQuit = false;	// only read or written while m_Mutex is held

	public:
		using container			= tContainer;
		using reference			= typename tContainer::reference;
		using const_reference	= typename tContainer::const_reference;
	//	using const_iterator	= typename tContainer::const_iterator;

	public:
		ExclusiveBase () = default;
		ExclusiveBase (const ExclusiveBase &) = delete;
		ExclusiveBase (ExclusiveBase &&) = delete;
		virtual ~ExclusiveBase () { clear (); }

		ExclusiveBase & operator = (const ExclusiveBase &) = delete;
		ExclusiveBase & operator = (ExclusiveBase &&) = delete;

	public:
		/// @return true when the container currently holds no element.
		bool empty () const
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			return m_Chunk.empty ();
		}

		/// @return the current number of elements.
		[[nodiscard]] std::size_t size () const
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			return m_Chunk.size ();
		}

	public:
		/// Sets the quit flag and wakes every waiter; subsequent wait* calls return enQuit.
		void quit ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_bQuit = true;
			m_Notify.notify_all ();
		}

		/// Removes every element. Does not notify waiters.
		void clear ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Chunk.clear ();
		}

	public:
		/// Checks m_bQuit. Blocks until the container is non-empty (or quit), then
		/// calls `_action (&m_Chunk, args...)` under the lock.
		/// @return enQuit if quit () was called, enEmpty if woken with nothing to
		///         process, otherwise enMore after the action has run.
		template <typename Act, typename ... Args>
		wait_result wait (Act _action, Args && ... args)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			// The flag is tested under the lock so a concurrent quit () can neither
			// race with this read nor slip in between the test and the wait.
			if (m_bQuit) return wait_result::enQuit;

			if (m_Chunk.empty ())
			{
				m_Notify.wait (lk, [ this ] { return (! m_Chunk.empty ()) || m_bQuit; });
				if (m_bQuit) return wait_result::enQuit;
				if (m_Chunk.empty ()) return wait_result::enEmpty;
			}

			_action (& m_Chunk, args...);
			return wait_result::enMore;
		}

		/// Same as wait (action, args...) but gives up after `_dur`.
		/// @return enTimeOut when the duration elapses with the container still
		///         empty and no quit; otherwise as wait ().
		template <typename Rep, typename Period, typename Act, typename ... Args>
		wait_result wait_for (const std::chrono::duration <Rep, Period> & _dur, Act _action, Args && ... args)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			if (m_bQuit) return wait_result::enQuit;

			if (m_Chunk.empty ())
			{
				const bool signalled = m_Notify.wait_for (lk, _dur, [ this ] { return (! m_Chunk.empty ()) || m_bQuit; });
				if (! signalled) return wait_result::enTimeOut;
				if (m_bQuit) return wait_result::enQuit;
				if (m_Chunk.empty ()) return wait_result::enEmpty;
			}

			_action (& m_Chunk, args...);
			return wait_result::enMore;
		}

		/// Blocks until the container is non-empty or quit () is called; performs no action.
		wait_result wait ()
		{
			return wait ([ ] (tContainer *) { });
		}

		/// Timed variant of wait () that performs no action.
		template <typename Rep, typename Period>
		wait_result wait_for (const std::chrono::duration <Rep, Period> & _dur)
		{
			return wait_for (_dur, [ ] (tContainer *) { });
		}

	public:
		/// Wakes every thread blocked in a wait* call (they re-check their predicate).
		void notify_all () { m_Notify.notify_all (); }
		/// Wakes one thread blocked in a wait* call.
		void notify_one () { m_Notify.notify_one (); }

	public:
		/// Enqueue: appends a copy of `value` and wakes one waiter.
		/// @return the container size right after the insertion.
		std::size_t push (const tItem & value)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Chunk.push_back (value);
			const auto sz = m_Chunk.size ();
			m_Notify.notify_one ();
			return sz;
		}

		/// Enqueue: appends `value` by move and wakes one waiter.
		/// @return the container size right after the insertion.
		std::size_t push (tItem && value)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Chunk.push_back (std::move (value));
			const auto sz = m_Chunk.size ();
			m_Notify.notify_one ();
			return sz;
		}

	public:
		/// Dequeue. Checks m_bQuit. Blocks while the container is empty.
		/// @param value receives the front element on enOne / enMore.
		/// @return enQuit, enEmpty, enOne (container now empty) or enMore.
		wait_result wait_pop (tItem & value)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			if (m_bQuit) return wait_result::enQuit;

			if (m_Chunk.empty ())
			{
				m_Notify.wait (lk, [ this ] { return (! m_Chunk.empty ()) || m_bQuit; });
				if (m_bQuit) return wait_result::enQuit;
				if (m_Chunk.empty ()) return wait_result::enEmpty;
			}

			take_front (value);
			return m_Chunk.empty () ? wait_result::enOne : wait_result::enMore;
		}

		/// Dequeue with timeout. Checks m_bQuit.
		/// @return enTimeOut when `_dur` elapses first; otherwise as wait_pop (value).
		template <typename Rep, typename Period>
		wait_result wait_pop (tItem & value, const std::chrono::duration <Rep, Period> & _dur)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			if (m_bQuit) return wait_result::enQuit;

			if (m_Chunk.empty ())
			{
				const bool signalled = m_Notify.wait_for (lk, _dur, [ this ] { return (! m_Chunk.empty ()) || m_bQuit; });
				if (! signalled) return wait_result::enTimeOut;
				if (m_bQuit) return wait_result::enQuit;
				if (m_Chunk.empty ()) return wait_result::enEmpty;
			}

			take_front (value);
			return m_Chunk.empty () ? wait_result::enOne : wait_result::enMore;
		}

	public:
		/// Non-blocking dequeue. Does not check m_bQuit.
		/// @return false (and `value` untouched) when the container is empty.
		bool try_pop (tItem & value)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return false;

			take_front (value);
			return true;
		}

		/// Non-blocking dequeue. Does not check m_bQuit.
		/// @return the front element, or std::nullopt when the container is empty.
		[[nodiscard]] std::optional <tItem> try_pop ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return std::nullopt;

			std::optional <tItem> value = std::move (m_Chunk.front ());
			m_Chunk.erase (m_Chunk.begin ());
			return value;
		}

		/// Swaps the guarded container with `to`. Does not check m_bQuit.
		/// @return the size of `to` after the swap (i.e. what was taken out).
		int try_swap (tContainer & to)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Chunk.swap (to);
			return static_cast <int> (to.size ());
		}

		/// Takes all elements out, leaving the guarded container empty.
		[[nodiscard]] tContainer try_swap ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			tContainer to;
			if (! m_Chunk.empty ()) m_Chunk.swap (to);
			return to;		// plain return keeps NRVO / implicit move available
		}

		/// Copies the guarded container into `to`. Does not check m_bQuit.
		/// @return the number of elements copied.
		int try_copy (tContainer & to)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			to = m_Chunk;
			return static_cast <int> (to.size ());
		}

		/// @return a copy of the guarded container.
		[[nodiscard]] tContainer try_copy ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return tContainer ();

			tContainer to = m_Chunk;
			return to;
		}

	public:
		/// Pops every element and calls `_onEach (item, args...)` for each one.
		/// Does not check m_bQuit. The elements are moved out under the lock and
		/// the lock is released before any callback runs.
		/// @return the number of elements processed.
		template <typename Act, typename ... Args>
		int try_pop_all (Act _onEach, Args && ... args)
		{
			std::unique_lock <std::mutex> lk (this->m_Mutex);

			const int sz = static_cast <int> (this->m_Chunk.size ());
			if (sz <= 0) return 0;

			if (sz == 1)
			{
				// Single element: move it out directly instead of swapping containers.
				auto Item = std::move (this->m_Chunk.front ());
				this->m_Chunk.clear ();
				lk.unlock ();

				_onEach (Item, args...);
				return 1;
			}

			container to;
			this->m_Chunk.swap (to);
			lk.unlock ();

			for (auto & Item : to) _onEach (Item, args...);
			return static_cast <int> (to.size ());
		}

	public:
		/// Does not check m_bQuit.
		/// @return true when an element equal to `_val` is present.
		[[nodiscard]] bool find (const tItem & _val) const
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			const auto last = m_Chunk.end ();
			return std::find (m_Chunk.begin (), last, _val) != last;
		}

		/// Searches with the given predicate.
		/// @return true when some element satisfies `pred`, false otherwise.
		template <typename Pred>
		[[nodiscard]] bool find_if (Pred pred) const
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			const auto last = m_Chunk.end ();
			return std::find_if (m_Chunk.begin (), last, pred) != last;
		}

	public:
		/// Locks, then runs `_action (&m_Chunk, args...)` and returns its result.
		template <typename Act, typename ... Args>
		auto action (Act _action, Args && ... args) -> decltype (_action (& m_Chunk, args...))
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			return _action (& m_Chunk, args...);
		}

		/// Searches with the given predicate.
		/// If found, runs `_action (&m_Chunk, iterator, args...)` and returns true.
		/// If not found, returns false without running the action.
		template <typename Pred, typename Act, typename ... Args>
		bool action_if (Pred pred, Act _action, Args && ... args)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return false;

			const auto last = m_Chunk.end ();
			auto iter = std::find_if (m_Chunk.begin (), last, pred);
			if (iter == last) return false;

			_action (& m_Chunk, iter, args...);
			return true;
		}

		/// Runs `_action (front, args...)` on the first element.
		/// @return 0 when empty, otherwise 1.
		template <typename Act, typename ... Args>
		int on_front (Act _action, Args && ... args)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return 0;

			_action (m_Chunk.front (), args...);
			return 1;
		}

		/// Runs `_action (back, args...)` on the last element.
		/// @return 0 when empty, otherwise 1.
		template <typename Act, typename ... Args>
		int on_back (Act _action, Args && ... args)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return 0;

			_action (m_Chunk.back (), args...);
			return 1;
		}

		/// Calls `_onEach (item, args...)` for every element, under the lock.
		/// @return the number of elements.
		template <typename Act, typename ... Args>
		int for_each (Act _onEach, Args && ... args)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			if (m_Chunk.empty ()) return 0;

			for (auto & Item : m_Chunk) _onEach (Item, args...);
			return static_cast <int> (m_Chunk.size ());
		}

		/// Iterates while `pred (item)` returns true; stops at the first false.
		/// @return the number of elements for which the predicate returned true.
		template <typename Pred>
		int for_each_if (Pred pred)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);

			int nCount = 0;
			for (auto & Item : m_Chunk)
			{
				if (! pred (Item)) break;
				++ nCount;
			}
			return nCount;
		}

		/// Iterates while `pred (item)` returns true, calling `_onEach (item, args...)`
		/// on each accepted element; stops at the first element the predicate rejects.
		/// @return how many times the action was executed.
		template <typename Pred, typename Act, typename ... Args>
		int for_each_if (Pred pred, Act _onEach, Args && ... args)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);

			int nCount = 0;
			for (auto & Item : m_Chunk)
			{
				if (! pred (Item)) break;
				_onEach (Item, args...);
				++ nCount;
			}
			return nCount;
		}

	public:
		/// Searches for an element equal to `_val`.
		/// If found, erases the first match and returns true; otherwise returns false.
		bool erase (const tItem & _val)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			const auto last = m_Chunk.end ();
			auto iter = std::find (m_Chunk.begin (), last, _val);
			if (iter == last) return false;

			m_Chunk.erase (iter);
			return true;
		}

		/// Searches with the given predicate.
		/// If found, erases the first match and returns true; otherwise returns false.
		template <typename Pred>
		bool erase_if (Pred pred)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			const auto last = m_Chunk.end ();
			auto iter = std::find_if (m_Chunk.begin (), last, pred);
			if (iter == last) return false;

			m_Chunk.erase (iter);
			return true;
		}

	public:
		/// @return a plain (lock-free) copy of the guarded container.
		auto clone () const -> tContainer
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			return tContainer (m_Chunk);
		}

	protected:
		/// Moves the first element into `value` and removes it.
		/// The caller must hold m_Mutex and guarantee the container is non-empty.
		/// Uses erase (begin ()) rather than pop_front () so that std::vector,
		/// which has no pop_front, works as well as std::list and std::deque.
		void take_front (tItem & value)
		{
			value = std::move (m_Chunk.front ());
			m_Chunk.erase (m_Chunk.begin ());
		}
	};

	/// ExclusiveBase over std::list.
	template <typename T>
	class ExclusiveList : public ExclusiveBase <T, std::list <T>>
	{
	public:
		/// Sorts the list with comparator `pred` under the lock (no-op for fewer than two elements).
		template <typename Pred>
		void try_sort (Pred pred)
		{
			std::lock_guard <std::mutex> lk (this->m_Mutex);
			if (this->m_Chunk.size () <= 1) return;

			this->m_Chunk.sort (pred);
		}
	};

	/// ExclusiveBase over std::vector.
	template <typename T>
	class ExclusiveVector : public ExclusiveBase <T, std::vector <T>>
	{
	public:
		/// Sorts the vector with comparator `pred` under the lock (no-op for fewer than two elements).
		template <typename Pred>
		void try_sort (Pred pred)
		{
			std::lock_guard <std::mutex> lk (this->m_Mutex);
			if (this->m_Chunk.size () <= 1) return;

			std::sort (this->m_Chunk.begin (), this->m_Chunk.end (), pred);
		}
	};

	/// ExclusiveBase over std::deque.
	template <typename T>
	class ExclusiveDeque : public ExclusiveBase <T, std::deque <T>>
	{
	public:
		/// Sorts the deque with comparator `pred` under the lock (no-op for fewer than two elements).
		template <typename Pred>
		void try_sort (Pred pred)
		{
			std::lock_guard <std::mutex> lk (this->m_Mutex);
			if (this->m_Chunk.size () <= 1) return;

			std::sort (this->m_Chunk.begin (), this->m_Chunk.end (), pred);
		}
	};

	// Template aliases
	template <typename T> using ExclusiveListOfPtr		= ExclusiveList   < std::unique_ptr <T> >;
	template <typename T> using ExclusiveVectorOfPtr	= ExclusiveVector < std::unique_ptr <T> >;
	template <typename T> using ExclusiveDequeOfPtr		= ExclusiveDeque  < std::unique_ptr <T> >;

//	template <typename T> using ExclusiveQueue			= ExclusiveBase < T, std::queue <T> >;
//	template <typename T> using ExclusiveQueueOfPtr		= ExclusiveQueue < std::unique_ptr <T> >;

} }

namespace Utility { namespace Concurrent {

	//-----------------------------------------------------------------------------
	// ExclusiveOptional.tlh
	// Allows mutually exclusive access to a std::optional
	//-----------------------------------------------------------------------------

	/// A single optional value guarded by a mutex, with a condition variable so
	/// that consumers can block until a value is attached or quit () is called.
	/// Actions passed to wait* run while the mutex is held.
	template <typename T>
	class ExclusiveOptional
	{
		using value_type	= T;
		using return_type	= std::optional <T>;

	protected:
		std::optional <T>			m_Option;
		mutable std::mutex			m_Mutex;
		std::condition_variable		m_Notify;
		bool						m_bQuit = false;	// only read or written while m_Mutex is held

	public:
		ExclusiveOptional () = default;

	private:
		// Copy construction is disabled
		ExclusiveOptional (const ExclusiveOptional & h) = delete;
		// Copy assignment is disabled
		ExclusiveOptional & operator = (const ExclusiveOptional & h) = delete;

	public:
		/// Stores `object` (by move), replacing any previous value, and wakes one waiter.
		void attach (T && object)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Option = std::move (object);
			m_Notify.notify_one ();
		}

		/// Stores a copy of `object`, replacing any previous value, and wakes one waiter.
		void attach (const T & object)
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Option = object;
			m_Notify.notify_one ();
		}

		/// Takes the stored value out, leaving this object empty.
		/// @return the previous value, or an empty optional if there was none.
		return_type detach ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			auto v = std::move (m_Option);
			m_Option.reset ();		// a moved-from optional still reports has_value ()
			return v;
		}

		/// @return true when no value is currently stored.
		bool empty () const
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			return ! m_Option.has_value ();
		}

		/// Sets the quit flag and wakes every waiter; subsequent wait* calls return enQuit.
		void quit ()
		{
			// The flag must be set under the mutex: otherwise a waiter could test its
			// predicate, see "not quit", and block after this notification was sent.
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_bQuit = true;
			m_Notify.notify_all ();
		}

		/// Discards the stored value, if any. Does not notify waiters.
		void clear ()
		{
			std::lock_guard <std::mutex> lk (m_Mutex);
			m_Option.reset ();
		}

	public:
		/// Checks m_bQuit. Waits for a value, then runs `_action (value)`; the value stays stored.
		template <typename Act>
		wait_result wait (Act _action)
		{
			return do_wait (_action, false);
		}

		/// Timed variant of wait (); returns enTimeOut when `_dur` elapses first.
		template <typename Rep, typename Period, typename Act>
		wait_result wait_for (const std::chrono::duration <Rep, Period> & _dur, Act _action)
		{
			return do_wait_for (_dur, _action, false);
		}

		/// Like wait (), but the stored value is reset after the action has run.
		template <typename Act>
		wait_result wait_pop (Act _action)
		{
			return do_wait (_action, true);
		}

		/// Timed variant of wait_pop (); returns enTimeOut when `_dur` elapses first.
		template <typename Rep, typename Period, typename Act>
		wait_result wait_pop_for (const std::chrono::duration <Rep, Period> & _dur, Act _action)
		{
			return do_wait_for (_dur, _action, true);
		}

	protected:
		/// Shared implementation of wait / wait_pop.
		/// @param clearAfterAction reset the stored value once the action has run.
		/// @return enQuit, enEmpty, or enOne after the action has run.
		template <typename Act>
		wait_result do_wait (Act _action, bool clearAfterAction = false)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			if (m_bQuit) return wait_result::enQuit;

			if (! m_Option)
			{
				m_Notify.wait (lk, [ this ] { return m_Option.has_value () || m_bQuit; });
				if (m_bQuit) return wait_result::enQuit;
				if (! m_Option) return wait_result::enEmpty;
			}

			_action (m_Option.value ());
			if (clearAfterAction) m_Option.reset ();
			return wait_result::enOne;
		}

		/// Shared implementation of wait_for / wait_pop_for.
		/// @param clearAfterAction reset the stored value once the action has run.
		/// @return enQuit, enTimeOut, enEmpty, or enOne after the action has run.
		template <typename Rep, typename Period, typename Act>
		wait_result do_wait_for (const std::chrono::duration <Rep, Period> & _dur, Act _action, bool clearAfterAction = false)
		{
			std::unique_lock <std::mutex> lk (m_Mutex);
			if (m_bQuit) return wait_result::enQuit;

			if (! m_Option)
			{
				const bool signalled = m_Notify.wait_for (lk, _dur, [ this ] { return m_Option.has_value () || m_bQuit; });
				if (! signalled) return wait_result::enTimeOut;
				if (m_bQuit) return wait_result::enQuit;
				if (! m_Option) return wait_result::enEmpty;
			}

			_action (m_Option.value ());
			if (clearAfterAction) m_Option.reset ();
			return wait_result::enOne;
		}
	};

} }