unbounded_ordering_queue.hpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2013.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file unbounded_ordering_queue.hpp
  9. * \author Andrey Semashev
  10. * \date 24.07.2011
  11. *
  12. * This header contains the implementation of the unbounded ordering record queueing strategy for
  13. * the asynchronous sink frontend.
  14. */
  15. #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
  16. #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #if defined(BOOST_LOG_NO_THREADS)
  22. #error Boost.Log: This header content is only supported in multithreaded environment
  23. #endif
  24. #include <queue>
  25. #include <vector>
  26. #include <boost/cstdint.hpp>
  27. #include <boost/move/core.hpp>
  28. #include <boost/move/utility.hpp>
  29. #include <boost/thread/locks.hpp>
  30. #include <boost/thread/mutex.hpp>
  31. #include <boost/thread/condition_variable.hpp>
  32. #include <boost/thread/thread_time.hpp>
  33. #include <boost/date_time/posix_time/posix_time_types.hpp>
  34. #include <boost/log/detail/timestamp.hpp>
  35. #include <boost/log/keywords/order.hpp>
  36. #include <boost/log/keywords/ordering_window.hpp>
  37. #include <boost/log/core/record_view.hpp>
  38. #include <boost/log/detail/header.hpp>
  39. namespace boost {
  40. BOOST_LOG_OPEN_NAMESPACE
  41. namespace sinks {
  42. /*!
  43. * \brief Unbounded ordering log record queueing strategy
  44. *
  45. * The \c unbounded_ordering_queue class is intended to be used with
  46. * the \c asynchronous_sink frontend as a log record queueing strategy.
  47. *
  48. * This strategy provides the following properties to the record queueing mechanism:
  49. *
  50. * \li The queue has no size limits.
  51. * \li The queue has a fixed latency window. This means that each log record put
  52. * into the queue will normally not be dequeued for a certain period of time.
  53. * \li The queue performs stable record ordering within the latency window.
  54. * The ordering predicate can be specified in the \c OrderT template parameter.
  55. *
  56. * Since this queue has no size limits, it may grow uncontrollably if sink backends
  57. * dequeue log records not fast enough. When this is an issue, it is recommended to
  58. * use one of the bounded strategies.
  59. */
  60. template< typename OrderT >
  61. class unbounded_ordering_queue
  62. {
  63. private:
  64. typedef boost::mutex mutex_type;
  65. //! Log record with enqueueing timestamp
  66. class enqueued_record
  67. {
  68. BOOST_COPYABLE_AND_MOVABLE(enqueued_record)
  69. public:
  70. //! Ordering predicate
  71. struct order :
  72. public OrderT
  73. {
  74. typedef typename OrderT::result_type result_type;
  75. order() {}
  76. order(order const& that) : OrderT(static_cast< OrderT const& >(that)) {}
  77. order(OrderT const& that) : OrderT(that) {}
  78. result_type operator() (enqueued_record const& left, enqueued_record const& right) const
  79. {
  80. // std::priority_queue requires ordering with semantics of std::greater, so we swap arguments
  81. return OrderT::operator() (right.m_record, left.m_record);
  82. }
  83. };
  84. boost::log::aux::timestamp m_timestamp;
  85. record_view m_record;
  86. enqueued_record(enqueued_record const& that) : m_timestamp(that.m_timestamp), m_record(that.m_record)
  87. {
  88. }
  89. enqueued_record(BOOST_RV_REF(enqueued_record) that) :
  90. m_timestamp(that.m_timestamp),
  91. m_record(boost::move(that.m_record))
  92. {
  93. }
  94. explicit enqueued_record(record_view const& rec) :
  95. m_timestamp(boost::log::aux::get_timestamp()),
  96. m_record(rec)
  97. {
  98. }
  99. enqueued_record& operator= (BOOST_COPY_ASSIGN_REF(enqueued_record) that)
  100. {
  101. m_timestamp = that.m_timestamp;
  102. m_record = that.m_record;
  103. return *this;
  104. }
  105. enqueued_record& operator= (BOOST_RV_REF(enqueued_record) that)
  106. {
  107. m_timestamp = that.m_timestamp;
  108. m_record = boost::move(that.m_record);
  109. return *this;
  110. }
  111. };
  112. typedef std::priority_queue<
  113. enqueued_record,
  114. std::vector< enqueued_record >,
  115. typename enqueued_record::order
  116. > queue_type;
  117. private:
  118. //! Ordering window duration, in milliseconds
  119. const uint64_t m_ordering_window;
  120. //! Synchronization mutex
  121. mutex_type m_mutex;
  122. //! Condition for blocking
  123. condition_variable m_cond;
  124. //! Thread-safe queue
  125. queue_type m_queue;
  126. //! Interruption flag
  127. bool m_interruption_requested;
  128. public:
  129. /*!
  130. * Returns ordering window size specified during initialization
  131. */
  132. posix_time::time_duration get_ordering_window() const
  133. {
  134. return posix_time::milliseconds(m_ordering_window);
  135. }
  136. /*!
  137. * Returns default ordering window size.
  138. * The default window size is specific to the operating system thread scheduling mechanism.
  139. */
  140. static posix_time::time_duration get_default_ordering_window()
  141. {
  142. // The main idea behind this parameter is that the ordering window should be large enough
  143. // to allow the frontend to order records from different threads on an attribute
  144. // that contains system time. Thus this value should be:
  145. // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
  146. // For instance, on Windows it defaults to around 15-16 ms.
  147. // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
  148. // switch threads on any known OS. It can be tuned for other platforms as needed.
  149. return posix_time::milliseconds(30);
  150. }
  151. protected:
  152. //! Initializing constructor
  153. template< typename ArgsT >
  154. explicit unbounded_ordering_queue(ArgsT const& args) :
  155. m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
  156. m_queue(args[keywords::order]),
  157. m_interruption_requested(false)
  158. {
  159. }
  160. //! Enqueues log record to the queue
  161. void enqueue(record_view const& rec)
  162. {
  163. lock_guard< mutex_type > lock(m_mutex);
  164. enqueue_unlocked(rec);
  165. }
  166. //! Attempts to enqueue log record to the queue
  167. bool try_enqueue(record_view const& rec)
  168. {
  169. unique_lock< mutex_type > lock(m_mutex, try_to_lock);
  170. if (lock.owns_lock())
  171. {
  172. enqueue_unlocked(rec);
  173. return true;
  174. }
  175. else
  176. return false;
  177. }
  178. //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
  179. bool try_dequeue_ready(record_view& rec)
  180. {
  181. lock_guard< mutex_type > lock(m_mutex);
  182. if (!m_queue.empty())
  183. {
  184. const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
  185. enqueued_record const& elem = m_queue.top();
  186. if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
  187. {
  188. // We got a new element
  189. rec = elem.m_record;
  190. m_queue.pop();
  191. return true;
  192. }
  193. }
  194. return false;
  195. }
  196. //! Attempts to dequeue log record from the queue, does not block.
  197. bool try_dequeue(record_view& rec)
  198. {
  199. lock_guard< mutex_type > lock(m_mutex);
  200. if (!m_queue.empty())
  201. {
  202. enqueued_record const& elem = m_queue.top();
  203. rec = elem.m_record;
  204. m_queue.pop();
  205. return true;
  206. }
  207. return false;
  208. }
  209. //! Dequeues log record from the queue, blocks if no log records are ready to be processed
  210. bool dequeue_ready(record_view& rec)
  211. {
  212. unique_lock< mutex_type > lock(m_mutex);
  213. while (!m_interruption_requested)
  214. {
  215. if (!m_queue.empty())
  216. {
  217. const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
  218. enqueued_record const& elem = m_queue.top();
  219. const uint64_t difference = (now - elem.m_timestamp).milliseconds();
  220. if (difference >= m_ordering_window)
  221. {
  222. // We got a new element
  223. rec = elem.m_record;
  224. m_queue.pop();
  225. return true;
  226. }
  227. else
  228. {
  229. // Wait until the element becomes ready to be processed
  230. m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
  231. }
  232. }
  233. else
  234. {
  235. // Wait for an element to come
  236. m_cond.wait(lock);
  237. }
  238. }
  239. m_interruption_requested = false;
  240. return false;
  241. }
  242. //! Wakes a thread possibly blocked in the \c dequeue method
  243. void interrupt_dequeue()
  244. {
  245. lock_guard< mutex_type > lock(m_mutex);
  246. m_interruption_requested = true;
  247. m_cond.notify_one();
  248. }
  249. private:
  250. //! Enqueues a log record
  251. void enqueue_unlocked(record_view const& rec)
  252. {
  253. const bool was_empty = m_queue.empty();
  254. m_queue.push(enqueued_record(rec));
  255. if (was_empty)
  256. m_cond.notify_one();
  257. }
  258. };
  259. } // namespace sinks
  260. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  261. } // namespace boost
  262. #include <boost/log/detail/footer.hpp>
  263. #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_