1//
2// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#include "test_unit/test_stream.hpp"
9
10#include <boost/mysql/error_code.hpp>
11
12#include <boost/mysql/detail/any_stream.hpp>
13
14#include <boost/asio/buffer.hpp>
15#include <boost/asio/compose.hpp>
16#include <boost/asio/coroutine.hpp>
17#include <boost/asio/error.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/post.hpp>
20
21#include <algorithm>
22#include <cassert>
23#include <cstddef>
24#include <cstdint>
25#include <cstring>
26#include <set>
27#include <vector>
28
29#include "test_common/buffer_concat.hpp"
30#include "test_common/tracker_executor.hpp"
31
32using namespace boost::mysql::test;
33using boost::mysql::error_code;
34
35static boost::asio::io_context ctx;
36
37std::size_t boost::mysql::test::test_stream::get_size_to_read(std::size_t buffer_size) const
38{
39 auto it = read_break_offsets_.upper_bound(x: num_bytes_read_);
40 std::size_t max_bytes_by_break = it == read_break_offsets_.end() ? std::size_t(-1)
41 : *it - num_bytes_read_;
42 return (std::min)(l: {num_unread_bytes(), buffer_size, max_bytes_by_break});
43}
44
45std::size_t boost::mysql::test::test_stream::do_read(asio::mutable_buffer buff, error_code& ec)
46{
47 // Fail count
48 error_code err = fail_count_.maybe_fail();
49 if (err)
50 {
51 ec = err;
52 return 0;
53 }
54
55 // If the user requested some bytes but we don't have any,
56 // fail. In the real world, the stream would block until more
57 // bytes are received, but this is a test, and this condition
58 // indicates an error.
59 if (num_unread_bytes() == 0 && buff.size() != 0)
60 {
61 ec = boost::asio::error::eof;
62 return 0;
63 }
64
65 // Actually read
66 std::size_t bytes_to_transfer = get_size_to_read(buffer_size: buff.size());
67 if (bytes_to_transfer)
68 {
69 std::memcpy(dest: buff.data(), src: bytes_to_read_.data() + num_bytes_read_, n: bytes_to_transfer);
70 num_bytes_read_ += bytes_to_transfer;
71 }
72
73 // Clear errors
74 ec = error_code();
75
76 return bytes_to_transfer;
77}
78
79std::size_t boost::mysql::test::test_stream::do_write(asio::const_buffer buff, error_code& ec)
80{
81 // Fail count
82 error_code err = fail_count_.maybe_fail();
83 if (err)
84 {
85 ec = err;
86 return 0;
87 }
88
89 // Actually write
90 std::size_t num_bytes_to_transfer = (std::min)(a: buff.size(), b: write_break_size_);
91 span<const std::uint8_t> span_to_transfer(
92 static_cast<const std::uint8_t*>(buff.data()),
93 num_bytes_to_transfer
94 );
95 concat(lhs&: bytes_written_, rhs: span_to_transfer);
96
97 // Clear errors
98 ec = error_code();
99
100 return num_bytes_to_transfer;
101}
102
103struct boost::mysql::test::test_stream::read_op : boost::asio::coroutine
104{
105 test_stream& stream_;
106 asio::mutable_buffer buff_;
107
108 read_op(test_stream& stream, asio::mutable_buffer buff) noexcept : stream_(stream), buff_(buff){};
109
110 template <class Self>
111 void operator()(Self& self)
112 {
113 BOOST_ASIO_CORO_REENTER(*this)
114 {
115 BOOST_ASIO_CORO_YIELD boost::asio::post(stream_.get_executor(), std::move(self));
116 {
117 error_code err;
118 std::size_t bytes_read = stream_.do_read(buff: buff_, ec&: err);
119 self.complete(err, bytes_read);
120 }
121 }
122 }
123};
124
125struct boost::mysql::test::test_stream::write_op : boost::asio::coroutine
126{
127 test_stream& stream_;
128 asio::const_buffer buff_;
129
130 write_op(test_stream& stream, asio::const_buffer buff) noexcept : stream_(stream), buff_(buff){};
131
132 template <class Self>
133 void operator()(Self& self)
134 {
135 BOOST_ASIO_CORO_REENTER(*this)
136 {
137 BOOST_ASIO_CORO_YIELD boost::asio::post(stream_.get_executor(), std::move(self));
138 {
139 error_code err;
140 std::size_t bytes_written = stream_.do_write(buff: buff_, ec&: err);
141 self.complete(err, bytes_written);
142 }
143 }
144 }
145};
146
147boost::mysql::test::test_stream::executor_type boost::mysql::test::test_stream::get_executor()
148{
149 return create_tracker_executor(inner: ctx.get_executor(), tracked_values: &executor_info_);
150}
151
152// Reading
153std::size_t boost::mysql::test::test_stream::read_some(asio::mutable_buffer buff, error_code& ec)
154{
155 return do_read(buff, ec);
156}
157void boost::mysql::test::test_stream::async_read_some(
158 asio::mutable_buffer buff,
159 asio::any_completion_handler<void(error_code, std::size_t)> handler
160)
161{
162 boost::asio::async_compose<
163 asio::any_completion_handler<void(error_code, std::size_t)>,
164 void(error_code, std::size_t)>(implementation: read_op(*this, buff), token&: handler, io_objects_or_executors: get_executor());
165}
166
167// Writing
168std::size_t boost::mysql::test::test_stream::write_some(boost::asio::const_buffer buff, error_code& ec)
169{
170 return do_write(buff, ec);
171}
172
173void boost::mysql::test::test_stream::async_write_some(
174 boost::asio::const_buffer buff,
175 asio::any_completion_handler<void(error_code, std::size_t)> handler
176)
177{
178 boost::asio::async_compose<
179 asio::any_completion_handler<void(error_code, std::size_t)>,
180 void(error_code, std::size_t)>(implementation: write_op(*this, buff), token&: handler, io_objects_or_executors: get_executor());
181}
182
183test_stream& boost::mysql::test::test_stream::add_bytes(span<const std::uint8_t> bytes)
184{
185 concat(lhs&: bytes_to_read_, rhs: bytes);
186 return *this;
187}
188
189test_stream& boost::mysql::test::test_stream::add_break(std::size_t byte_num)
190{
191 BOOST_ASSERT(byte_num <= bytes_to_read_.size());
192 read_break_offsets_.insert(x: byte_num);
193 return *this;
194}
195
196template class boost::mysql::detail::any_stream_impl<boost::mysql::test::test_stream>;
197

source code of boost/libs/mysql/test/unit/src/test_stream.cpp