1//
2// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#include <boost/cobalt/channel.hpp>
9#include <boost/cobalt/promise.hpp>
10#include <boost/cobalt/race.hpp>
11#include <boost/cobalt/gather.hpp>
12#include <boost/cobalt/async_for.hpp>
13
14#include <boost/cobalt/join.hpp>
15#include <boost/asio/steady_timer.hpp>
16
17#include "test.hpp"
18#include <boost/test/unit_test.hpp>
19
20namespace cobalt = boost::cobalt;
21
22cobalt::promise<void> do_write(cobalt::channel<void> &chn, std::vector<int> & seq)
23{
24 seq.push_back(x: 0);
25 co_await chn.write(); seq.push_back(x: 1);
26 co_await chn.write(); seq.push_back(x: 2);
27 (co_await cobalt::as_result(aw: chn.write())).value(); seq.push_back(x: 3);
28 co_await cobalt::as_tuple(aw: chn.write()); seq.push_back(x: 4);
29 co_await chn.write(); seq.push_back(x: 5);
30 co_await chn.write(); seq.push_back(x: 6);
31 co_await chn.write(); seq.push_back(x: 7);
32}
33
34cobalt::promise<void> do_read(cobalt::channel<void> &chn, std::vector<int> & seq)
35{
36 seq.push_back(x: 10);
37 co_await chn.read(); seq.push_back(x: 11);
38 co_await chn.read(); seq.push_back(x: 12);
39 (co_await cobalt::as_result(aw: chn.read())).value(); seq.push_back(x: 13);
40 co_await cobalt::as_tuple(aw: chn.read()); seq.push_back(x: 14);
41 co_await chn.read(); seq.push_back(x: 15);
42 co_await chn.read(); seq.push_back(x: 16);
43 co_await chn.read(); seq.push_back(x: 17);
44}
45
46BOOST_AUTO_TEST_SUITE(channel);
47
48CO_TEST_CASE(void_)
49{
50 cobalt::channel<void> chn{2u, co_await cobalt::this_coro::executor};
51
52 std::vector<int> seq;
53 auto r = do_read(chn, seq);
54 auto w = do_write(chn, seq);
55
56 co_await r;
57 co_await w;
58 BOOST_REQUIRE(seq.size() == 16);
59 BOOST_CHECK(seq[0] == 10);
60 BOOST_CHECK(seq[1] == 0);
61 BOOST_CHECK(seq[2] == 1);
62 BOOST_CHECK(seq[3] == 2);
63 BOOST_CHECK(seq[4] == 11);
64 BOOST_CHECK(seq[5] == 12);
65 BOOST_CHECK(seq[6] == 3);
66 BOOST_CHECK(seq[7] == 4);
67 BOOST_CHECK(seq[8] == 13);
68 BOOST_CHECK(seq[9] == 14);
69 BOOST_CHECK(seq[10] == 5);
70 BOOST_CHECK(seq[11] == 6);
71 BOOST_CHECK(seq[12] == 15);
72 BOOST_CHECK(seq[13] == 16);
73 BOOST_CHECK(seq[14] == 7);
74 BOOST_CHECK(seq[15] == 17);
75}
76
77CO_TEST_CASE(void_0)
78{
79 cobalt::channel<void> chn{0u, co_await cobalt::this_coro::executor};
80
81 std::vector<int> seq;
82 auto r = do_read(chn, seq);
83 auto w = do_write(chn, seq);
84
85 co_await r;
86 co_await w;
87 BOOST_REQUIRE(seq.size() == 16);
88 BOOST_CHECK(seq[0] == 10);
89 BOOST_CHECK(seq[1] == 0);
90 BOOST_CHECK(seq[2] == 11);
91 BOOST_CHECK(seq[3] == 1);
92 BOOST_CHECK(seq[4] == 12);
93 BOOST_CHECK(seq[5] == 2);
94 BOOST_CHECK(seq[6] == 13);
95 BOOST_CHECK(seq[7] == 3);
96 BOOST_CHECK(seq[8] == 14);
97 BOOST_CHECK(seq[9] == 4);
98 BOOST_CHECK(seq[10] == 15);
99 BOOST_CHECK(seq[11] == 5);
100 BOOST_CHECK(seq[12] == 16);
101 BOOST_CHECK(seq[13] == 6);
102 BOOST_CHECK(seq[14] == 17);
103 BOOST_CHECK(seq[15] == 7);
104}
105
106cobalt::promise<void> do_write(cobalt::channel<int> &chn, std::vector<int> & seq)
107{
108 seq.push_back(x: 0);
109 co_await chn.write(value: 1); seq.push_back(x: 1);
110 co_await chn.write(value: 2); seq.push_back(x: 2);
111 (co_await cobalt::as_result(aw: chn.write(value: 3))).value(); seq.push_back(x: 3);
112 co_await cobalt::as_tuple(aw: chn.write(value: 4)); seq.push_back(x: 4);
113 co_await chn.write(value: 5); seq.push_back(x: 5);
114 co_await chn.write(value: 6); seq.push_back(x: 6);
115 co_await chn.write(value: 7); seq.push_back(x: 7);
116}
117
118cobalt::promise<void> do_read(cobalt::channel<int> &chn, std::vector<int> & seq)
119{
120 seq.push_back(x: 10);
121 BOOST_CHECK(1 == co_await chn.read()); seq.push_back(x: 11);
122 BOOST_CHECK(2 == co_await chn.read()); seq.push_back(x: 12);
123 BOOST_CHECK(3 == (co_await cobalt::as_result(chn.read())).value()); seq.push_back(x: 13);
124 BOOST_CHECK(4 == std::get<1>(co_await cobalt::as_tuple(chn.read()))); seq.push_back(x: 14);
125 BOOST_CHECK(5 == co_await chn.read()); seq.push_back(x: 15);
126 BOOST_CHECK(6 == co_await chn.read()); seq.push_back(x: 16);
127 BOOST_CHECK(7 == co_await chn.read()); seq.push_back(x: 17);
128}
129
130
131CO_TEST_CASE(int_)
132{
133 cobalt::channel<int> chn{2u, co_await cobalt::this_coro::executor};
134
135 std::vector<int> seq;
136 auto w = do_write(chn, seq);
137 auto r = do_read(chn, seq);
138
139 co_await r;
140 co_await w;
141 BOOST_REQUIRE(seq.size() == 16);
142 BOOST_CHECK(seq[0] == 0);
143 BOOST_CHECK(seq[1] == 1);
144 BOOST_CHECK(seq[2] == 2);
145 BOOST_CHECK(seq[3] == 10);
146 BOOST_CHECK(seq[4] == 11);
147 BOOST_CHECK(seq[5] == 12);
148 BOOST_CHECK(seq[6] == 3);
149 BOOST_CHECK(seq[7] == 4);
150 BOOST_CHECK(seq[8] == 13);
151 BOOST_CHECK(seq[9] == 14);
152 BOOST_CHECK(seq[10] == 5);
153 BOOST_CHECK(seq[11] == 6);
154 BOOST_CHECK(seq[12] == 15);
155 BOOST_CHECK(seq[13] == 16);
156 BOOST_CHECK(seq[14] == 7);
157 BOOST_CHECK(seq[15] == 17);
158}
159
160cobalt::promise<void> do_write(cobalt::channel<std::string> &chn, std::vector<int> & seq)
161{
162 seq.push_back(x: 0);
163 co_await chn.write(value: std::string("1")); seq.push_back(x: 1);
164 co_await chn.write(value: std::string("2")); seq.push_back(x: 2);
165 co_await chn.write(value: std::string("3")); seq.push_back(x: 3);
166 co_await chn.write(value: std::string("4")); seq.push_back(x: 4);
167 co_await chn.write(value: std::string("5")); seq.push_back(x: 5);
168 co_await chn.write(value: std::string("6")); seq.push_back(x: 6);
169 co_await chn.write(value: std::string("7 but we need to be sure we get ouf of SSO")); seq.push_back(x: 7);
170}
171
172cobalt::promise<void> do_read(cobalt::channel<std::string> &chn, std::vector<int> & seq)
173{
174 seq.push_back(x: 10);
175 BOOST_CHECK("1" == co_await chn.read()); seq.push_back(x: 11);
176 BOOST_CHECK("2" == co_await chn.read()); seq.push_back(x: 12);
177 BOOST_CHECK("3" == co_await chn.read()); seq.push_back(x: 13);
178 BOOST_CHECK("4" == co_await chn.read()); seq.push_back(x: 14);
179 BOOST_CHECK("5" == co_await chn.read()); seq.push_back(x: 15);
180 BOOST_CHECK("6" == co_await chn.read()); seq.push_back(x: 16);
181 BOOST_CHECK("7 but we need to be sure we get ouf of SSO" == co_await chn.read()); seq.push_back(x: 17);
182}
183
184
185CO_TEST_CASE(str)
186{
187 cobalt::channel<std::string> chn{0u, co_await cobalt::this_coro::executor};
188
189 std::vector<int> seq;
190 auto w = do_write(chn, seq);
191 auto r = do_read(chn, seq);
192
193 co_await r;
194 co_await w;
195 BOOST_REQUIRE(seq.size() == 16);
196 BOOST_CHECK(seq[0] == 0);
197 BOOST_CHECK(seq[1] == 10);
198 BOOST_CHECK(seq[2] == 1);
199 BOOST_CHECK(seq[3] == 11);
200 BOOST_CHECK(seq[4] == 2);
201 BOOST_CHECK(seq[5] == 12);
202 BOOST_CHECK(seq[6] == 3);
203 BOOST_CHECK(seq[7] == 13);
204 BOOST_CHECK(seq[8] == 4);
205 BOOST_CHECK(seq[9] == 14);
206 BOOST_CHECK(seq[10] == 5);
207 BOOST_CHECK(seq[11] == 15);
208 BOOST_CHECK(seq[12] == 6);
209 BOOST_CHECK(seq[13] == 16);
210 BOOST_CHECK(seq[14] == 7);
211 BOOST_CHECK(seq[15] == 17);
212}
213
214CO_TEST_CASE(raceable)
215{
216 cobalt::channel<int> ci{0u};
217 cobalt::channel<void> cv{0u};
218 auto [r1, r2] = co_await cobalt::gather(p: cobalt::race(p: ci.read(), p: cv.read()), p: cv.write());
219 r1.value();
220 BOOST_REQUIRE(r1.has_value());
221 BOOST_CHECK(r1->index() == 1u);
222 BOOST_CHECK(!r2.has_error());
223}
224
225CO_TEST_CASE(raceable_1)
226{
227 cobalt::channel<int> ci{1u};
228 cobalt::channel<void> cv{1u};
229 auto [r1, r2] = co_await cobalt::gather(
230 p: cobalt::race(p: ci.read(), p: cv.read()),
231 p: cv.write());
232 BOOST_CHECK(r1->index() == 1u);
233 BOOST_CHECK(!r2.has_error());
234}
235
236
237
238namespace issue_53
239{
240
241cobalt::promise<void> timeout_and_write(cobalt::channel<std::string> &channel)
242{
243 while (!co_await cobalt::this_coro::cancelled)
244 {
245 boost::asio::steady_timer timer{co_await cobalt::this_coro::executor};
246 timer.expires_after(expiry_time: std::chrono::seconds{20});
247 co_await timer.async_wait(token: cobalt::use_op);
248 std::string val("Test!");
249 co_await channel.write(value&: val);
250 }
251
252 co_return;
253}
254
255cobalt::promise<void> read(cobalt::channel<std::string> &channel)
256{
257 while (!co_await cobalt::this_coro::cancelled)
258 co_await channel.read();
259}
260
261cobalt::promise<void> test()
262{
263 cobalt::channel<std::string> channel;
264 co_await cobalt::join(p: timeout_and_write(channel), p: read(channel));
265}
266
267CO_TEST_CASE(issue_93)
268{
269 co_await cobalt::race(p: test(), p: boost::asio::post(token: cobalt::use_op));
270}
271
272cobalt::promise<void> writer(cobalt::channel<int> & c)
273{
274 for (int i = 0; i < 10; i++)
275 co_await c.write(value&: i);
276 c.close();
277}
278
279CO_TEST_CASE(reader)
280{
281 cobalt::channel<int> c;
282
283 +writer(c);
284 int i = 0;
285 BOOST_COBALT_FOR(int value, cobalt::channel_reader(c))
286 BOOST_CHECK(value == i++);
287
288}
289
290
291}
292
293BOOST_AUTO_TEST_SUITE_END();

source code of boost/libs/cobalt/test/channel.cpp