| 1 | // |
| 2 | // Copyright (c) 2019-2024 Ruben Perez Hidalgo (rubenperez038 at gmail dot com) |
| 3 | // |
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| 6 | // |
| 7 | |
| 8 | #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP |
| 9 | #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_MESSAGE_READER_HPP |
| 10 | |
| 11 | #include <boost/mysql/client_errc.hpp> |
| 12 | #include <boost/mysql/error_code.hpp> |
| 13 | |
| 14 | #include <boost/mysql/impl/internal/protocol/constants.hpp> |
| 15 | #include <boost/mysql/impl/internal/protocol/protocol.hpp> |
| 16 | #include <boost/mysql/impl/internal/sansio/read_buffer.hpp> |
| 17 | |
| 18 | #include <boost/asio/coroutine.hpp> |
| 19 | #include <boost/assert.hpp> |
| 20 | |
| 21 | #include <cstddef> |
| 22 | #include <cstdint> |
| 23 | |
| 24 | namespace boost { |
| 25 | namespace mysql { |
| 26 | namespace detail { |
| 27 | |
| 28 | // Flow: |
| 29 | // Prepare a read operation with prepare_read() |
| 30 | // In a loop, until done(): |
| 31 | // prepare_buffer() to resize the buffer to an appropriate size |
| 32 | // Read bytes against buffer() |
| 33 | // Call resume with the number of bytes read |
| 34 | // Or call prepare_read() and check done() to attempt to get a cached message |
| 35 | // (further prepare_read calls should use keep_state=true) |
| 36 | class message_reader |
| 37 | { |
| 38 | public: |
| 39 | message_reader(std::size_t initial_buffer_size, std::size_t max_frame_size = MAX_PACKET_SIZE) |
| 40 | : buffer_(initial_buffer_size), max_frame_size_(max_frame_size) |
| 41 | { |
| 42 | } |
| 43 | |
| 44 | void reset() noexcept |
| 45 | { |
| 46 | buffer_.reset(); |
| 47 | state_ = parse_state(); |
| 48 | } |
| 49 | |
| 50 | // Prepares a read operation. sequence_number should be kept alive until |
| 51 | // the next read is prepared or no more calls to resume() are expected. |
| 52 | // If keep_state=true, and the op is not complete, parsing state is preserved |
| 53 | void prepare_read(std::uint8_t& sequence_number, bool keep_state = false) noexcept |
| 54 | { |
| 55 | if (!keep_state || done()) |
| 56 | state_ = parse_state(sequence_number); |
| 57 | else |
| 58 | state_.sequence_number = &sequence_number; |
| 59 | resume(bytes_read: 0); |
| 60 | } |
| 61 | |
| 62 | // Is parsing the current message done? |
| 63 | bool done() const noexcept { return state_.coro.is_complete(); } |
| 64 | |
| 65 | // Returns any errors generated during parsing. Requires this->done() |
| 66 | error_code error() const noexcept |
| 67 | { |
| 68 | BOOST_ASSERT(done()); |
| 69 | return state_.ec; |
| 70 | } |
| 71 | |
| 72 | // Returns the last parsed message. Valid until prepare_buffer() |
| 73 | // is next called. Requires done() && !error() |
| 74 | span<const std::uint8_t> message() const noexcept |
| 75 | { |
| 76 | BOOST_ASSERT(done()); |
| 77 | BOOST_ASSERT(!error()); |
| 78 | return buffer_.current_message(); |
| 79 | } |
| 80 | |
| 81 | // Returns buffer space suitable to read bytes to |
| 82 | span<std::uint8_t> buffer() noexcept { return buffer_.free_area(); } |
| 83 | |
| 84 | // Removes old messages stored in the buffer, and resizes it, if required, to accomodate |
| 85 | // the message currently being parsed. |
| 86 | void prepare_buffer() |
| 87 | { |
| 88 | buffer_.remove_reserved(); |
| 89 | buffer_.grow_to_fit(n: state_.required_size); |
| 90 | state_.required_size = 0; |
| 91 | } |
| 92 | |
| 93 | // The main operation. Call it after reading bytes against buffer(), |
| 94 | // with the number of bytes read |
| 95 | void resume(std::size_t bytes_read) |
| 96 | { |
| 97 | frame_header {}; |
| 98 | buffer_.move_to_pending(length: bytes_read); |
| 99 | |
| 100 | BOOST_ASIO_CORO_REENTER(state_.coro) |
| 101 | { |
| 102 | // Move the previously parsed message to the reserved area, if any |
| 103 | buffer_.move_to_reserved(length: buffer_.current_message_size()); |
| 104 | |
| 105 | while (true) |
| 106 | { |
| 107 | // Read the header |
| 108 | set_required_size(frame_header_size); |
| 109 | while (buffer_.pending_size() < frame_header_size) |
| 110 | BOOST_ASIO_CORO_YIELD; |
| 111 | |
| 112 | // Mark the header as belonging to the current message |
| 113 | buffer_.move_to_current_message(length: frame_header_size); |
| 114 | |
| 115 | // Deserialize the header |
| 116 | header = deserialize_frame_header(buffer: span<const std::uint8_t, frame_header_size>( |
| 117 | buffer_.pending_first() - frame_header_size, |
| 118 | frame_header_size |
| 119 | )); |
| 120 | |
| 121 | // Process the sequence number |
| 122 | if (*state_.sequence_number != header.sequence_number) |
| 123 | { |
| 124 | state_.ec = client_errc::sequence_number_mismatch; |
| 125 | BOOST_ASIO_CORO_YIELD break; |
| 126 | } |
| 127 | ++*state_.sequence_number; |
| 128 | |
| 129 | // Process the packet size |
| 130 | state_.body_bytes = header.size; |
| 131 | state_.more_frames_follow = (state_.body_bytes == max_frame_size_); |
| 132 | |
| 133 | // We are done with the header |
| 134 | if (state_.is_first_frame) |
| 135 | { |
| 136 | // If it's the 1st frame, we can just move the header bytes to the reserved |
| 137 | // area, avoiding a big memmove |
| 138 | buffer_.move_to_reserved(length: frame_header_size); |
| 139 | } |
| 140 | else |
| 141 | { |
| 142 | buffer_.remove_current_message_last(length: frame_header_size); |
| 143 | } |
| 144 | state_.is_first_frame = false; |
| 145 | |
| 146 | // Read the body |
| 147 | set_required_size(state_.body_bytes); |
| 148 | while (buffer_.pending_size() < state_.body_bytes) |
| 149 | BOOST_ASIO_CORO_YIELD; |
| 150 | |
| 151 | buffer_.move_to_current_message(length: state_.body_bytes); |
| 152 | |
| 153 | // Check if we're done |
| 154 | if (!state_.more_frames_follow) |
| 155 | { |
| 156 | BOOST_ASIO_CORO_YIELD break; |
| 157 | } |
| 158 | } |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | // Exposed for testing |
| 163 | const read_buffer& internal_buffer() const noexcept { return buffer_; } |
| 164 | |
| 165 | private: |
| 166 | read_buffer buffer_; |
| 167 | std::size_t max_frame_size_; |
| 168 | |
| 169 | struct parse_state |
| 170 | { |
| 171 | asio::coroutine coro; |
| 172 | std::uint8_t* sequence_number{}; |
| 173 | bool is_first_frame{true}; |
| 174 | std::size_t body_bytes{0}; |
| 175 | bool more_frames_follow{false}; |
| 176 | std::size_t required_size{0}; |
| 177 | error_code ec; |
| 178 | |
| 179 | parse_state() = default; |
| 180 | parse_state(std::uint8_t& seqnum) noexcept : sequence_number(&seqnum) {} |
| 181 | } state_; |
| 182 | |
| 183 | void set_required_size(std::size_t required_bytes) noexcept |
| 184 | { |
| 185 | if (required_bytes > buffer_.pending_size()) |
| 186 | state_.required_size = required_bytes - buffer_.pending_size(); |
| 187 | else |
| 188 | state_.required_size = 0; |
| 189 | } |
| 190 | }; |
| 191 | |
| 192 | } // namespace detail |
| 193 | } // namespace mysql |
| 194 | } // namespace boost |
| 195 | |
| 196 | #endif |
| 197 | |