123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- #pragma once
- #include <memory>
- #include <list>
- #include <vector>
- #include <deque>
- #include <map>
- #include <optional>
- #include <chrono>
- #include <algorithm>
- #include <condition_variable>
- //namespace UCT = ::Utility::Concurrent;
- namespace Utility
- {
- namespace Concurrent
- {
- enum class wait_result
- {
- enQuit = -2, // 收到 Quit 信号, list 状态未知
- enTimeOut = -1, // 等待超时, list 状态未知
- enEmpty = 0, // 收到通知, list 为空
- enOne = 1, // 收到通知, wait_pop 返回 value 有效, 返回后, list 为空
- enMore = 2, // 收到通知, wait_pop 返回 value 有效, 返回后, list 还有值
- // operator bool () const = delete;
- };
- }
- }
- namespace Utility
- {
- namespace Concurrent
- {
- //-----------------------------------------------------------------------------
- // ExclusiveBase
- //-----------------------------------------------------------------------------
- template <typename tItem, typename tContainer>
- class ExclusiveBase
- {
- protected:
- tContainer m_Chunk;
- mutable std::mutex m_Mutex;
- std::condition_variable m_Notify;
- bool m_bQuit = false;
- public:
- using container = typename tContainer;
- using reference = typename tContainer::reference;
- using const_reference = typename tContainer::const_reference;
- // using const_iterator = typename tContainer::const_iterator;
- public:
- ExclusiveBase () = default;
- ExclusiveBase (const ExclusiveBase &) = delete;
- ExclusiveBase (ExclusiveBase &&) = delete;
- virtual ~ExclusiveBase ()
- {
- clear ();
- }
- ExclusiveBase & operator = (const ExclusiveBase &) = delete;
- ExclusiveBase & operator = (ExclusiveBase &&) = delete;
- public:
- bool empty () const
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- return m_Chunk.empty ();
- }
- [[nodiscard]]
- size_t size () const
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- return m_Chunk.size ();
- }
- public:
- void quit ()
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_bQuit = true;
- m_Notify.notify_all ();
- }
- void clear ()
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Chunk.clear ();
- }
- public: // 检查 m_bQuit ! 等待非空, 然后执行相应的 Action
- template <typename _Act, typename ... Args>
- wait_result wait (_Act _action, Args && ... args)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (! m_Chunk.empty ())
- {
- _action (& m_Chunk, args...);
- return wait_result::enMore;
- }
- m_Notify.wait (lk, [ this ] { return (! m_Chunk.empty ()) || (m_bQuit); });
- if (m_bQuit) return wait_result::enQuit;
- if (m_Chunk.empty ()) return wait_result::enEmpty;
- _action (& m_Chunk, args...);
- return wait_result::enMore;
- }
- template <typename _Rep, typename _Period, typename _Act, typename ... Args>
- wait_result wait_for (const std::chrono::duration <_Rep, _Period> & _dur, _Act _action, Args && ... args)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (! m_Chunk.empty ())
- {
- _action (& m_Chunk, args...);
- return wait_result::enMore;
- }
- {
- auto rc = m_Notify.wait_for (lk, _dur, [ this ] { return (! m_Chunk.empty ()) || (m_bQuit); });
- if (! rc) return wait_result::enTimeOut;
- }
- if (m_bQuit) return wait_result::enQuit;
- if (m_Chunk.empty ()) return wait_result::enEmpty;
- _action (& m_Chunk, args...);
- return wait_result::enMore;
- }
- wait_result wait ()
- {
- std::function <void (container *)> action = [ ] (auto *) { };
- auto rc = wait (action);
- return rc;
- }
- template <typename _Rep, typename _Period>
- wait_result wait_for (const std::chrono::duration <_Rep, _Period> & _dur)
- {
- std::function <void (container *)> action = [ ] (auto *) { };
- auto rc = wait_for (_dur, action);
- return rc;
- }
- public:
- void notify_all ()
- {
- m_Notify.notify_all ();
- }
- void notify_one ()
- {
- m_Notify.notify_one ();
- }
- public: // 入队
- size_t push (const tItem & value)
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Chunk.push_back (value);
- auto sz = m_Chunk.size ();
- m_Notify.notify_one ();
- return sz;
- }
- size_t push (tItem && value)
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Chunk.push_back (std::move (value));
- auto sz = m_Chunk.size ();
- m_Notify.notify_one ();
- return sz;
- }
- public: // 出队. 如队列为空则等待, 并且要检查 m_bQuit
- // 检查 m_bQuit
- wait_result wait_pop (tItem & value)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ())
- m_Notify.wait (lk, [ this ] { return (! m_Chunk.empty ()) || (m_bQuit); });
- if (m_bQuit) return wait_result::enQuit;
- if (m_Chunk.empty ()) return wait_result::enEmpty;
- value = std::move (m_Chunk.front ());
- m_Chunk.pop_front ();
- if (m_Chunk.empty ()) return wait_result::enOne;
- return wait_result::enMore;
- }
- // 检查 m_bQuit
- template <typename _Rep, typename _Period>
- wait_result wait_pop (tItem & value, const std::chrono::duration <_Rep, _Period> & _dur)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ())
- {
- auto rc = m_Notify.wait_for (lk, _dur, [ this ] { return (! m_Chunk.empty ()) || (m_bQuit); });
- if (! rc) return wait_result::enTimeOut;
- }
- if (m_bQuit) return wait_result::enQuit;
- if (m_Chunk.empty ()) return wait_result::enEmpty;
- value = std::move (m_Chunk.front ());
- m_Chunk.pop_front ();
- if (m_Chunk.empty ()) return wait_result::enOne;
- return wait_result::enMore;
- }
- public:
- // 不检查 m_bQuit
- bool try_pop (tItem & value)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- value = std::move (m_Chunk.front ());
- m_Chunk.pop_front ();
- return true;
- }
- [[nodiscard]]
- std::optional <tItem> try_pop ()
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return std::nullopt;
- std::optional <tItem> value = std::move (m_Chunk.front ());
- m_Chunk.pop_front ();
- return value;
- }
- // 不检查 m_bQuit
- int try_swap (tContainer & to)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- m_Chunk.swap (to);
- return static_cast <int> (to.size ());
- }
- [[nodiscard]]
- tContainer try_swap ()
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return tContainer ();
- tContainer to;
- m_Chunk.swap (to);
- return std::move (to);
- }
- // 不检查 m_bQuit
- int try_copy (tContainer & to)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- to = m_Chunk;
- return static_cast <int> (to.size ());
- }
- [[nodiscard]]
- tContainer try_copy ()
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return tContainer ();
- tContainer to = m_Chunk;
- return std::move (to);
- }
- public: // 不检查 m_bQuit
- // 把所有元素都弹出, 每个元素作为参数, 用指定的函数执行
- template <typename _Act, typename ... Args>
- int try_pop_all (_Act _onEach, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (this->m_Mutex);
- int sz = static_cast <int> (this->m_Chunk.size ());
- if (sz <= 0) return 0;
- if (sz == 1)
- {
- auto Item = std::move (this->m_Chunk.front ());
- this->m_Chunk.clear ();
- lk.unlock ();
- _onEach (Item, args...);
- return 1;
- }
- container to;
- this->m_Chunk.swap (to);
- lk.unlock ();
- for (auto & Item : to)
- _onEach (Item, args...);
- return static_cast <int> (to.size ());
- }
- public: // 不检查 m_bQuit
- [[nodiscard]]
- bool find (const tItem & _val) const
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- auto b = m_Chunk.begin ();
- auto e = m_Chunk.end ();
- auto iter = std::find (b, e, _val);
- return (iter != e);
- }
- // 用指定的谓词查找
- // 如果能找到, 返回 true
- // 如果未找到, 返回 false
- template <typename _Pr>
- [[nodiscard]]
- bool find_if (_Pr _Pred) const
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- auto b = m_Chunk.begin ();
- auto e = m_Chunk.end ();
- auto iter = std::find_if (b, e, _Pred);
- return (iter != e);
- }
- public:
- // 锁定, 然后执行 Action
- template <typename _Act, typename ... Args>
- auto action (_Act _action, Args && ... args) -> decltype (_action (& m_Chunk, args...))
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- return _action (& m_Chunk, args...);
- }
- // 用指定的谓词查找
- // 如果能找到, 就执行 Action, 然后返回 true
- // 如果未找到, 不执行 Action, 直接返回 false
- template <typename _Pr, typename _Act, typename ... Args>
- bool action_if (_Pr _Pred, _Act _action, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- auto b = m_Chunk.begin ();
- auto e = m_Chunk.end ();
- auto iter = std::find_if (b, e, _Pred);
- if (iter == e) return false;
- _action (& m_Chunk, iter, args...);
- return true;
- }
- // 对首项执行指定的函数/行为.
- // 如果为空, 返回 0; 否则返回 1
- template <typename _Act, typename ... Args>
- int on_front (_Act _action, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return 0;
- _action (m_Chunk.front (), args...);
- return 1;
- }
- // 对末项行指定的函数/行为.
- // 如果为空, 返回 0; 否则返回 1
- template <typename _Act, typename ... Args>
- int on_back (_Act _action, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return 0;
- _action (m_Chunk.back (), args...);
- return 1;
- }
- // 对所有的元素迭代
- // 返回: 元素数量
- template <typename _Act, typename ... Args>
- int for_each (_Act _onEach, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return 0;
- for (auto & Item : m_Chunk)
- _onEach (Item, args...);
- return static_cast <int> (m_Chunk.size ());
- }
- // 对所有的元素迭代, 用谓词判断是否继续, 如果谓词返回 true 就继续迭代, 否则停止迭代
- // 返回: 执行了多少次 action
- template <typename _Pr>
- int for_each_if (_Pr _Pred)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return 0;
- int nCount = 0;
- for (auto & Item : m_Chunk)
- {
- if (!_Pred (Item))
- break;
- nCount ++;
- }
- return nCount;
- }
- // 对所有的元素迭代, 用谓词判断是否继续, 如果谓词返回 true 就继续迭代, 否则停止迭代
- // 返回: 执行了多少次 action
- template <typename _Pr, typename _Act, typename ... Args>
- int for_each_if (_Pr _Pred, _Act _onEach, Args && ... args)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return 0;
- int nCount = 0;
- for (auto & Item : m_Chunk)
- {
- if (!_Pred (Item))
- break;
- _onEach (Item, args...);
- nCount ++;
- }
- return nCount;
- }
- public:
- // 用指定的值查找
- // 如果能找到, 就删除, 然后返回 true
- // 如果未找到, 不删除, 直接返回 false
- bool erase (const tItem & _val)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- auto b = m_Chunk.begin ();
- auto e = m_Chunk.end ();
- auto iter = std::find (b, e, _val);
- if (iter == e) return false;
- m_Chunk.erase (iter);
- return true;
- }
- // 用指定的谓词查找
- // 如果能找到, 就删除, 然后返回 true
- // 如果未找到, 不删除, 直接返回 false
- template <typename _Pr>
- bool erase_if (_Pr _Pred)
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Chunk.empty ()) return false;
- auto b = m_Chunk.begin ();
- auto e = m_Chunk.end ();
- auto iter = std::find_if (b, e, _Pred);
- if (iter == e) return false;
- m_Chunk.erase (iter);
- return true;
- }
- public:
- // 复制一个没有锁的列表
- auto clone () const -> tContainer
- {
- std::unique_lock <std::mutex> lk (m_Mutex);
- return tContainer (m_Chunk);
- }
- };
- template <class T>
- class ExclusiveList : public ExclusiveBase <T, std::list <T> >
- {
- public:
- template <class _Pr>
- void try_sort (_Pr _Pred)
- {
- std::unique_lock <std::mutex> lk (this->m_Mutex);
- if (this->m_Chunk.size () <= 1) return;
- this->m_Chunk.sort (_Pred);
- }
- };
- template <class T>
- class ExclusiveVector : public ExclusiveBase <T, std::vector <T> >
- {
- public:
- template <class _Pr>
- void try_sort (_Pr _Pred)
- {
- std::unique_lock <std::mutex> lk (this->m_Mutex);
- if (this->m_Chunk.size () <= 1) return;
- std::sort (this->m_Chunk.begin (), this->m_Chunk.end (), _Pred);
- }
- };
- template <class T>
- class ExclusiveDeque : public ExclusiveBase <T, std::deque <T> >
- {
- public:
- template <class _Pr>
- void try_sort (_Pr _Pred)
- {
- std::unique_lock <std::mutex> lk (this->m_Mutex);
- if (this->m_Chunk.size () <= 1) return;
- std::sort (this->m_Chunk.begin (), this->m_Chunk.end (), _Pred);
- }
- };
- // 模板别名
- template <typename T> using ExclusiveListOfPtr = ExclusiveList < std::unique_ptr <T> >;
- template <typename T> using ExclusiveVectorOfPtr = ExclusiveVector < std::unique_ptr <T> >;
- template <typename T> using ExclusiveDequeOfPtr = ExclusiveDeque < std::unique_ptr <T> >;
- // template <typename T> using ExclusiveQueue = ExclusiveBase < T, std::queue <T> >;
- // template <typename T> using ExclusiveQueueOfPtr = ExclusiveQueue < std::unique_ptr <T> >;
- }
- }
- namespace Utility
- {
- namespace Concurrent
- {
- //-----------------------------------------------------------------------------
- // ExclusiveOptional.tlh
- // 允许互斥访问 std::optional
- //-----------------------------------------------------------------------------
- template <typename T>
- class ExclusiveOptional
- {
- using value_type = T;
- using return_type = std::optional <T>;
- protected:
- std::optional <T> m_Option;
- mutable std::mutex m_Mutex;
- std::condition_variable m_Notify;
- bool m_bQuit = false;
- public:
- ExclusiveOptional () = default;
- private:
- // 禁止拷贝构造函数
- ExclusiveOptional (const ExclusiveOptional & h) = delete;
- // 禁止复制
- ExclusiveOptional & operator = (const ExclusiveOptional & h) = delete;
- public:
- void attach (T && object)
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Option = std::move (object);
- m_Notify.notify_one ();
- }
- void attach (const T & object)
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Option = object;
- m_Notify.notify_one ();
- }
- return_type detach ()
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- auto v = std::move (m_Option);
- m_Option.reset ();
- return v;
- }
- bool empty () const
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- return ! m_Option.has_value ();
- }
- void quit ()
- {
- m_bQuit = true;
- m_Notify.notify_all ();
- }
- void clear ()
- {
- std::lock_guard <std::mutex> lk (m_Mutex);
- m_Option.reset ();
- }
- public: // 检查 m_bQuit ! 等待非空, 然后执行相应的 Action
- template <typename _Act>
- wait_result wait (_Act _action)
- {
- return do_wait (_action, false);
- }
- template <typename _Rep, typename _Period, typename _Act>
- wait_result wait_for (const std::chrono::duration <_Rep, _Period> & _dur, _Act _action)
- {
- return do_wait_for (_dur, _action, false);
- }
- template <typename _Act>
- wait_result wait_pop (_Act _action)
- {
- return do_wait (_action, true);
- }
- template <typename _Rep, typename _Period, typename _Act>
- wait_result wait_pop_for (const std::chrono::duration <_Rep, _Period> & _dur, _Act _action)
- {
- return do_wait_for (_action, true);
- }
- protected:
- template <typename _Act>
- wait_result do_wait (_Act _action, bool clearAfterAction = false)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Option)
- {
- _action (m_Option.value ());
- if (clearAfterAction)
- m_Option.reset ();
- return wait_result::enOne;
- }
- m_Notify.wait (lk, [ this ] { return (m_Option.has_value ()) || (m_bQuit); });
- if (m_bQuit) return wait_result::enQuit;
- if (! m_Option) return wait_result::enEmpty;
- _action (m_Option.value ());
- if (clearAfterAction)
- m_Option.reset ();
- return wait_result::enOne;
- }
- template <typename _Rep, typename _Period, typename _Act>
- wait_result do_wait_for (const std::chrono::duration <_Rep, _Period> & _dur, _Act _action, bool clearAfterAction = false)
- {
- if (m_bQuit) return wait_result::enQuit;
- std::unique_lock <std::mutex> lk (m_Mutex);
- if (m_Option)
- {
- _action (m_Option.value ());
- if (clearAfterAction)
- m_Option.reset ();
- return wait_result::enOne;
- }
- {
- auto rc = m_Notify.wait_for (lk, _dur, [ this ] { return (m_Option.has_value ()) || (m_bQuit); });
- if (! rc) return wait_result::enTimeOut;
- }
- if (m_bQuit) return wait_result::enQuit;
- if (! m_Option) return wait_result::enEmpty;
- _action (m_Option.value ());
- if (clearAfterAction)
- m_Option.reset ();
- return wait_result::enOne;
- }
- };
- }
- }
|