//
// detail/impl/task_io_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if !defined(BOOST_ASIO_HAS_IOCP)

#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/reactor.hpp>
#include <boost/asio/detail/task_io_service.hpp>
#include <boost/asio/detail/task_io_service_thread_info.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {
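
// Helper used by do_run_one() and do_poll_one(): on block exit its destructor
// commits any privately accumulated work count to the shared counter, flushes
// the thread-private operations onto the shared queue, and re-enqueues the
// task marker so the reactor will be run again.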
struct task_io_service::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    task_io_service_->task_interrupted_ = true;
    task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
  }

  task_io_service* task_io_service_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};
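
// Helper used after completing a handler: on block exit its destructor
// reconciles the thread-private work count with the shared outstanding_work_
// counter, calling work_finished() when the completed handler produced no new
// work, and (when threads are enabled) flushes the thread-private queue back
// to the shared operation queue.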
struct task_io_service::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      boost::asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      task_io_service_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  task_io_service* task_io_service_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};
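
// A concurrency_hint of 1 sets one_thread_ and enables the single-threaded
// paths below: thread-private operation queues and skipping cross-thread
// wake-ups.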
task_io_service::task_io_service(
    boost::asio::io_service& io_service, std::size_t concurrency_hint)
  : boost::asio::detail::service_base<task_io_service>(io_service),
    one_thread_(concurrency_hint == 1),
    mutex_(),
    task_(0),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    first_idle_thread_(0)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}
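
// Destroy any handlers still queued at shutdown without invoking them. The
// task (reactor) is a separate service, so only the pointer is reset here.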
void task_io_service::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}
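
// Lazily obtain the reactor and enqueue the task marker so that a run()
// thread will pick it up and run the reactor.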
void task_io_service::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = &use_service<reactor>(this->get_io_service());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}
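
// Run handlers on the calling thread until the service is stopped. If there
// is no outstanding work on entry, the service is stopped immediately and 0
// is returned. Otherwise returns the number of handlers executed, capped at
// numeric_limits<std::size_t>::max().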
std::size_t task_io_service::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  event wakeup_event;
  this_thread.wakeup_event = &wakeup_event;
  this_thread.private_outstanding_work = 0;
  this_thread.next = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
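
// As run(), but returns after executing at most one handler.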
std::size_t task_io_service::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  event wakeup_event;
  this_thread.wakeup_event = &wakeup_event;
  this_thread.private_outstanding_work = 0;
  this_thread.next = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}
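
// Execute ready handlers without blocking. Unlike run(), the thread registers
// no wakeup event and the reactor is polled rather than waited on.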
std::size_t task_io_service::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.wakeup_event = 0;
  this_thread.private_outstanding_work = 0;
  this_thread.next = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_thread_info = ctx.next_by_key())
      op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
std::size_t task_io_service::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.wakeup_event = 0;
  this_thread.private_outstanding_work = 0;
  this_thread.next = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_thread_info = ctx.next_by_key())
      op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}

void task_io_service::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}

bool task_io_service::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}

void task_io_service::reset()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
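
// Post a handler whose work has not yet been counted. When called from a
// thread that is running this service, and either the single-threaded
// optimisation is on or the operation is a continuation, the operation and
// its work count go on the thread-private queue; otherwise the work count is
// incremented, the operation is pushed on the shared queue and a thread is
// woken.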
void task_io_service::post_immediate_completion(
    task_io_service::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info* this_thread = thread_call_stack::contains(this))
    {
      ++this_thread->private_outstanding_work;
      this_thread->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
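
// Post a handler whose work has already been counted, so work_started() is
// not called again here.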
void task_io_service::post_deferred_completion(task_io_service::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info* this_thread = thread_call_stack::contains(this))
    {
      this_thread->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void task_io_service::post_deferred_completions(
    op_queue<task_io_service::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info* this_thread = thread_call_stack::contains(this))
      {
        this_thread->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
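
// Fallback used when a handler cannot be run immediately: count the new work,
// push the operation on the shared queue and wake a thread to run it.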
void task_io_service::do_dispatch(
    task_io_service::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
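
// Abandoned operations are moved onto a local queue so that op_queue's
// destructor can dispose of them without invoking their handlers.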
void task_io_service::abandon_operations(
    op_queue<task_io_service::operation>& ops)
{
  op_queue<task_io_service::operation> ops2;
  ops2.push(ops);
}
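
// Called with the lock held. Blocks while the queue is empty. Runs the
// reactor when the dequeued operation is the task_operation_ marker;
// otherwise completes the operation and returns 1. Returns 0 once the
// service has been stopped.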
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
        {
          if (!wake_one_idle_thread_and_unlock(lock))
            lock.unlock();
        }
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(*this, ec, task_result);

        return 1;
      }
    }
    else
    {
      // Nothing to run right now, so just wait for work to do.
      this_thread.next = first_idle_thread_;
      first_idle_thread_ = &this_thread;
      this_thread.wakeup_event->clear(lock);
      this_thread.wakeup_event->wait(lock);
    }
  }

  return 0;
}
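
// Non-blocking counterpart of do_run_one(): runs the reactor without blocking
// when the task marker is at the front of the queue, then completes at most
// one ready operation.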
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(false, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wake_one_idle_thread_and_unlock(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(*this, ec, task_result);

  return 1;
}
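
// Mark the service as stopped, wake every idle thread, and interrupt the
// reactor if a thread may be blocked inside it.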
void task_io_service::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;

  while (first_idle_thread_)
  {
    thread_info* idle_thread = first_idle_thread_;
    first_idle_thread_ = idle_thread->next;
    idle_thread->next = 0;
    idle_thread->wakeup_event->signal(lock);
  }

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
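
// Wake a single idle thread, if there is one, releasing the lock. Returns
// false with the lock still held when no thread is idle, in which case
// wake_one_thread_and_unlock() falls back to interrupting the reactor.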
bool task_io_service::wake_one_idle_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (first_idle_thread_)
  {
    thread_info* idle_thread = first_idle_thread_;
    first_idle_thread_ = idle_thread->next;
    idle_thread->next = 0;
    idle_thread->wakeup_event->signal_and_unlock(lock);
    return true;
  }
  return false;
}

void task_io_service::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (!wake_one_idle_thread_and_unlock(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // !defined(BOOST_ASIO_HAS_IOCP)

#endif // BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP