| 1 | // |
| 2 | // composed_7.cpp |
| 3 | // ~~~~~~~~~~~~~~ |
| 4 | // |
| 5 | // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
| 6 | // |
| 7 | // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| 8 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| 9 | // |
| 10 | |
| 11 | #include <boost/asio/compose.hpp> |
| 12 | #include <boost/asio/deferred.hpp> |
| 13 | #include <boost/asio/io_context.hpp> |
| 14 | #include <boost/asio/ip/tcp.hpp> |
| 15 | #include <boost/asio/steady_timer.hpp> |
| 16 | #include <boost/asio/use_future.hpp> |
| 17 | #include <boost/asio/write.hpp> |
| 18 | #include <functional> |
| 19 | #include <iostream> |
| 20 | #include <memory> |
| 21 | #include <sstream> |
| 22 | #include <string> |
| 23 | #include <type_traits> |
| 24 | #include <utility> |
| 25 | |
| 26 | using boost::asio::ip::tcp; |
| 27 | |
| 28 | // NOTE: This example requires the new boost::asio::async_compose function. For |
| 29 | // an example that works with the Networking TS style of completion tokens, |
| 30 | // please see an older version of asio. |
| 31 | |
| 32 | //------------------------------------------------------------------------------ |
| 33 | |
// This composed operation shows composition of multiple underlying operations.
// It automatically serialises a message, using its I/O streams insertion
// operator, before sending it N times on the socket. To do this, it must
// allocate a buffer for the encoded message and ensure this buffer's validity
// until all underlying async_write operations complete. A one second delay is
// inserted prior to each write operation, using a steady_timer.
| 40 | |
| 41 | template <typename T, |
| 42 | boost::asio::completion_token_for<void(boost::system::error_code)> CompletionToken> |
| 43 | auto async_write_messages(tcp::socket& socket, |
| 44 | const T& message, std::size_t repeat_count, |
| 45 | CompletionToken&& token) |
| 46 | // The return type of the initiating function is deduced from the combination |
| 47 | // of: |
| 48 | // |
| 49 | // - the CompletionToken type, |
| 50 | // - the completion handler signature, and |
| 51 | // - the asynchronous operation's initiation function object. |
| 52 | // |
| 53 | // When the completion token is a simple callback, the return type is always |
| 54 | // void. In this example, when the completion token is boost::asio::yield_context |
| 55 | // (used for stackful coroutines) the return type would also be void, as |
| 56 | // there is no non-error argument to the completion handler. When the |
| 57 | // completion token is boost::asio::use_future it would be std::future<void>. When |
| 58 | // the completion token is boost::asio::deferred, the return type differs for each |
| 59 | // asynchronous operation. |
| 60 | // |
| 61 | // In C++20 we can omit the return type as it is automatically deduced from |
| 62 | // the return type of boost::asio::async_compose. |
| 63 | { |
| 64 | // Encode the message and copy it into an allocated buffer. The buffer will |
| 65 | // be maintained for the lifetime of the composed asynchronous operation. |
| 66 | std::ostringstream os; |
| 67 | os << message; |
| 68 | std::unique_ptr<std::string> encoded_message(new std::string(os.str())); |
| 69 | |
| 70 | // Create a steady_timer to be used for the delay between messages. |
| 71 | std::unique_ptr<boost::asio::steady_timer> delay_timer( |
| 72 | new boost::asio::steady_timer(socket.get_executor())); |
| 73 | |
| 74 | // To manage the cycle between the multiple underlying asychronous |
| 75 | // operations, our implementation is a state machine. |
| 76 | enum { starting, waiting, writing }; |
| 77 | |
| 78 | // The boost::asio::async_compose function takes: |
| 79 | // |
| 80 | // - our asynchronous operation implementation, |
| 81 | // - the completion token, |
| 82 | // - the completion handler signature, and |
| 83 | // - any I/O objects (or executors) used by the operation |
| 84 | // |
| 85 | // It then wraps our implementation, which is implemented here as a state |
| 86 | // machine in a lambda, in an intermediate completion handler that meets the |
| 87 | // requirements of a conforming asynchronous operation. This includes |
| 88 | // tracking outstanding work against the I/O executors associated with the |
| 89 | // operation (in this example, this is the socket's executor). |
| 90 | // |
| 91 | // The first argument to our lambda is a reference to the enclosing |
| 92 | // intermediate completion handler. This intermediate completion handler is |
| 93 | // provided for us by the boost::asio::async_compose function, and takes care |
| 94 | // of all the details required to implement a conforming asynchronous |
| 95 | // operation. When calling an underlying asynchronous operation, we pass it |
| 96 | // this enclosing intermediate completion handler as the completion token. |
| 97 | // |
| 98 | // All arguments to our lambda after the first must be defaulted to allow the |
| 99 | // state machine to be started, as well as to allow the completion handler to |
| 100 | // match the completion signature of both the async_write and |
| 101 | // steady_timer::async_wait operations. |
| 102 | return boost::asio::async_compose< |
| 103 | CompletionToken, void(boost::system::error_code)>( |
| 104 | [ |
| 105 | // The implementation holds a reference to the socket as it is used for |
| 106 | // multiple async_write operations. |
| 107 | &socket, |
| 108 | |
| 109 | // The allocated buffer for the encoded message. The std::unique_ptr |
| 110 | // smart pointer is move-only, and as a consequence our lambda |
| 111 | // implementation is also move-only. |
| 112 | encoded_message = std::move(encoded_message), |
| 113 | |
| 114 | // The repeat count remaining. |
| 115 | repeat_count, |
| 116 | |
| 117 | // A steady timer used for introducing a delay. |
| 118 | delay_timer = std::move(delay_timer), |
| 119 | |
| 120 | // To manage the cycle between the multiple underlying asychronous |
| 121 | // operations, our implementation is a state machine. |
| 122 | state = starting |
| 123 | ] |
| 124 | ( |
| 125 | auto& self, |
| 126 | const boost::system::error_code& error = {}, |
| 127 | std::size_t = 0 |
| 128 | ) mutable |
| 129 | { |
| 130 | if (!error) |
| 131 | { |
| 132 | switch (state) |
| 133 | { |
| 134 | case starting: |
| 135 | case writing: |
| 136 | if (repeat_count > 0) |
| 137 | { |
| 138 | --repeat_count; |
| 139 | state = waiting; |
| 140 | delay_timer->expires_after(expiry_time: std::chrono::seconds(1)); |
| 141 | delay_timer->async_wait(std::move(self)); |
| 142 | return; // Composed operation not yet complete. |
| 143 | } |
| 144 | break; // Composed operation complete, continue below. |
| 145 | case waiting: |
| 146 | state = writing; |
| 147 | boost::asio::async_write(socket, |
| 148 | boost::asio::buffer(data&: *encoded_message), std::move(self)); |
| 149 | return; // Composed operation not yet complete. |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | // This point is reached only on completion of the entire composed |
| 154 | // operation. |
| 155 | |
| 156 | // Deallocate the encoded message and delay timer before calling the |
| 157 | // user-supplied completion handler. |
| 158 | encoded_message.reset(); |
| 159 | delay_timer.reset(); |
| 160 | |
| 161 | // Call the user-supplied handler with the result of the operation. |
| 162 | self.complete(error); |
| 163 | }, |
| 164 | token, socket); |
| 165 | } |
| 166 | |
| 167 | //------------------------------------------------------------------------------ |
| 168 | |
| 169 | void test_callback() |
| 170 | { |
| 171 | boost::asio::io_context io_context; |
| 172 | |
| 173 | tcp::acceptor acceptor(io_context, {tcp::v4(), 55555}); |
| 174 | tcp::socket socket = acceptor.accept(); |
| 175 | |
| 176 | // Test our asynchronous operation using a lambda as a callback. |
| 177 | async_write_messages(socket, "Testing callback\r\n" , 5, |
| 178 | [](const boost::system::error_code& error) |
| 179 | { |
| 180 | if (!error) |
| 181 | { |
| 182 | std::cout << "Messages sent\n" ; |
| 183 | } |
| 184 | else |
| 185 | { |
| 186 | std::cout << "Error: " << error.message() << "\n" ; |
| 187 | } |
| 188 | }); |
| 189 | |
| 190 | io_context.run(); |
| 191 | } |
| 192 | |
| 193 | //------------------------------------------------------------------------------ |
| 194 | |
| 195 | void test_deferred() |
| 196 | { |
| 197 | boost::asio::io_context io_context; |
| 198 | |
| 199 | tcp::acceptor acceptor(io_context, {tcp::v4(), 55555}); |
| 200 | tcp::socket socket = acceptor.accept(); |
| 201 | |
| 202 | // Test our asynchronous operation using the deferred completion token. This |
| 203 | // token causes the operation's initiating function to package up the |
| 204 | // operation with its arguments to return a function object, which may then be |
| 205 | // used to launch the asynchronous operation. |
| 206 | boost::asio::async_operation auto op = async_write_messages( |
| 207 | socket, "Testing deferred\r\n" , 5, boost::asio::deferred); |
| 208 | |
| 209 | // Launch the operation using a lambda as a callback. |
| 210 | std::move(op)( |
| 211 | [](const boost::system::error_code& error) |
| 212 | { |
| 213 | if (!error) |
| 214 | { |
| 215 | std::cout << "Messages sent\n" ; |
| 216 | } |
| 217 | else |
| 218 | { |
| 219 | std::cout << "Error: " << error.message() << "\n" ; |
| 220 | } |
| 221 | }); |
| 222 | |
| 223 | io_context.run(); |
| 224 | } |
| 225 | |
| 226 | //------------------------------------------------------------------------------ |
| 227 | |
| 228 | void test_future() |
| 229 | { |
| 230 | boost::asio::io_context io_context; |
| 231 | |
| 232 | tcp::acceptor acceptor(io_context, {tcp::v4(), 55555}); |
| 233 | tcp::socket socket = acceptor.accept(); |
| 234 | |
| 235 | // Test our asynchronous operation using the use_future completion token. |
| 236 | // This token causes the operation's initiating function to return a future, |
| 237 | // which may be used to synchronously wait for the result of the operation. |
| 238 | std::future<void> f = async_write_messages( |
| 239 | socket, "Testing future\r\n" , 5, boost::asio::use_future); |
| 240 | |
| 241 | io_context.run(); |
| 242 | |
| 243 | try |
| 244 | { |
| 245 | // Get the result of the operation. |
| 246 | f.get(); |
| 247 | std::cout << "Messages sent\n" ; |
| 248 | } |
| 249 | catch (const std::exception& e) |
| 250 | { |
| 251 | std::cout << "Error: " << e.what() << "\n" ; |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | //------------------------------------------------------------------------------ |
| 256 | |
| 257 | int main() |
| 258 | { |
| 259 | test_callback(); |
| 260 | test_deferred(); |
| 261 | test_future(); |
| 262 | } |
| 263 | |