1//
2// composed_7.cpp
3// ~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#include <boost/asio/compose.hpp>
12#include <boost/asio/deferred.hpp>
13#include <boost/asio/io_context.hpp>
14#include <boost/asio/ip/tcp.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/asio/use_future.hpp>
17#include <boost/asio/write.hpp>
18#include <functional>
19#include <iostream>
20#include <memory>
21#include <sstream>
22#include <string>
23#include <type_traits>
24#include <utility>
25
26using boost::asio::ip::tcp;
27
28// NOTE: This example requires the new boost::asio::async_compose function. For
29// an example that works with the Networking TS style of completion tokens,
30// please see an older version of asio.
31
32//------------------------------------------------------------------------------
33
// This composed operation shows composition of multiple underlying operations.
// It automatically serialises a message, using its I/O streams insertion
// operator, before sending it N times on the socket. To do this, it must
// allocate a buffer for the encoded message and ensure this buffer's validity
// until all underlying async_write operations complete. A one second delay is
// inserted prior to each write operation, using a steady_timer.
40
41template <typename T,
42 boost::asio::completion_token_for<void(boost::system::error_code)> CompletionToken>
43auto async_write_messages(tcp::socket& socket,
44 const T& message, std::size_t repeat_count,
45 CompletionToken&& token)
46 // The return type of the initiating function is deduced from the combination
47 // of:
48 //
49 // - the CompletionToken type,
50 // - the completion handler signature, and
51 // - the asynchronous operation's initiation function object.
52 //
53 // When the completion token is a simple callback, the return type is always
54 // void. In this example, when the completion token is boost::asio::yield_context
55 // (used for stackful coroutines) the return type would also be void, as
56 // there is no non-error argument to the completion handler. When the
57 // completion token is boost::asio::use_future it would be std::future<void>. When
58 // the completion token is boost::asio::deferred, the return type differs for each
59 // asynchronous operation.
60 //
61 // In C++20 we can omit the return type as it is automatically deduced from
62 // the return type of boost::asio::async_compose.
63{
64 // Encode the message and copy it into an allocated buffer. The buffer will
65 // be maintained for the lifetime of the composed asynchronous operation.
66 std::ostringstream os;
67 os << message;
68 std::unique_ptr<std::string> encoded_message(new std::string(os.str()));
69
70 // Create a steady_timer to be used for the delay between messages.
71 std::unique_ptr<boost::asio::steady_timer> delay_timer(
72 new boost::asio::steady_timer(socket.get_executor()));
73
74 // To manage the cycle between the multiple underlying asychronous
75 // operations, our implementation is a state machine.
76 enum { starting, waiting, writing };
77
78 // The boost::asio::async_compose function takes:
79 //
80 // - our asynchronous operation implementation,
81 // - the completion token,
82 // - the completion handler signature, and
83 // - any I/O objects (or executors) used by the operation
84 //
85 // It then wraps our implementation, which is implemented here as a state
86 // machine in a lambda, in an intermediate completion handler that meets the
87 // requirements of a conforming asynchronous operation. This includes
88 // tracking outstanding work against the I/O executors associated with the
89 // operation (in this example, this is the socket's executor).
90 //
91 // The first argument to our lambda is a reference to the enclosing
92 // intermediate completion handler. This intermediate completion handler is
93 // provided for us by the boost::asio::async_compose function, and takes care
94 // of all the details required to implement a conforming asynchronous
95 // operation. When calling an underlying asynchronous operation, we pass it
96 // this enclosing intermediate completion handler as the completion token.
97 //
98 // All arguments to our lambda after the first must be defaulted to allow the
99 // state machine to be started, as well as to allow the completion handler to
100 // match the completion signature of both the async_write and
101 // steady_timer::async_wait operations.
102 return boost::asio::async_compose<
103 CompletionToken, void(boost::system::error_code)>(
104 [
105 // The implementation holds a reference to the socket as it is used for
106 // multiple async_write operations.
107 &socket,
108
109 // The allocated buffer for the encoded message. The std::unique_ptr
110 // smart pointer is move-only, and as a consequence our lambda
111 // implementation is also move-only.
112 encoded_message = std::move(encoded_message),
113
114 // The repeat count remaining.
115 repeat_count,
116
117 // A steady timer used for introducing a delay.
118 delay_timer = std::move(delay_timer),
119
120 // To manage the cycle between the multiple underlying asychronous
121 // operations, our implementation is a state machine.
122 state = starting
123 ]
124 (
125 auto& self,
126 const boost::system::error_code& error = {},
127 std::size_t = 0
128 ) mutable
129 {
130 if (!error)
131 {
132 switch (state)
133 {
134 case starting:
135 case writing:
136 if (repeat_count > 0)
137 {
138 --repeat_count;
139 state = waiting;
140 delay_timer->expires_after(expiry_time: std::chrono::seconds(1));
141 delay_timer->async_wait(std::move(self));
142 return; // Composed operation not yet complete.
143 }
144 break; // Composed operation complete, continue below.
145 case waiting:
146 state = writing;
147 boost::asio::async_write(socket,
148 boost::asio::buffer(data&: *encoded_message), std::move(self));
149 return; // Composed operation not yet complete.
150 }
151 }
152
153 // This point is reached only on completion of the entire composed
154 // operation.
155
156 // Deallocate the encoded message and delay timer before calling the
157 // user-supplied completion handler.
158 encoded_message.reset();
159 delay_timer.reset();
160
161 // Call the user-supplied handler with the result of the operation.
162 self.complete(error);
163 },
164 token, socket);
165}
166
167//------------------------------------------------------------------------------
168
169void test_callback()
170{
171 boost::asio::io_context io_context;
172
173 tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
174 tcp::socket socket = acceptor.accept();
175
176 // Test our asynchronous operation using a lambda as a callback.
177 async_write_messages(socket, "Testing callback\r\n", 5,
178 [](const boost::system::error_code& error)
179 {
180 if (!error)
181 {
182 std::cout << "Messages sent\n";
183 }
184 else
185 {
186 std::cout << "Error: " << error.message() << "\n";
187 }
188 });
189
190 io_context.run();
191}
192
193//------------------------------------------------------------------------------
194
195void test_deferred()
196{
197 boost::asio::io_context io_context;
198
199 tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
200 tcp::socket socket = acceptor.accept();
201
202 // Test our asynchronous operation using the deferred completion token. This
203 // token causes the operation's initiating function to package up the
204 // operation with its arguments to return a function object, which may then be
205 // used to launch the asynchronous operation.
206 boost::asio::async_operation auto op = async_write_messages(
207 socket, "Testing deferred\r\n", 5, boost::asio::deferred);
208
209 // Launch the operation using a lambda as a callback.
210 std::move(op)(
211 [](const boost::system::error_code& error)
212 {
213 if (!error)
214 {
215 std::cout << "Messages sent\n";
216 }
217 else
218 {
219 std::cout << "Error: " << error.message() << "\n";
220 }
221 });
222
223 io_context.run();
224}
225
226//------------------------------------------------------------------------------
227
228void test_future()
229{
230 boost::asio::io_context io_context;
231
232 tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
233 tcp::socket socket = acceptor.accept();
234
235 // Test our asynchronous operation using the use_future completion token.
236 // This token causes the operation's initiating function to return a future,
237 // which may be used to synchronously wait for the result of the operation.
238 std::future<void> f = async_write_messages(
239 socket, "Testing future\r\n", 5, boost::asio::use_future);
240
241 io_context.run();
242
243 try
244 {
245 // Get the result of the operation.
246 f.get();
247 std::cout << "Messages sent\n";
248 }
249 catch (const std::exception& e)
250 {
251 std::cout << "Error: " << e.what() << "\n";
252 }
253}
254
255//------------------------------------------------------------------------------
256
257int main()
258{
259 test_callback();
260 test_deferred();
261 test_future();
262}
263

// Source: boost/libs/asio/example/cpp20/operations/composed_7.cpp