1//
2// Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7
8#ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
9#define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP
10
11#include <boost/mysql/client_errc.hpp>
12#include <boost/mysql/error_code.hpp>
13
14#include <boost/mysql/impl/internal/protocol/constants.hpp>
15#include <boost/mysql/impl/internal/protocol/protocol.hpp>
16#include <boost/mysql/impl/internal/sansio/read_buffer.hpp>
17
18#include <boost/asio/coroutine.hpp>
19#include <boost/assert.hpp>
20
21#include <cstddef>
22#include <cstdint>
23
24namespace boost {
25namespace mysql {
26namespace detail {
27
// Flow:
//    Prepare a read operation with prepare_read()
//    In a loop, until done():
//       Call prepare_buffer() to resize the buffer to an appropriate size
//       Read bytes against buffer()
//       Call resume() with the number of bytes read
//    Or call prepare_read() and check done() to attempt to get a cached message
//       (further prepare_read() calls should use keep_state=true)
36class message_reader
37{
38public:
39 message_reader(std::size_t initial_buffer_size, std::size_t max_frame_size = MAX_PACKET_SIZE)
40 : buffer_(initial_buffer_size), max_frame_size_(max_frame_size)
41 {
42 }
43
44 void reset() noexcept
45 {
46 buffer_.reset();
47 state_ = parse_state();
48 }
49
50 // Prepares a read operation. sequence_number should be kept alive until
51 // the next read is prepared or no more calls to resume() are expected.
52 // If keep_state=true, and the op is not complete, parsing state is preserved
53 void prepare_read(std::uint8_t& sequence_number, bool keep_state = false) noexcept
54 {
55 if (!keep_state || done())
56 state_ = parse_state(sequence_number);
57 else
58 state_.sequence_number = &sequence_number;
59 resume(bytes_read: 0);
60 }
61
62 // Is parsing the current message done?
63 bool done() const noexcept { return state_.coro.is_complete(); }
64
65 // Returns any errors generated during parsing. Requires this->done()
66 error_code error() const noexcept
67 {
68 BOOST_ASSERT(done());
69 return state_.ec;
70 }
71
72 // Returns the last parsed message. Valid until prepare_buffer()
73 // is next called. Requires done() && !error()
74 span<const std::uint8_t> message() const noexcept
75 {
76 BOOST_ASSERT(done());
77 BOOST_ASSERT(!error());
78 return buffer_.current_message();
79 }
80
81 // Returns buffer space suitable to read bytes to
82 span<std::uint8_t> buffer() noexcept { return buffer_.free_area(); }
83
84 // Removes old messages stored in the buffer, and resizes it, if required, to accomodate
85 // the message currently being parsed.
86 void prepare_buffer()
87 {
88 buffer_.remove_reserved();
89 buffer_.grow_to_fit(n: state_.required_size);
90 state_.required_size = 0;
91 }
92
93 // The main operation. Call it after reading bytes against buffer(),
94 // with the number of bytes read
95 void resume(std::size_t bytes_read)
96 {
97 frame_header header{};
98 buffer_.move_to_pending(length: bytes_read);
99
100 BOOST_ASIO_CORO_REENTER(state_.coro)
101 {
102 // Move the previously parsed message to the reserved area, if any
103 buffer_.move_to_reserved(length: buffer_.current_message_size());
104
105 while (true)
106 {
107 // Read the header
108 set_required_size(frame_header_size);
109 while (buffer_.pending_size() < frame_header_size)
110 BOOST_ASIO_CORO_YIELD;
111
112 // Mark the header as belonging to the current message
113 buffer_.move_to_current_message(length: frame_header_size);
114
115 // Deserialize the header
116 header = deserialize_frame_header(buffer: span<const std::uint8_t, frame_header_size>(
117 buffer_.pending_first() - frame_header_size,
118 frame_header_size
119 ));
120
121 // Process the sequence number
122 if (*state_.sequence_number != header.sequence_number)
123 {
124 state_.ec = client_errc::sequence_number_mismatch;
125 BOOST_ASIO_CORO_YIELD break;
126 }
127 ++*state_.sequence_number;
128
129 // Process the packet size
130 state_.body_bytes = header.size;
131 state_.more_frames_follow = (state_.body_bytes == max_frame_size_);
132
133 // We are done with the header
134 if (state_.is_first_frame)
135 {
136 // If it's the 1st frame, we can just move the header bytes to the reserved
137 // area, avoiding a big memmove
138 buffer_.move_to_reserved(length: frame_header_size);
139 }
140 else
141 {
142 buffer_.remove_current_message_last(length: frame_header_size);
143 }
144 state_.is_first_frame = false;
145
146 // Read the body
147 set_required_size(state_.body_bytes);
148 while (buffer_.pending_size() < state_.body_bytes)
149 BOOST_ASIO_CORO_YIELD;
150
151 buffer_.move_to_current_message(length: state_.body_bytes);
152
153 // Check if we're done
154 if (!state_.more_frames_follow)
155 {
156 BOOST_ASIO_CORO_YIELD break;
157 }
158 }
159 }
160 }
161
162 // Exposed for testing
163 const read_buffer& internal_buffer() const noexcept { return buffer_; }
164
165private:
166 read_buffer buffer_;
167 std::size_t max_frame_size_;
168
169 struct parse_state
170 {
171 asio::coroutine coro;
172 std::uint8_t* sequence_number{};
173 bool is_first_frame{true};
174 std::size_t body_bytes{0};
175 bool more_frames_follow{false};
176 std::size_t required_size{0};
177 error_code ec;
178
179 parse_state() = default;
180 parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {}
181 } state_;
182
183 void set_required_size(std::size_t required_bytes) noexcept
184 {
185 if (required_bytes > buffer_.pending_size())
186 state_.required_size = required_bytes - buffer_.pending_size();
187 else
188 state_.required_size = 0;
189 }
190};
191
192} // namespace detail
193} // namespace mysql
194} // namespace boost
195
196#endif
197

// Source: boost/libs/mysql/include/boost/mysql/impl/internal/sansio/message_reader.hpp