1
2// Copyright Oliver Kowalke 2013.
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#include "boost/fiber/scheduler.hpp"
8
9#include <chrono>
10#include <mutex>
11
12#include <boost/assert.hpp>
13
14#include "boost/fiber/context.hpp"
15#include "boost/fiber/exceptions.hpp"
16
17#ifdef BOOST_HAS_ABI_HEADERS
18# include BOOST_ABI_PREFIX
19#endif
20
21namespace boost {
22namespace fibers {
23
24void
25scheduler::release_terminated_() noexcept {
26 while ( ! terminated_queue_.empty() ) {
27 context * ctx = & terminated_queue_.front();
28 terminated_queue_.pop_front();
29 BOOST_ASSERT( ctx->is_context( type::worker_context) );
30 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
31 BOOST_ASSERT( this == ctx->get_scheduler() );
32 BOOST_ASSERT( ctx->is_resumable() );
33 BOOST_ASSERT( ! ctx->worker_is_linked() );
34 BOOST_ASSERT( ! ctx->ready_is_linked() );
35#if ! defined(BOOST_FIBERS_NO_ATOMICS)
36 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
37#endif
38 BOOST_ASSERT( ! ctx->sleep_is_linked() );
39 BOOST_ASSERT( ctx->wait_queue_.empty() );
40 BOOST_ASSERT( ctx->terminated_);
41 // if last reference, e.g. fiber::join() or fiber::detach()
42 // have been already called, this will call ~context(),
43 // the context is automatically removeid from worker-queue
44 intrusive_ptr_release( ctx);
45 }
46}
47
48#if ! defined(BOOST_FIBERS_NO_ATOMICS)
49void
50scheduler::remote_ready2ready_() noexcept {
51 remote_ready_queue_type tmp;
52 detail::spinlock_lock lk{ remote_ready_splk_ };
53 remote_ready_queue_.swap( other&: tmp);
54 lk.unlock();
55 // get context from remote ready-queue
56 while ( ! tmp.empty() ) {
57 context * ctx = & tmp.front();
58 tmp.pop_front();
59 // store context in local queues
60 schedule( ctx);
61 }
62}
63#endif
64
65void
66scheduler::sleep2ready_() noexcept {
67 // move context which the deadline has reached
68 // to ready-queue
69 // sleep-queue is sorted (ascending)
70 std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
71 sleep_queue_type::iterator e = sleep_queue_.end();
72 for ( sleep_queue_type::iterator i = sleep_queue_.begin(); i != e;) {
73 context * ctx = & ( * i);
74 // dispatcher context must never be pushed to sleep-queue
75 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
76 BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
77 BOOST_ASSERT( ! ctx->ready_is_linked() );
78 // remote_ready_hook_ can be linked in that point in case when the ctx
79 // has been signaled concurrently when sleep2ready_ is called. In that
80 // case sleep_waker_.wake() is just no-op, because sleep_waker_ is
81 // outdated
82 BOOST_ASSERT( ! ctx->terminated_is_linked() );
83 // set fiber to state_ready if deadline was reached
84 if ( ctx->tp_ <= now) {
85 // remove context from sleep-queue
86 i = sleep_queue_.erase( i);
87 // reset sleep-tp
88 ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
89 ctx->sleep_waker_.wake();
90 } else {
91 break; // first context with now < deadline
92 }
93 }
94}
95
96scheduler::scheduler(algo::algorithm::ptr_t algo) noexcept :
97 algo_{algo} {
98}
99
100scheduler::~scheduler() {
101 BOOST_ASSERT( nullptr != main_ctx_);
102 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
103 BOOST_ASSERT( context::active() == main_ctx_);
104 // signal dispatcher-context termination
105 shutdown_ = true;
106 // resume pending fibers
107 // by resuming dispatcher-context
108 context::active()->suspend();
109 // no context' in worker-queue
110 BOOST_ASSERT( worker_queue_.empty() );
111 BOOST_ASSERT( terminated_queue_.empty() );
112 BOOST_ASSERT( sleep_queue_.empty() );
113 // set active context to nullptr
114 context::reset_active();
115 // deallocate dispatcher-context
116 BOOST_ASSERT( ! dispatcher_ctx_->ready_is_linked() );
117 dispatcher_ctx_.reset();
118 // set main-context to nullptr
119 main_ctx_ = nullptr;
120}
121
122boost::context::fiber
123scheduler::dispatch() noexcept {
124 BOOST_ASSERT( context::active() == dispatcher_ctx_);
125 for (;;) {
126 if ( shutdown_) {
127 // notify sched-algorithm about termination
128 algo_->notify();
129 if ( worker_queue_.empty() ) {
130 break;
131 }
132 }
133 // release terminated context'
134 release_terminated_();
135#if ! defined(BOOST_FIBERS_NO_ATOMICS)
136 // get context' from remote ready-queue
137 remote_ready2ready_();
138#endif
139 // get sleeping context'
140 // must be called after remote_ready2ready_()
141 sleep2ready_();
142 // get next ready context
143 context * ctx = algo_->pick_next();
144 if ( nullptr != ctx) {
145 BOOST_ASSERT( ctx->is_resumable() );
146 BOOST_ASSERT( ! ctx->ready_is_linked() );
147#if ! defined(BOOST_FIBERS_NO_ATOMICS)
148 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
149#endif
150 BOOST_ASSERT( ! ctx->sleep_is_linked() );
151 BOOST_ASSERT( ! ctx->terminated_is_linked() );
152 // push dispatcher-context to ready-queue
153 // so that ready-queue never becomes empty
154 ctx->resume( dispatcher_ctx_.get() );
155 BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
156 } else {
157 // no ready context, wait till signaled
158 // set deadline to highest value
159 std::chrono::steady_clock::time_point suspend_time =
160 (std::chrono::steady_clock::time_point::max)();
161 // get lowest deadline from sleep-queue
162 sleep_queue_type::iterator i = sleep_queue_.begin();
163 if ( sleep_queue_.end() != i) {
164 suspend_time = i->tp_;
165 }
166 // no ready context, wait till signaled
167 algo_->suspend_until( suspend_time);
168 }
169 }
170 // release termianted context'
171 release_terminated_();
172 // return to main-context
173 return main_ctx_->suspend_with_cc();
174}
175
176void
177scheduler::schedule( context * ctx) noexcept {
178 BOOST_ASSERT( nullptr != ctx);
179 BOOST_ASSERT( ! ctx->ready_is_linked() );
180#if ! defined(BOOST_FIBERS_NO_ATOMICS)
181 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
182#endif
183 BOOST_ASSERT( ! ctx->terminated_is_linked() );
184 // remove context ctx from sleep-queue
185 // (might happen if blocked in timed_mutex::try_lock_until())
186 if ( ctx->sleep_is_linked() ) {
187 // unlink it from sleep-queue
188 ctx->sleep_unlink();
189 }
190 // push new context to ready-queue
191 algo_->awakened( ctx);
192}
193
194#if ! defined(BOOST_FIBERS_NO_ATOMICS)
195void
196scheduler::schedule_from_remote( context * ctx) noexcept {
197 BOOST_ASSERT( nullptr != ctx);
198 // another thread might signal the main-context of this thread
199 BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) );
200 BOOST_ASSERT( this == ctx->get_scheduler() );
201 BOOST_ASSERT( ! ctx->ready_is_linked() );
202 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
203 BOOST_ASSERT( ! ctx->terminated_is_linked() );
204 // protect for concurrent access
205 detail::spinlock_lock lk{ remote_ready_splk_ };
206 BOOST_ASSERT( ! shutdown_);
207 BOOST_ASSERT( nullptr != main_ctx_);
208 BOOST_ASSERT( nullptr != dispatcher_ctx_.get() );
209 // push new context to remote ready-queue
210 ctx->remote_ready_link( lst&: remote_ready_queue_);
211 lk.unlock();
212 // notify scheduler
213 algo_->notify();
214}
215#endif
216
217boost::context::fiber
218scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
219 BOOST_ASSERT( nullptr != ctx);
220 BOOST_ASSERT( context::active() == ctx);
221 BOOST_ASSERT( this == ctx->get_scheduler() );
222 BOOST_ASSERT( ctx->is_context( type::worker_context) );
223 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
224 BOOST_ASSERT( ! ctx->ready_is_linked() );
225#if ! defined(BOOST_FIBERS_NO_ATOMICS)
226 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
227#endif
228 BOOST_ASSERT( ! ctx->sleep_is_linked() );
229 BOOST_ASSERT( ! ctx->terminated_is_linked() );
230 BOOST_ASSERT( ctx->wait_queue_.empty() );
231 // store the terminated fiber in the terminated-queue
232 // the dispatcher-context will call
233 ctx->terminated_link( lst&: terminated_queue_);
234 // remove from the worker-queue
235 ctx->worker_unlink();
236 // release lock
237 lk.unlock();
238 // resume another fiber
239 return algo_->pick_next()->suspend_with_cc();
240}
241
242void
243scheduler::yield( context * ctx) noexcept {
244 BOOST_ASSERT( nullptr != ctx);
245 BOOST_ASSERT( context::active() == ctx);
246 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
247 BOOST_ASSERT( ! ctx->ready_is_linked() );
248#if ! defined(BOOST_FIBERS_NO_ATOMICS)
249 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
250#endif
251 BOOST_ASSERT( ! ctx->sleep_is_linked() );
252 BOOST_ASSERT( ! ctx->terminated_is_linked() );
253 // resume another fiber
254 algo_->pick_next()->resume( ctx);
255}
256
257bool
258scheduler::wait_until( context * ctx,
259 std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
260 BOOST_ASSERT( nullptr != ctx);
261 BOOST_ASSERT( context::active() == ctx);
262 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
263 BOOST_ASSERT( ! ctx->ready_is_linked() );
264#if ! defined(BOOST_FIBERS_NO_ATOMICS)
265 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
266#endif
267 BOOST_ASSERT( ! ctx->sleep_is_linked() );
268 BOOST_ASSERT( ! ctx->terminated_is_linked() );
269 ctx->sleep_waker_ = ctx->create_waker();
270 ctx->tp_ = sleep_tp;
271 ctx->sleep_link( set&: sleep_queue_);
272 // resume another context
273 algo_->pick_next()->resume();
274 // context has been resumed
275 // check if deadline has reached
276 return std::chrono::steady_clock::now() < sleep_tp;
277}
278
279bool
280scheduler::wait_until( context * ctx,
281 std::chrono::steady_clock::time_point const& sleep_tp,
282 detail::spinlock_lock & lk,
283 waker && w) noexcept {
284 BOOST_ASSERT( nullptr != ctx);
285 BOOST_ASSERT( context::active() == ctx);
286 BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
287 BOOST_ASSERT( ! ctx->ready_is_linked() );
288#if ! defined(BOOST_FIBERS_NO_ATOMICS)
289 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
290#endif
291 BOOST_ASSERT( ! ctx->sleep_is_linked() );
292 BOOST_ASSERT( ! ctx->terminated_is_linked() );
293 // push active context to sleep-queue
294 ctx->sleep_waker_ = std::move( w);
295 ctx->tp_ = sleep_tp;
296 ctx->sleep_link( set&: sleep_queue_);
297 // resume another context
298 algo_->pick_next()->resume( lk);
299 // context has been resumed
300 // check if deadline has reached
301 return std::chrono::steady_clock::now() < sleep_tp;
302}
303
304void
305scheduler::suspend() noexcept {
306 // resume another context
307 algo_->pick_next()->resume();
308}
309
310void
311scheduler::suspend( detail::spinlock_lock & lk) noexcept {
312 // resume another context
313 algo_->pick_next()->resume( lk);
314}
315
316bool
317scheduler::has_ready_fibers() const noexcept {
318 return algo_->has_ready_fibers();
319}
320
321void
322scheduler::set_algo( algo::algorithm::ptr_t algo) noexcept {
323 // move remaining context in current scheduler to new one
324 while ( algo_->has_ready_fibers() ) {
325 algo->awakened( algo_->pick_next() );
326 }
327 algo_ = std::move( algo);
328}
329
330void
331scheduler::attach_main_context( context * ctx) noexcept {
332 BOOST_ASSERT( nullptr != ctx);
333 // main-context represents the execution context created
334 // by the system, e.g. main()- or thread-context
335 // should not be in worker-queue
336 main_ctx_ = ctx;
337 main_ctx_->scheduler_ = this;
338}
339
340void
341scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept {
342 BOOST_ASSERT( ctx);
343 // dispatcher context has to handle
344 // - remote ready context'
345 // - sleeping context'
346 // - extern event-loops
347 // - suspending the thread if ready-queue is empty (waiting on external event)
348 // should not be in worker-queue
349 dispatcher_ctx_.swap( rhs&: ctx);
350 // add dispatcher-context to ready-queue
351 // so it is the first element in the ready-queue
352 // if the main context tries to suspend the first time
353 // the dispatcher-context is resumed and
354 // scheduler::dispatch() is executed
355 dispatcher_ctx_->scheduler_ = this;
356 algo_->awakened( dispatcher_ctx_.get() );
357}
358
359void
360scheduler::attach_worker_context( context * ctx) noexcept {
361 BOOST_ASSERT( nullptr != ctx);
362 BOOST_ASSERT( nullptr == ctx->get_scheduler() );
363 BOOST_ASSERT( ! ctx->ready_is_linked() );
364#if ! defined(BOOST_FIBERS_NO_ATOMICS)
365 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
366#endif
367 BOOST_ASSERT( ! ctx->sleep_is_linked() );
368 BOOST_ASSERT( ! ctx->terminated_is_linked() );
369 BOOST_ASSERT( ! ctx->worker_is_linked() );
370 ctx->worker_link( lst&: worker_queue_);
371 ctx->scheduler_ = this;
372 // an attached context must belong at least to worker-queue
373}
374
375void
376scheduler::detach_worker_context( context * ctx) noexcept {
377 BOOST_ASSERT( nullptr != ctx);
378 BOOST_ASSERT( ! ctx->ready_is_linked() );
379#if ! defined(BOOST_FIBERS_NO_ATOMICS)
380 BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
381#endif
382 BOOST_ASSERT( ! ctx->sleep_is_linked() );
383 BOOST_ASSERT( ! ctx->terminated_is_linked() );
384 BOOST_ASSERT( ctx->worker_is_linked() );
385 BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
386 ctx->worker_unlink();
387 BOOST_ASSERT( ! ctx->worker_is_linked() );
388 ctx->scheduler_ = nullptr;
389 // a detached context must not belong to any queue
390}
391
392}}
393
394#ifdef BOOST_HAS_ABI_HEADERS
395# include BOOST_ABI_SUFFIX
396#endif
397

source code of boost/libs/fiber/src/scheduler.cpp