//
// detail/impl/kqueue_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
// Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_KQUEUE)

#include <boost/asio/detail/kqueue_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#include <boost/asio/detail/push_options.hpp>

#if defined(__NetBSD__)
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif
namespace boost {
namespace asio {
namespace detail {

kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service)
  : boost::asio::detail::service_base<kqueue_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    kqueue_fd_(do_kqueue_create()),
    interrupter_(),
    shutdown_(false)
{
  // The interrupter is put into a permanently readable state. Whenever we want
  // to interrupt the blocked kevent call we register a read operation against
  // the descriptor.
  interrupter_.interrupt();
}
kqueue_reactor::~kqueue_reactor()
{
  close(kqueue_fd_);
}
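
// Cancel all outstanding operations on every registered descriptor and in
// every timer queue, and hand them to the io_service to be abandoned.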
void kqueue_reactor::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  io_service_.abandon_operations(ops);
}
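
// After a fork, the child process gets a fresh kqueue (the old descriptor is
// closed automatically in the child) and a recreated interrupter, and
// re-registers every descriptor that still has pending operations.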
void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
{
  if (fork_ev == boost::asio::io_service::fork_child)
  {
    // The kqueue descriptor is automatically closed in the child.
    kqueue_fd_ = -1;
    kqueue_fd_ = do_kqueue_create();

    interrupter_.recreate();

    // Re-register all descriptors with kqueue.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      struct kevent events[2];
      int num_events = 0;

      if (!state->op_queue_[read_op].empty())
        BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
      else if (!state->op_queue_[except_op].empty())
        BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, EV_OOBAND, 0, state);

      if (!state->op_queue_[write_op].empty())
        BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
            EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);

      if (num_events && ::kevent(kqueue_fd_, events, num_events, 0, 0, 0) == -1)
      {
        boost::system::error_code error(errno,
            boost::asio::error::get_system_category());
        boost::asio::detail::throw_error(error);
      }
    }
  }
}
void kqueue_reactor::init_task()
{
  io_service_.init_task();
}
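
// Allocate per-descriptor state for a newly registered descriptor. The
// descriptor is not added to the kqueue here; filters are installed when the
// first operation is started against it.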
int kqueue_reactor::register_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->shutdown_ = false;

  return 0;
}
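
// Register a descriptor that is used internally by the reactor. The supplied
// operation is queued and the matching kqueue filter is added immediately.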
int kqueue_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->shutdown_ = false;
  descriptor_data->op_queue_[op_type].push(op);

  struct kevent event;
  switch (op_type)
  {
  case read_op:
    BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
        EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
    break;
  case write_op:
    BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
        EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
    break;
  case except_op:
    BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
        EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
    break;
  }
  ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);

  return 0;
}
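
// Transfer ownership of per-descriptor state from one descriptor to another,
// leaving the source empty.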
void kqueue_reactor::move_descriptor(socket_type,
    kqueue_reactor::per_descriptor_data& target_descriptor_data,
    kqueue_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}
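
// Start an asynchronous operation on a descriptor. When allowed, the
// operation is first attempted speculatively; otherwise it is queued, and if
// it is the first operation of its type the corresponding kqueue filter is
// added or updated.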
void kqueue_reactor::start_op(int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  bool first = descriptor_data->op_queue_[op_type].empty();
  if (first)
  {
    if (allow_speculative)
    {
      if (op_type != read_op || descriptor_data->op_queue_[except_op].empty())
      {
        if (op->perform())
        {
          descriptor_lock.unlock();
          io_service_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  io_service_.work_started();

  if (first)
  {
    struct kevent event;
    switch (op_type)
    {
    case read_op:
      BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      break;
    case write_op:
      BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      break;
    case except_op:
      if (!descriptor_data->op_queue_[read_op].empty())
        return; // Already registered for read events.
      BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
      break;
    }

    if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
    {
      op->ec_ = boost::system::error_code(errno,
          boost::asio::error::get_system_category());
      descriptor_data->op_queue_[op_type].pop();
      io_service_.post_deferred_completion(op);
    }
  }
}
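
// Cancel all operations queued against the given descriptor, completing them
// with the operation_aborted error.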
void kqueue_reactor::cancel_ops(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  io_service_.post_deferred_completions(ops);
}
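
// Remove a descriptor from the reactor, aborting any queued operations. If
// the descriptor is being closed, the kernel removes it from the kqueue
// automatically; otherwise the read and write filters are deleted explicitly.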
void kqueue_reactor::deregister_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the kqueue when it
      // is closed.
    }
    else
    {
      struct kevent events[2];
      BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
          EVFILT_READ, EV_DELETE, 0, 0, 0);
      BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
          EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      ::kevent(kqueue_fd_, events, 2, 0, 0, 0);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;

    io_service_.post_deferred_completions(ops);
  }
}
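
// Remove an internally used descriptor from the reactor. Its queued
// operations are released without being posted for completion.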
void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    struct kevent events[2];
    BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
        EVFILT_READ, EV_DELETE, 0, 0, 0);
    BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
        EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, events, 2, 0, 0, 0);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}
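
// Run the reactor once: wait on the kqueue for ready events (blocking if
// requested, with the timeout bounded by the timer queues), perform the
// queued operations for each ready descriptor, renew the event
// registrations, and collect any expired timers.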
void kqueue_reactor::run(bool block, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;

  lock.unlock();

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    int descriptor = static_cast<int>(events[i].ident);
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications.
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = boost::system::error_code(
                    static_cast<int>(events[i].data),
                    boost::asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }

      // Renew registration for event notifications.
      struct kevent event;
      switch (events[i].filter)
      {
      case EVFILT_READ:
        if (!descriptor_data->op_queue_[read_op].empty())
          BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
              EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        else if (!descriptor_data->op_queue_[except_op].empty())
          BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
              EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
        else
          continue;
        break;
      case EVFILT_WRITE:
        if (!descriptor_data->op_queue_[write_op].empty())
          BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
              EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        else
          continue;
        break;
      default:
        break;
      }

      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
      {
        boost::system::error_code error(errno,
            boost::asio::error::get_system_category());
        for (int j = 0; j < max_ops; ++j)
        {
          while (reactor_op* op = descriptor_data->op_queue_[j].front())
          {
            op->ec_ = error;
            descriptor_data->op_queue_[j].pop();
            ops.push(op);
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}
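
// Wake up a thread that is blocked in kevent by registering a read filter
// against the permanently readable interrupter descriptor.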
void kqueue_reactor::interrupt()
{
  struct kevent event;
  BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(),
      EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_);
  ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
}
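
// Create the kqueue descriptor, throwing an exception on failure.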
int kqueue_reactor::do_kqueue_create()
{
  int fd = ::kqueue();
  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "kqueue");
  }
  return fd;
}
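
// Descriptor state objects are allocated from, and returned to, the pool of
// registered descriptors, under registered_descriptors_mutex_.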
kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}

void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}
void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

timespec* kqueue_reactor::get_timeout(timespec& ts)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = (usec % 1000000) * 1000;
  return &ts;
}

} // namespace detail
} // namespace asio
} // namespace boost

#undef BOOST_ASIO_KQUEUE_EV_SET

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_KQUEUE)

#endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP