1 | // |
2 | // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com) |
3 | // |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
6 | // |
7 | |
8 | #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP |
9 | #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP |
10 | |
11 | #include <boost/mysql/client_errc.hpp> |
12 | #include <boost/mysql/error_code.hpp> |
13 | |
14 | #include <boost/mysql/impl/internal/protocol/constants.hpp> |
15 | #include <boost/mysql/impl/internal/protocol/protocol.hpp> |
16 | #include <boost/mysql/impl/internal/sansio/read_buffer.hpp> |
17 | |
18 | #include <boost/asio/coroutine.hpp> |
19 | #include <boost/assert.hpp> |
20 | |
21 | #include <cstddef> |
22 | #include <cstdint> |
23 | |
24 | namespace boost { |
25 | namespace mysql { |
26 | namespace detail { |
27 | |
28 | // Flow: |
29 | // Prepare a read operation with prepare_read() |
30 | // In a loop, until done(): |
31 | // prepare_buffer() to resize the buffer to an appropriate size |
32 | // Read bytes against buffer() |
33 | // Call resume with the number of bytes read |
34 | // Or call prepare_read() and check done() to attempt to get a cached message |
35 | // (further prepare_read calls should use keep_state=true) |
36 | class message_reader |
37 | { |
38 | public: |
39 | message_reader(std::size_t initial_buffer_size, std::size_t max_frame_size = MAX_PACKET_SIZE) |
40 | : buffer_(initial_buffer_size), max_frame_size_(max_frame_size) |
41 | { |
42 | } |
43 | |
44 | void reset() noexcept |
45 | { |
46 | buffer_.reset(); |
47 | state_ = parse_state(); |
48 | } |
49 | |
50 | // Prepares a read operation. sequence_number should be kept alive until |
51 | // the next read is prepared or no more calls to resume() are expected. |
52 | // If keep_state=true, and the op is not complete, parsing state is preserved |
53 | void prepare_read(std::uint8_t& sequence_number, bool keep_state = false) noexcept |
54 | { |
55 | if (!keep_state || done()) |
56 | state_ = parse_state(sequence_number); |
57 | else |
58 | state_.sequence_number = &sequence_number; |
59 | resume(bytes_read: 0); |
60 | } |
61 | |
62 | // Is parsing the current message done? |
63 | bool done() const noexcept { return state_.coro.is_complete(); } |
64 | |
65 | // Returns any errors generated during parsing. Requires this->done() |
66 | error_code error() const noexcept |
67 | { |
68 | BOOST_ASSERT(done()); |
69 | return state_.ec; |
70 | } |
71 | |
72 | // Returns the last parsed message. Valid until prepare_buffer() |
73 | // is next called. Requires done() && !error() |
74 | span<const std::uint8_t> message() const noexcept |
75 | { |
76 | BOOST_ASSERT(done()); |
77 | BOOST_ASSERT(!error()); |
78 | return buffer_.current_message(); |
79 | } |
80 | |
81 | // Returns buffer space suitable to read bytes to |
82 | span<std::uint8_t> buffer() noexcept { return buffer_.free_area(); } |
83 | |
84 | // Removes old messages stored in the buffer, and resizes it, if required, to accomodate |
85 | // the message currently being parsed. |
86 | void prepare_buffer() |
87 | { |
88 | buffer_.remove_reserved(); |
89 | buffer_.grow_to_fit(n: state_.required_size); |
90 | state_.required_size = 0; |
91 | } |
92 | |
93 | // The main operation. Call it after reading bytes against buffer(), |
94 | // with the number of bytes read |
95 | void resume(std::size_t bytes_read) |
96 | { |
97 | frame_header {}; |
98 | buffer_.move_to_pending(length: bytes_read); |
99 | |
100 | BOOST_ASIO_CORO_REENTER(state_.coro) |
101 | { |
102 | // Move the previously parsed message to the reserved area, if any |
103 | buffer_.move_to_reserved(length: buffer_.current_message_size()); |
104 | |
105 | while (true) |
106 | { |
107 | // Read the header |
108 | set_required_size(frame_header_size); |
109 | while (buffer_.pending_size() < frame_header_size) |
110 | BOOST_ASIO_CORO_YIELD; |
111 | |
112 | // Mark the header as belonging to the current message |
113 | buffer_.move_to_current_message(length: frame_header_size); |
114 | |
115 | // Deserialize the header |
116 | header = deserialize_frame_header(buffer: span<const std::uint8_t, frame_header_size>( |
117 | buffer_.pending_first() - frame_header_size, |
118 | frame_header_size |
119 | )); |
120 | |
121 | // Process the sequence number |
122 | if (*state_.sequence_number != header.sequence_number) |
123 | { |
124 | state_.ec = client_errc::sequence_number_mismatch; |
125 | BOOST_ASIO_CORO_YIELD break; |
126 | } |
127 | ++*state_.sequence_number; |
128 | |
129 | // Process the packet size |
130 | state_.body_bytes = header.size; |
131 | state_.more_frames_follow = (state_.body_bytes == max_frame_size_); |
132 | |
133 | // We are done with the header |
134 | if (state_.is_first_frame) |
135 | { |
136 | // If it's the 1st frame, we can just move the header bytes to the reserved |
137 | // area, avoiding a big memmove |
138 | buffer_.move_to_reserved(length: frame_header_size); |
139 | } |
140 | else |
141 | { |
142 | buffer_.remove_current_message_last(length: frame_header_size); |
143 | } |
144 | state_.is_first_frame = false; |
145 | |
146 | // Read the body |
147 | set_required_size(state_.body_bytes); |
148 | while (buffer_.pending_size() < state_.body_bytes) |
149 | BOOST_ASIO_CORO_YIELD; |
150 | |
151 | buffer_.move_to_current_message(length: state_.body_bytes); |
152 | |
153 | // Check if we're done |
154 | if (!state_.more_frames_follow) |
155 | { |
156 | BOOST_ASIO_CORO_YIELD break; |
157 | } |
158 | } |
159 | } |
160 | } |
161 | |
162 | // Exposed for testing |
163 | const read_buffer& internal_buffer() const noexcept { return buffer_; } |
164 | |
165 | private: |
166 | read_buffer buffer_; |
167 | std::size_t max_frame_size_; |
168 | |
169 | struct parse_state |
170 | { |
171 | asio::coroutine coro; |
172 | std::uint8_t* sequence_number{}; |
173 | bool is_first_frame{true}; |
174 | std::size_t body_bytes{0}; |
175 | bool more_frames_follow{false}; |
176 | std::size_t required_size{0}; |
177 | error_code ec; |
178 | |
179 | parse_state() = default; |
180 | parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {} |
181 | } state_; |
182 | |
183 | void set_required_size(std::size_t required_bytes) noexcept |
184 | { |
185 | if (required_bytes > buffer_.pending_size()) |
186 | state_.required_size = required_bytes - buffer_.pending_size(); |
187 | else |
188 | state_.required_size = 0; |
189 | } |
190 | }; |
191 | |
192 | } // namespace detail |
193 | } // namespace mysql |
194 | } // namespace boost |
195 | |
196 | #endif |
197 | |