1//
2// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
9#define BOOST_COBALT_IMPL_CHANNEL_HPP
10
11#include <boost/cobalt/channel.hpp>
12#include <boost/cobalt/result.hpp>
13
14#include <boost/asio/post.hpp>
15
16namespace boost::cobalt
17{
18
19#if !defined(BOOST_COBALT_NO_PMR)
20template<typename T>
21inline channel<T>::channel(
22 std::size_t limit,
23 executor executor,
24 pmr::memory_resource * resource)
25 : buffer_(limit, resource), executor_(executor) {}
26#else
27template<typename T>
28inline channel<T>::channel(
29 std::size_t limit,
30 executor executor)
31 : buffer_(limit), executor_(executor) {}
32#endif
33
34template<typename T>
35auto channel<T>::get_executor() -> const executor_type & {return executor_;}
36
37template<typename T>
38bool channel<T>::is_open() const {return !is_closed_;}
39
40
41template<typename T>
42channel<T>::~channel()
43{
44 while (!read_queue_.empty())
45 read_queue_.front().awaited_from.reset();
46
47 while (!write_queue_.empty())
48 write_queue_.front().awaited_from.reset();
49
50}
51
52template<typename T>
53void channel<T>::close()
54{
55 is_closed_ = true;
56 while (!read_queue_.empty())
57 {
58 auto & op = read_queue_.front();
59 op.unlink();
60 op.cancelled = true;
61 op.cancel_slot.clear();
62
63 if (op.awaited_from)
64 asio::post(executor_, std::move(op.awaited_from));
65 }
66 while (!write_queue_.empty())
67 {
68 auto & op = write_queue_.front();
69 op.unlink();
70 op.cancelled = true;
71 op.cancel_slot.clear();
72 if (op.awaited_from)
73 asio::post(executor_, std::move(op.awaited_from));
74 }
75}
76
77
78template<typename T>
79struct channel<T>::read_op::cancel_impl
80{
81 read_op * op;
82 cancel_impl(read_op * op) : op(op) {}
83 void operator()(asio::cancellation_type)
84 {
85 op->cancelled = true;
86 op->unlink();
87 if (op->awaited_from)
88 asio::post(
89 op->chn->executor_,
90 std::move(op->awaited_from));
91 op->cancel_slot.clear();
92 }
93};
94
95template<typename T>
96template<typename Promise>
97std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
98{
99 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
100 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
101 cancel_slot.emplace<cancel_impl>(this);
102
103 if (awaited_from)
104 boost::throw_exception(e: std::runtime_error("already-awaited"), loc);
105 awaited_from.reset(handle: h.address());
106 // currently nothing to read
107 if constexpr (requires (Promise p) {p.begin_transaction();})
108 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
109
110 if (chn->write_queue_.empty())
111 {
112 chn->read_queue_.push_back(*this);
113 return std::noop_coroutine();
114 }
115 else
116 {
117 cancel_slot.clear();
118 auto & op = chn->write_queue_.front();
119 op.transactional_unlink();
120 op.direct = true;
121 if (op.ref.index() == 0)
122 direct = std::move(*variant2::get<0>(op.ref));
123 else
124 direct = *variant2::get<1>(op.ref);
125 BOOST_ASSERT(op.awaited_from);
126 asio::post(chn->executor_, std::move(awaited_from));
127 return op.awaited_from.release();
128 }
129}
130
131
132template<typename T>
133T channel<T>::read_op::await_resume()
134{
135 return await_resume(as_result_tag{}).value(loc);
136}
137
138template<typename T>
139std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
140{
141 auto res = await_resume(as_result_tag{});
142
143 if (res.has_error())
144 return {res.error(), T{}};
145 else
146 return {system::error_code{}, std::move(*res)};
147
148}
149
150template<typename T>
151system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
152{
153 if (cancel_slot.is_connected())
154 cancel_slot.clear();
155
156 if (cancelled)
157 return {system::in_place_error, asio::error::operation_aborted};
158
159 T value = direct ? std::move(*direct) : std::move(chn->buffer_.front());
160 if (!direct)
161 chn->buffer_.pop_front();
162
163 if (!chn->write_queue_.empty())
164 {
165 auto &op = chn->write_queue_.front();
166 BOOST_ASSERT(chn->read_queue_.empty());
167 if (op.await_ready())
168 {
169 op.transactional_unlink();
170 BOOST_ASSERT(op.awaited_from);
171 asio::post(chn->executor_, std::move(op.awaited_from));
172 }
173 }
174 return {system::in_place_value, value};
175}
176
177template<typename T>
178struct channel<T>::write_op::cancel_impl
179{
180 write_op * op;
181 cancel_impl(write_op * op) : op(op) {}
182 void operator()(asio::cancellation_type)
183 {
184 op->cancelled = true;
185 op->unlink();
186 if (op->awaited_from)
187 asio::post(
188 op->chn->executor_, std::move(op->awaited_from));
189 op->cancel_slot.clear();
190 }
191};
192
193template<typename T>
194template<typename Promise>
195std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
196{
197 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
198 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
199 cancel_slot.emplace<cancel_impl>(this);
200
201 awaited_from.reset(handle: h.address());
202 if constexpr (requires (Promise p) {p.begin_transaction();})
203 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
204
205 // currently nothing to read
206 if (chn->read_queue_.empty())
207 {
208 chn->write_queue_.push_back(*this);
209 return std::noop_coroutine();
210 }
211 else
212 {
213 cancel_slot.clear();
214 auto & op = chn->read_queue_.front();
215 op.transactional_unlink();
216 if (ref.index() == 0)
217 op.direct = std::move(*variant2::get<0>(ref));
218 else
219 op.direct = *variant2::get<1>(ref);
220
221 BOOST_ASSERT(op.awaited_from);
222 direct = true;
223 asio::post(chn->executor_, std::move(awaited_from));
224
225 return op.awaited_from.release();
226 }
227}
228
229template<typename T>
230std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
231{
232 return await_resume(as_result_tag{}).error();
233}
234
235template<typename T>
236void channel<T>::write_op::await_resume()
237{
238 await_resume(as_result_tag{}).value(loc);
239}
240
241template<typename T>
242system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
243{
244 if (cancel_slot.is_connected())
245 cancel_slot.clear();
246 if (cancelled)
247 boost::throw_exception(e: system::system_error(asio::error::operation_aborted), loc);
248
249
250 if (!direct)
251 {
252 BOOST_ASSERT(!chn->buffer_.full());
253 if (ref.index() == 0)
254 chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
255 else
256 chn->buffer_.push_back(*variant2::get<1>(ref));
257 }
258
259 if (!chn->read_queue_.empty())
260 {
261 auto & op = chn->read_queue_.front();
262 BOOST_ASSERT(chn->write_queue_.empty());
263 if (op.await_ready())
264 {
265 op.transactional_unlink();
266 BOOST_ASSERT(op.awaited_from);
267 asio::post(chn->executor_, std::move(op.awaited_from));
268 }
269 }
270 return system::in_place_value;
271}
272
273struct channel<void>::read_op::cancel_impl
274{
275 read_op * op;
276 cancel_impl(read_op * op) : op(op) {}
277 void operator()(asio::cancellation_type)
278 {
279 op->cancelled = true;
280 op->unlink();
281 asio::post(ex: op->chn->executor_, token: std::move(op->awaited_from));
282 op->cancel_slot.clear();
283 }
284};
285
286struct channel<void>::write_op::cancel_impl
287{
288 write_op * op;
289 cancel_impl(write_op * op) : op(op) {}
290 void operator()(asio::cancellation_type)
291 {
292 op->cancelled = true;
293 op->unlink();
294 asio::post(ex: op->chn->executor_, token: std::move(op->awaited_from));
295 op->cancel_slot.clear();
296 }
297};
298
299template<typename Promise>
300std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
301{
302 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
303 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
304 cancel_slot.emplace<cancel_impl>(args: this);
305
306 awaited_from.reset(handle: h.address());
307
308 if constexpr (requires (Promise p) {p.begin_transaction();})
309 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
310
311 // nothing to read currently, enqueue
312 if (chn->write_queue_.empty())
313 {
314 chn->read_queue_.push_back(value&: *this);
315 return std::noop_coroutine();
316 }
317 else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
318 {
319 cancel_slot.clear();
320 auto & op = chn->write_queue_.front();
321 op.unlink();
322 op.direct = true;
323 BOOST_ASSERT(op.awaited_from);
324 direct = true;
325 asio::post(ex: chn->executor_, token: std::move(awaited_from));
326 return op.awaited_from.release();
327 }
328}
329
330
331template<typename Promise>
332std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
333{
334 if constexpr (requires (Promise p) {p.get_cancellation_slot();})
335 if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
336 cancel_slot.emplace<cancel_impl>(args: this);
337
338 awaited_from.reset(handle: h.address());
339 // currently nothing to read
340 if constexpr (requires (Promise p) {p.begin_transaction();})
341 begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
342
343 if (chn->read_queue_.empty())
344 {
345 chn->write_queue_.push_back(value&: *this);
346 return std::noop_coroutine();
347 }
348 else
349 {
350 cancel_slot.clear();
351 auto & op = chn->read_queue_.front();
352 op.unlink();
353 op.direct = true;
354 BOOST_ASSERT(op.awaited_from);
355 direct = true;
356 asio::post(ex: chn->executor_, token: std::move(awaited_from));
357 return op.awaited_from.release();
358 }
359}
360
361}
362
363#endif //BOOST_COBALT_IMPL_CHANNEL_HPP
364

source code of boost/libs/cobalt/include/boost/cobalt/impl/channel.hpp