1 | // |
2 | // detail/impl/strand_service.ipp |
3 | // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4 | // |
5 | // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
6 | // |
7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
9 | // |
10 | |
11 | #ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP |
12 | #define BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP |
13 | |
14 | #if defined(_MSC_VER) && (_MSC_VER >= 1200) |
15 | # pragma once |
16 | #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) |
17 | |
18 | #include <boost/asio/detail/config.hpp> |
19 | #include <boost/asio/detail/call_stack.hpp> |
20 | #include <boost/asio/detail/strand_service.hpp> |
21 | |
22 | #include <boost/asio/detail/push_options.hpp> |
23 | |
24 | namespace boost { |
25 | namespace asio { |
26 | namespace detail { |
27 | |
28 | struct strand_service::on_do_complete_exit |
29 | { |
30 | io_service_impl* owner_; |
31 | strand_impl* impl_; |
32 | |
33 | ~on_do_complete_exit() |
34 | { |
35 | impl_->mutex_.lock(); |
36 | impl_->ready_queue_.push(q&: impl_->waiting_queue_); |
37 | bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty(); |
38 | impl_->mutex_.unlock(); |
39 | |
40 | if (more_handlers) |
41 | owner_->post_immediate_completion(op: impl_, is_continuation: true); |
42 | } |
43 | }; |
44 | |
45 | strand_service::strand_service(boost::asio::io_service& io_service) |
46 | : boost::asio::detail::service_base<strand_service>(io_service), |
47 | io_service_(boost::asio::use_service<io_service_impl>(ios&: io_service)), |
48 | mutex_(), |
49 | salt_(0) |
50 | { |
51 | } |
52 | |
53 | void strand_service::shutdown_service() |
54 | { |
55 | op_queue<operation> ops; |
56 | |
57 | boost::asio::detail::mutex::scoped_lock lock(mutex_); |
58 | |
59 | for (std::size_t i = 0; i < num_implementations; ++i) |
60 | { |
61 | if (strand_impl* impl = implementations_[i].get()) |
62 | { |
63 | ops.push(q&: impl->waiting_queue_); |
64 | ops.push(q&: impl->ready_queue_); |
65 | } |
66 | } |
67 | } |
68 | |
69 | void strand_service::construct(strand_service::implementation_type& impl) |
70 | { |
71 | boost::asio::detail::mutex::scoped_lock lock(mutex_); |
72 | |
73 | std::size_t salt = salt_++; |
74 | #if defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) |
75 | std::size_t index = salt; |
76 | #else // defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) |
77 | std::size_t index = reinterpret_cast<std::size_t>(&impl); |
78 | index += (reinterpret_cast<std::size_t>(&impl) >> 3); |
79 | index ^= salt + 0x9e3779b9 + (index << 6) + (index >> 2); |
80 | #endif // defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) |
81 | index = index % num_implementations; |
82 | |
83 | if (!implementations_[index].get()) |
84 | implementations_[index].reset(p: new strand_impl); |
85 | impl = implementations_[index].get(); |
86 | } |
87 | |
88 | bool strand_service::running_in_this_thread( |
89 | const implementation_type& impl) const |
90 | { |
91 | return call_stack<strand_impl>::contains(k: impl) != 0; |
92 | } |
93 | |
94 | bool strand_service::do_dispatch(implementation_type& impl, operation* op) |
95 | { |
96 | // If we are running inside the io_service, and no other handler already |
97 | // holds the strand lock, then the handler can run immediately. |
98 | bool can_dispatch = io_service_.can_dispatch(); |
99 | impl->mutex_.lock(); |
100 | if (can_dispatch && !impl->locked_) |
101 | { |
102 | // Immediate invocation is allowed. |
103 | impl->locked_ = true; |
104 | impl->mutex_.unlock(); |
105 | return true; |
106 | } |
107 | |
108 | if (impl->locked_) |
109 | { |
110 | // Some other handler already holds the strand lock. Enqueue for later. |
111 | impl->waiting_queue_.push(h: op); |
112 | impl->mutex_.unlock(); |
113 | } |
114 | else |
115 | { |
116 | // The handler is acquiring the strand lock and so is responsible for |
117 | // scheduling the strand. |
118 | impl->locked_ = true; |
119 | impl->mutex_.unlock(); |
120 | impl->ready_queue_.push(h: op); |
121 | io_service_.post_immediate_completion(op: impl, is_continuation: false); |
122 | } |
123 | |
124 | return false; |
125 | } |
126 | |
127 | void strand_service::do_post(implementation_type& impl, |
128 | operation* op, bool is_continuation) |
129 | { |
130 | impl->mutex_.lock(); |
131 | if (impl->locked_) |
132 | { |
133 | // Some other handler already holds the strand lock. Enqueue for later. |
134 | impl->waiting_queue_.push(h: op); |
135 | impl->mutex_.unlock(); |
136 | } |
137 | else |
138 | { |
139 | // The handler is acquiring the strand lock and so is responsible for |
140 | // scheduling the strand. |
141 | impl->locked_ = true; |
142 | impl->mutex_.unlock(); |
143 | impl->ready_queue_.push(h: op); |
144 | io_service_.post_immediate_completion(op: impl, is_continuation); |
145 | } |
146 | } |
147 | |
148 | void strand_service::do_complete(io_service_impl* owner, operation* base, |
149 | const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) |
150 | { |
151 | if (owner) |
152 | { |
153 | strand_impl* impl = static_cast<strand_impl*>(base); |
154 | |
155 | // Indicate that this strand is executing on the current thread. |
156 | call_stack<strand_impl>::context ctx(impl); |
157 | |
158 | // Ensure the next handler, if any, is scheduled on block exit. |
159 | on_do_complete_exit on_exit = { .owner_: owner, .impl_: impl }; |
160 | (void)on_exit; |
161 | |
162 | // Run all ready handlers. No lock is required since the ready queue is |
163 | // accessed only within the strand. |
164 | while (operation* o = impl->ready_queue_.front()) |
165 | { |
166 | impl->ready_queue_.pop(); |
167 | o->complete(owner&: *owner, ec, bytes_transferred: 0); |
168 | } |
169 | } |
170 | } |
171 | |
172 | } // namespace detail |
173 | } // namespace asio |
174 | } // namespace boost |
175 | |
176 | #include <boost/asio/detail/pop_options.hpp> |
177 | |
178 | #endif // BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP |
179 | |