condition.hpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  11. #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  12. #include <boost/interprocess/detail/config_begin.hpp>
  13. #include <boost/interprocess/detail/workaround.hpp>
  14. #include <boost/interprocess/sync/spin/mutex.hpp>
  15. #include <boost/interprocess/detail/posix_time_types_wrk.hpp>
  16. #include <boost/interprocess/detail/atomic.hpp>
  17. #include <boost/interprocess/sync/scoped_lock.hpp>
  18. #include <boost/interprocess/exceptions.hpp>
  19. #include <boost/interprocess/detail/os_thread_functions.hpp>
  20. #include <boost/interprocess/sync/spin/wait.hpp>
  21. #include <boost/move/move.hpp>
  22. #include <boost/cstdint.hpp>
  23. namespace boost {
  24. namespace interprocess {
  25. namespace ipcdetail {
  26. class spin_condition
  27. {
  28. spin_condition(const spin_condition &);
  29. spin_condition &operator=(const spin_condition &);
  30. public:
  31. spin_condition();
  32. ~spin_condition();
  33. void notify_one();
  34. void notify_all();
  35. template <typename L>
  36. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
  37. {
  38. if(abs_time == boost::posix_time::pos_infin){
  39. this->wait(lock);
  40. return true;
  41. }
  42. if (!lock)
  43. throw lock_exception();
  44. return this->do_timed_wait(abs_time, *lock.mutex());
  45. }
  46. template <typename L, typename Pr>
  47. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
  48. {
  49. if(abs_time == boost::posix_time::pos_infin){
  50. this->wait(lock, pred);
  51. return true;
  52. }
  53. if (!lock)
  54. throw lock_exception();
  55. while (!pred()){
  56. if (!this->do_timed_wait(abs_time, *lock.mutex()))
  57. return pred();
  58. }
  59. return true;
  60. }
  61. template <typename L>
  62. void wait(L& lock)
  63. {
  64. if (!lock)
  65. throw lock_exception();
  66. do_wait(*lock.mutex());
  67. }
  68. template <typename L, typename Pr>
  69. void wait(L& lock, Pr pred)
  70. {
  71. if (!lock)
  72. throw lock_exception();
  73. while (!pred())
  74. do_wait(*lock.mutex());
  75. }
  76. template<class InterprocessMutex>
  77. void do_wait(InterprocessMutex &mut);
  78. template<class InterprocessMutex>
  79. bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  80. private:
  81. template<class InterprocessMutex>
  82. bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  83. enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
  84. spin_mutex m_enter_mut;
  85. volatile boost::uint32_t m_command;
  86. volatile boost::uint32_t m_num_waiters;
  87. void notify(boost::uint32_t command);
  88. };
  89. inline spin_condition::spin_condition()
  90. {
  91. //Note that this class is initialized to zero.
  92. //So zeroed memory can be interpreted as an initialized
  93. //condition variable
  94. m_command = SLEEP;
  95. m_num_waiters = 0;
  96. }
  97. inline spin_condition::~spin_condition()
  98. {
  99. //Trivial destructor
  100. }
  101. inline void spin_condition::notify_one()
  102. {
  103. this->notify(NOTIFY_ONE);
  104. }
  105. inline void spin_condition::notify_all()
  106. {
  107. this->notify(NOTIFY_ALL);
  108. }
  109. inline void spin_condition::notify(boost::uint32_t command)
  110. {
  111. //This mutex guarantees that no other thread can enter to the
  112. //do_timed_wait method logic, so that thread count will be
  113. //constant until the function writes a NOTIFY_ALL command.
  114. //It also guarantees that no other notification can be signaled
  115. //on this spin_condition before this one ends
  116. m_enter_mut.lock();
  117. //Return if there are no waiters
  118. if(!atomic_read32(&m_num_waiters)) {
  119. m_enter_mut.unlock();
  120. return;
  121. }
  122. //Notify that all threads should execute wait logic
  123. spin_wait swait;
  124. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
  125. swait.yield();
  126. }
  127. //The enter mutex will rest locked until the last waiting thread unlocks it
  128. }
  129. template<class InterprocessMutex>
  130. inline void spin_condition::do_wait(InterprocessMutex &mut)
  131. {
  132. this->do_timed_wait(false, boost::posix_time::ptime(), mut);
  133. }
  134. template<class InterprocessMutex>
  135. inline bool spin_condition::do_timed_wait
  136. (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
  137. {
  138. return this->do_timed_wait(true, abs_time, mut);
  139. }
  140. template<class InterprocessMutex>
  141. inline bool spin_condition::do_timed_wait(bool tout_enabled,
  142. const boost::posix_time::ptime &abs_time,
  143. InterprocessMutex &mut)
  144. {
  145. boost::posix_time::ptime now = microsec_clock::universal_time();
  146. if(tout_enabled){
  147. if(now >= abs_time) return false;
  148. }
  149. typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
  150. //The enter mutex guarantees that while executing a notification,
  151. //no other thread can execute the do_timed_wait method.
  152. {
  153. //---------------------------------------------------------------
  154. InternalLock lock;
  155. if(tout_enabled){
  156. InternalLock dummy(m_enter_mut, abs_time);
  157. lock = boost::move(dummy);
  158. }
  159. else{
  160. InternalLock dummy(m_enter_mut);
  161. lock = boost::move(dummy);
  162. }
  163. if(!lock)
  164. return false;
  165. //---------------------------------------------------------------
  166. //We increment the waiting thread count protected so that it will be
  167. //always constant when another thread enters the notification logic.
  168. //The increment marks this thread as "waiting on spin_condition"
  169. atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
  170. //We unlock the external mutex atomically with the increment
  171. mut.unlock();
  172. }
  173. //By default, we suppose that no timeout has happened
  174. bool timed_out = false, unlock_enter_mut= false;
  175. //Loop until a notification indicates that the thread should
  176. //exit or timeout occurs
  177. while(1){
  178. //The thread sleeps/spins until a spin_condition commands a notification
  179. //Notification occurred, we will lock the checking mutex so that
  180. spin_wait swait;
  181. while(atomic_read32(&m_command) == SLEEP){
  182. swait.yield();
  183. //Check for timeout
  184. if(tout_enabled){
  185. now = microsec_clock::universal_time();
  186. if(now >= abs_time){
  187. //If we can lock the mutex it means that no notification
  188. //is being executed in this spin_condition variable
  189. timed_out = m_enter_mut.try_lock();
  190. //If locking fails, indicates that another thread is executing
  191. //notification, so we play the notification game
  192. if(!timed_out){
  193. //There is an ongoing notification, we will try again later
  194. continue;
  195. }
  196. //No notification in execution, since enter mutex is locked.
  197. //We will execute time-out logic, so we will decrement count,
  198. //release the enter mutex and return false.
  199. break;
  200. }
  201. }
  202. }
  203. //If a timeout occurred, the mutex will not execute checking logic
  204. if(tout_enabled && timed_out){
  205. //Decrement wait count
  206. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  207. unlock_enter_mut = true;
  208. break;
  209. }
  210. else{
  211. boost::uint32_t result = atomic_cas32
  212. (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
  213. if(result == SLEEP){
  214. //Other thread has been notified and since it was a NOTIFY one
  215. //command, this thread must sleep again
  216. continue;
  217. }
  218. else if(result == NOTIFY_ONE){
  219. //If it was a NOTIFY_ONE command, only this thread should
  220. //exit. This thread has atomically marked command as sleep before
  221. //so no other thread will exit.
  222. //Decrement wait count.
  223. unlock_enter_mut = true;
  224. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  225. break;
  226. }
  227. else{
  228. //If it is a NOTIFY_ALL command, all threads should return
  229. //from do_timed_wait function. Decrement wait count.
  230. unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  231. //Check if this is the last thread of notify_all waiters
  232. //Only the last thread will release the mutex
  233. if(unlock_enter_mut){
  234. atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
  235. }
  236. break;
  237. }
  238. }
  239. }
  240. //Unlock the enter mutex if it is a single notification, if this is
  241. //the last notified thread in a notify_all or a timeout has occurred
  242. if(unlock_enter_mut){
  243. m_enter_mut.unlock();
  244. }
  245. //Lock external again before returning from the method
  246. mut.lock();
  247. return !timed_out;
  248. }
  249. } //namespace ipcdetail
  250. } //namespace interprocess
  251. } //namespace boost
  252. #include <boost/interprocess/detail/config_end.hpp>
  253. #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP