| 1 | // |
| 2 | // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net) |
| 3 | // |
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| 6 | // |
| 7 | |
| 8 | #include <boost/cobalt/channel.hpp> |
| 9 | #include <boost/cobalt/promise.hpp> |
| 10 | #include <boost/cobalt/race.hpp> |
| 11 | #include <boost/cobalt/gather.hpp> |
| 12 | #include <boost/cobalt/async_for.hpp> |
| 13 | |
| 14 | #include <boost/cobalt/join.hpp> |
| 15 | #include <boost/asio/steady_timer.hpp> |
| 16 | |
| 17 | #include "test.hpp" |
| 18 | #include <boost/test/unit_test.hpp> |
| 19 | |
| 20 | namespace cobalt = boost::cobalt; |
| 21 | |
| 22 | cobalt::promise<void> do_write(cobalt::channel<void> &chn, std::vector<int> & seq) |
| 23 | { |
| 24 | seq.push_back(x: 0); |
| 25 | co_await chn.write(); seq.push_back(x: 1); |
| 26 | co_await chn.write(); seq.push_back(x: 2); |
| 27 | (co_await cobalt::as_result(aw: chn.write())).value(); seq.push_back(x: 3); |
| 28 | co_await cobalt::as_tuple(aw: chn.write()); seq.push_back(x: 4); |
| 29 | co_await chn.write(); seq.push_back(x: 5); |
| 30 | co_await chn.write(); seq.push_back(x: 6); |
| 31 | co_await chn.write(); seq.push_back(x: 7); |
| 32 | } |
| 33 | |
| 34 | cobalt::promise<void> do_read(cobalt::channel<void> &chn, std::vector<int> & seq) |
| 35 | { |
| 36 | seq.push_back(x: 10); |
| 37 | co_await chn.read(); seq.push_back(x: 11); |
| 38 | co_await chn.read(); seq.push_back(x: 12); |
| 39 | (co_await cobalt::as_result(aw: chn.read())).value(); seq.push_back(x: 13); |
| 40 | co_await cobalt::as_tuple(aw: chn.read()); seq.push_back(x: 14); |
| 41 | co_await chn.read(); seq.push_back(x: 15); |
| 42 | co_await chn.read(); seq.push_back(x: 16); |
| 43 | co_await chn.read(); seq.push_back(x: 17); |
| 44 | } |
| 45 | |
| 46 | BOOST_AUTO_TEST_SUITE(channel); |
| 47 | |
| 48 | CO_TEST_CASE(void_) |
| 49 | { |
| 50 | cobalt::channel<void> chn{2u, co_await cobalt::this_coro::executor}; |
| 51 | |
| 52 | std::vector<int> seq; |
| 53 | auto r = do_read(chn, seq); |
| 54 | auto w = do_write(chn, seq); |
| 55 | |
| 56 | co_await r; |
| 57 | co_await w; |
| 58 | BOOST_REQUIRE(seq.size() == 16); |
| 59 | BOOST_CHECK(seq[0] == 10); |
| 60 | BOOST_CHECK(seq[1] == 0); |
| 61 | BOOST_CHECK(seq[2] == 1); |
| 62 | BOOST_CHECK(seq[3] == 2); |
| 63 | BOOST_CHECK(seq[4] == 11); |
| 64 | BOOST_CHECK(seq[5] == 12); |
| 65 | BOOST_CHECK(seq[6] == 3); |
| 66 | BOOST_CHECK(seq[7] == 4); |
| 67 | BOOST_CHECK(seq[8] == 13); |
| 68 | BOOST_CHECK(seq[9] == 14); |
| 69 | BOOST_CHECK(seq[10] == 5); |
| 70 | BOOST_CHECK(seq[11] == 6); |
| 71 | BOOST_CHECK(seq[12] == 15); |
| 72 | BOOST_CHECK(seq[13] == 16); |
| 73 | BOOST_CHECK(seq[14] == 7); |
| 74 | BOOST_CHECK(seq[15] == 17); |
| 75 | } |
| 76 | |
| 77 | CO_TEST_CASE(void_0) |
| 78 | { |
| 79 | cobalt::channel<void> chn{0u, co_await cobalt::this_coro::executor}; |
| 80 | |
| 81 | std::vector<int> seq; |
| 82 | auto r = do_read(chn, seq); |
| 83 | auto w = do_write(chn, seq); |
| 84 | |
| 85 | co_await r; |
| 86 | co_await w; |
| 87 | BOOST_REQUIRE(seq.size() == 16); |
| 88 | BOOST_CHECK(seq[0] == 10); |
| 89 | BOOST_CHECK(seq[1] == 0); |
| 90 | BOOST_CHECK(seq[2] == 11); |
| 91 | BOOST_CHECK(seq[3] == 1); |
| 92 | BOOST_CHECK(seq[4] == 12); |
| 93 | BOOST_CHECK(seq[5] == 2); |
| 94 | BOOST_CHECK(seq[6] == 13); |
| 95 | BOOST_CHECK(seq[7] == 3); |
| 96 | BOOST_CHECK(seq[8] == 14); |
| 97 | BOOST_CHECK(seq[9] == 4); |
| 98 | BOOST_CHECK(seq[10] == 15); |
| 99 | BOOST_CHECK(seq[11] == 5); |
| 100 | BOOST_CHECK(seq[12] == 16); |
| 101 | BOOST_CHECK(seq[13] == 6); |
| 102 | BOOST_CHECK(seq[14] == 17); |
| 103 | BOOST_CHECK(seq[15] == 7); |
| 104 | } |
| 105 | |
| 106 | cobalt::promise<void> do_write(cobalt::channel<int> &chn, std::vector<int> & seq) |
| 107 | { |
| 108 | seq.push_back(x: 0); |
| 109 | co_await chn.write(value: 1); seq.push_back(x: 1); |
| 110 | co_await chn.write(value: 2); seq.push_back(x: 2); |
| 111 | (co_await cobalt::as_result(aw: chn.write(value: 3))).value(); seq.push_back(x: 3); |
| 112 | co_await cobalt::as_tuple(aw: chn.write(value: 4)); seq.push_back(x: 4); |
| 113 | co_await chn.write(value: 5); seq.push_back(x: 5); |
| 114 | co_await chn.write(value: 6); seq.push_back(x: 6); |
| 115 | co_await chn.write(value: 7); seq.push_back(x: 7); |
| 116 | } |
| 117 | |
| 118 | cobalt::promise<void> do_read(cobalt::channel<int> &chn, std::vector<int> & seq) |
| 119 | { |
| 120 | seq.push_back(x: 10); |
| 121 | BOOST_CHECK(1 == co_await chn.read()); seq.push_back(x: 11); |
| 122 | BOOST_CHECK(2 == co_await chn.read()); seq.push_back(x: 12); |
| 123 | BOOST_CHECK(3 == (co_await cobalt::as_result(chn.read())).value()); seq.push_back(x: 13); |
| 124 | BOOST_CHECK(4 == std::get<1>(co_await cobalt::as_tuple(chn.read()))); seq.push_back(x: 14); |
| 125 | BOOST_CHECK(5 == co_await chn.read()); seq.push_back(x: 15); |
| 126 | BOOST_CHECK(6 == co_await chn.read()); seq.push_back(x: 16); |
| 127 | BOOST_CHECK(7 == co_await chn.read()); seq.push_back(x: 17); |
| 128 | } |
| 129 | |
| 130 | |
| 131 | CO_TEST_CASE(int_) |
| 132 | { |
| 133 | cobalt::channel<int> chn{2u, co_await cobalt::this_coro::executor}; |
| 134 | |
| 135 | std::vector<int> seq; |
| 136 | auto w = do_write(chn, seq); |
| 137 | auto r = do_read(chn, seq); |
| 138 | |
| 139 | co_await r; |
| 140 | co_await w; |
| 141 | BOOST_REQUIRE(seq.size() == 16); |
| 142 | BOOST_CHECK(seq[0] == 0); |
| 143 | BOOST_CHECK(seq[1] == 1); |
| 144 | BOOST_CHECK(seq[2] == 2); |
| 145 | BOOST_CHECK(seq[3] == 10); |
| 146 | BOOST_CHECK(seq[4] == 11); |
| 147 | BOOST_CHECK(seq[5] == 12); |
| 148 | BOOST_CHECK(seq[6] == 3); |
| 149 | BOOST_CHECK(seq[7] == 4); |
| 150 | BOOST_CHECK(seq[8] == 13); |
| 151 | BOOST_CHECK(seq[9] == 14); |
| 152 | BOOST_CHECK(seq[10] == 5); |
| 153 | BOOST_CHECK(seq[11] == 6); |
| 154 | BOOST_CHECK(seq[12] == 15); |
| 155 | BOOST_CHECK(seq[13] == 16); |
| 156 | BOOST_CHECK(seq[14] == 7); |
| 157 | BOOST_CHECK(seq[15] == 17); |
| 158 | } |
| 159 | |
| 160 | cobalt::promise<void> do_write(cobalt::channel<std::string> &chn, std::vector<int> & seq) |
| 161 | { |
| 162 | seq.push_back(x: 0); |
| 163 | co_await chn.write(value: std::string("1" )); seq.push_back(x: 1); |
| 164 | co_await chn.write(value: std::string("2" )); seq.push_back(x: 2); |
| 165 | co_await chn.write(value: std::string("3" )); seq.push_back(x: 3); |
| 166 | co_await chn.write(value: std::string("4" )); seq.push_back(x: 4); |
| 167 | co_await chn.write(value: std::string("5" )); seq.push_back(x: 5); |
| 168 | co_await chn.write(value: std::string("6" )); seq.push_back(x: 6); |
| 169 | co_await chn.write(value: std::string("7 but we need to be sure we get ouf of SSO" )); seq.push_back(x: 7); |
| 170 | } |
| 171 | |
| 172 | cobalt::promise<void> do_read(cobalt::channel<std::string> &chn, std::vector<int> & seq) |
| 173 | { |
| 174 | seq.push_back(x: 10); |
| 175 | BOOST_CHECK("1" == co_await chn.read()); seq.push_back(x: 11); |
| 176 | BOOST_CHECK("2" == co_await chn.read()); seq.push_back(x: 12); |
| 177 | BOOST_CHECK("3" == co_await chn.read()); seq.push_back(x: 13); |
| 178 | BOOST_CHECK("4" == co_await chn.read()); seq.push_back(x: 14); |
| 179 | BOOST_CHECK("5" == co_await chn.read()); seq.push_back(x: 15); |
| 180 | BOOST_CHECK("6" == co_await chn.read()); seq.push_back(x: 16); |
| 181 | BOOST_CHECK("7 but we need to be sure we get ouf of SSO" == co_await chn.read()); seq.push_back(x: 17); |
| 182 | } |
| 183 | |
| 184 | |
| 185 | CO_TEST_CASE(str) |
| 186 | { |
| 187 | cobalt::channel<std::string> chn{0u, co_await cobalt::this_coro::executor}; |
| 188 | |
| 189 | std::vector<int> seq; |
| 190 | auto w = do_write(chn, seq); |
| 191 | auto r = do_read(chn, seq); |
| 192 | |
| 193 | co_await r; |
| 194 | co_await w; |
| 195 | BOOST_REQUIRE(seq.size() == 16); |
| 196 | BOOST_CHECK(seq[0] == 0); |
| 197 | BOOST_CHECK(seq[1] == 10); |
| 198 | BOOST_CHECK(seq[2] == 1); |
| 199 | BOOST_CHECK(seq[3] == 11); |
| 200 | BOOST_CHECK(seq[4] == 2); |
| 201 | BOOST_CHECK(seq[5] == 12); |
| 202 | BOOST_CHECK(seq[6] == 3); |
| 203 | BOOST_CHECK(seq[7] == 13); |
| 204 | BOOST_CHECK(seq[8] == 4); |
| 205 | BOOST_CHECK(seq[9] == 14); |
| 206 | BOOST_CHECK(seq[10] == 5); |
| 207 | BOOST_CHECK(seq[11] == 15); |
| 208 | BOOST_CHECK(seq[12] == 6); |
| 209 | BOOST_CHECK(seq[13] == 16); |
| 210 | BOOST_CHECK(seq[14] == 7); |
| 211 | BOOST_CHECK(seq[15] == 17); |
| 212 | } |
| 213 | |
| 214 | CO_TEST_CASE(raceable) |
| 215 | { |
| 216 | cobalt::channel<int> ci{0u}; |
| 217 | cobalt::channel<void> cv{0u}; |
| 218 | auto [r1, r2] = co_await cobalt::gather(p: cobalt::race(p: ci.read(), p: cv.read()), p: cv.write()); |
| 219 | r1.value(); |
| 220 | BOOST_REQUIRE(r1.has_value()); |
| 221 | BOOST_CHECK(r1->index() == 1u); |
| 222 | BOOST_CHECK(!r2.has_error()); |
| 223 | } |
| 224 | |
| 225 | CO_TEST_CASE(raceable_1) |
| 226 | { |
| 227 | cobalt::channel<int> ci{1u}; |
| 228 | cobalt::channel<void> cv{1u}; |
| 229 | auto [r1, r2] = co_await cobalt::gather( |
| 230 | p: cobalt::race(p: ci.read(), p: cv.read()), |
| 231 | p: cv.write()); |
| 232 | BOOST_CHECK(r1->index() == 1u); |
| 233 | BOOST_CHECK(!r2.has_error()); |
| 234 | } |
| 235 | |
| 236 | |
| 237 | |
| 238 | namespace issue_53 |
| 239 | { |
| 240 | |
| 241 | cobalt::promise<void> timeout_and_write(cobalt::channel<std::string> &channel) |
| 242 | { |
| 243 | while (!co_await cobalt::this_coro::cancelled) |
| 244 | { |
| 245 | boost::asio::steady_timer timer{co_await cobalt::this_coro::executor}; |
| 246 | timer.expires_after(expiry_time: std::chrono::seconds{20}); |
| 247 | co_await timer.async_wait(token: cobalt::use_op); |
| 248 | std::string val("Test!" ); |
| 249 | co_await channel.write(value&: val); |
| 250 | } |
| 251 | |
| 252 | co_return; |
| 253 | } |
| 254 | |
| 255 | cobalt::promise<void> read(cobalt::channel<std::string> &channel) |
| 256 | { |
| 257 | while (!co_await cobalt::this_coro::cancelled) |
| 258 | co_await channel.read(); |
| 259 | } |
| 260 | |
| 261 | cobalt::promise<void> test() |
| 262 | { |
| 263 | cobalt::channel<std::string> channel; |
| 264 | co_await cobalt::join(p: timeout_and_write(channel), p: read(channel)); |
| 265 | } |
| 266 | |
| 267 | CO_TEST_CASE(issue_93) |
| 268 | { |
| 269 | co_await cobalt::race(p: test(), p: boost::asio::post(token: cobalt::use_op)); |
| 270 | } |
| 271 | |
| 272 | cobalt::promise<void> writer(cobalt::channel<int> & c) |
| 273 | { |
| 274 | for (int i = 0; i < 10; i++) |
| 275 | co_await c.write(value&: i); |
| 276 | c.close(); |
| 277 | } |
| 278 | |
| 279 | CO_TEST_CASE(reader) |
| 280 | { |
| 281 | cobalt::channel<int> c; |
| 282 | |
| 283 | +writer(c); |
| 284 | int i = 0; |
| 285 | BOOST_COBALT_FOR(int value, cobalt::channel_reader(c)) |
| 286 | BOOST_CHECK(value == i++); |
| 287 | |
| 288 | } |
| 289 | |
| 290 | |
| 291 | } |
| 292 | |
| 293 | BOOST_AUTO_TEST_SUITE_END(); |