1#include <boost/asio/associated_executor.hpp>
2#include <boost/asio/bind_executor.hpp>
3#include <boost/asio/execution.hpp>
4#include <condition_variable>
5#include <future>
6#include <memory>
7#include <mutex>
8#include <queue>
9#include <thread>
10#include <vector>
11#include <cctype>
12
13using boost::asio::executor_binder;
14using boost::asio::get_associated_executor;
15namespace execution = boost::asio::execution;
16
17// An executor that launches a new thread for each function submitted to it.
18// This class satisfies the executor requirements.
19class thread_executor
20{
21private:
22 // Singleton execution context that manages threads launched by the new_thread_executor.
23 class thread_bag
24 {
25 friend class thread_executor;
26
27 void add_thread(std::thread&& t)
28 {
29 std::unique_lock<std::mutex> lock(mutex_);
30 threads_.push_back(x: std::move(t));
31 }
32
33 thread_bag() = default;
34
35 ~thread_bag()
36 {
37 for (auto& t : threads_)
38 t.join();
39 }
40
41 std::mutex mutex_;
42 std::vector<std::thread> threads_;
43 };
44
45public:
46 static thread_bag& query(execution::context_t)
47 {
48 static thread_bag threads;
49 return threads;
50 }
51
52 static constexpr auto query(execution::blocking_t)
53 {
54 return execution::blocking.never;
55 }
56
57 template <class Func>
58 void execute(Func f) const
59 {
60 thread_bag& bag = query(execution::context);
61 bag.add_thread(t: std::thread(std::move(f)));
62 }
63
64 friend bool operator==(const thread_executor&,
65 const thread_executor&) noexcept
66 {
67 return true;
68 }
69
70 friend bool operator!=(const thread_executor&,
71 const thread_executor&) noexcept
72 {
73 return false;
74 }
75};
76
// Base class for all thread-safe queue implementations. Holds the
// synchronisation state, which does not depend on the element type.
class queue_impl_base
{
  template <class> friend class queue_front;
  template <class> friend class queue_back;
  std::mutex mutex_;                   // guards stop_ and the derived queue_
  std::condition_variable condition_;  // signalled by push() and stop()
  bool stop_ = false;                  // set by queue_front::stop()
};

// Underlying implementation of a thread-safe queue, shared between the
// queue_front and queue_back classes.
template <class T>
class queue_impl : public queue_impl_base
{
  template <class> friend class queue_front;
  template <class> friend class queue_back;
  std::queue<T> queue_;
};

// The front end of a queue between consecutive pipeline stages. This is the
// producer's handle: it adds elements and signals the end of the stream.
template <class T>
class queue_front
{
public:
  using value_type = T;

  explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
    : impl_(std::move(impl))
  {
  }

  // Append an element and wake one waiting consumer.
  void push(T t)
  {
    std::lock_guard<std::mutex> lock(impl_->mutex_);
    impl_->queue_.push(std::move(t));
    impl_->condition_.notify_one();
  }

  // Mark the stream as finished. Every waiting consumer is woken, because
  // each of them has to observe the flag; waking only one would leave any
  // other blocked pop() waiting forever.
  void stop()
  {
    std::lock_guard<std::mutex> lock(impl_->mutex_);
    impl_->stop_ = true;
    impl_->condition_.notify_all();
  }

private:
  std::shared_ptr<queue_impl<T>> impl_;
};

// The back end of a queue between consecutive pipeline stages. This is the
// consumer's handle.
template <class T>
class queue_back
{
public:
  using value_type = T;

  explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
    : impl_(std::move(impl))
  {
  }

  // Wait for an element and move it into t.
  //
  // Returns true when an element was delivered. Returns false, leaving t
  // untouched, once the queue is empty and stop() has been called; elements
  // pushed before stop() are still delivered first.
  bool pop(T& t)
  {
    std::unique_lock<std::mutex> lock(impl_->mutex_);
    while (impl_->queue_.empty() && !impl_->stop_)
      impl_->condition_.wait(lock);
    if (impl_->queue_.empty())
      return false;

    // Move rather than copy: the element is removed on the next line, and
    // this also lets the queue carry move-only types.
    t = std::move(impl_->queue_.front());
    impl_->queue_.pop();
    return true;
  }

private:
  std::shared_ptr<queue_impl<T>> impl_;
};
156
157// Launch the last stage in a pipeline.
158template <class T, class F>
159std::future<void> pipeline(queue_back<T> in, F f)
160{
161 // Get the function's associated executor, defaulting to thread_executor.
162 auto ex = get_associated_executor(f, thread_executor());
163
164 // Run the function, and as we're the last stage return a future so that the
165 // caller can wait for the pipeline to finish.
166 std::packaged_task<void()> task(
167 [in, f = std::move(f)]() mutable
168 {
169 f(in);
170 });
171 std::future<void> fut = task.get_future();
172 boost::asio::require(ex, execution::blocking.never).execute(std::move(task));
173 return fut;
174}
175
176// Launch an intermediate stage in a pipeline.
177template <class T, class F, class... Tail>
178std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
179{
180 // Determine the output queue type.
181 typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
182
183 // Create the output queue and its implementation.
184 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
185 queue_front<output_value_type> out(out_impl);
186 queue_back<output_value_type> next_in(out_impl);
187
188 // Get the function's associated executor, defaulting to thread_executor.
189 auto ex = get_associated_executor(f, thread_executor());
190
191 // Run the function.
192 boost::asio::require(ex, execution::blocking.never).execute(
193 [in, out, f = std::move(f)]() mutable
194 {
195 f(in, out);
196 out.stop();
197 });
198
199 // Launch the rest of the pipeline.
200 return pipeline(next_in, std::move(t)...);
201}
202
203// Launch the first stage in a pipeline.
204template <class F, class... Tail>
205std::future<void> pipeline(F f, Tail... t)
206{
207 // Determine the output queue type.
208 typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
209
210 // Create the output queue and its implementation.
211 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
212 queue_front<output_value_type> out(out_impl);
213 queue_back<output_value_type> next_in(out_impl);
214
215 // Get the function's associated executor, defaulting to thread_executor.
216 auto ex = get_associated_executor(f, thread_executor());
217
218 // Run the function.
219 boost::asio::require(ex, execution::blocking.never).execute(
220 [out, f = std::move(f)]() mutable
221 {
222 f(out);
223 out.stop();
224 });
225
226 // Launch the rest of the pipeline.
227 return pipeline(next_in, std::move(t)...);
228}
229
230//------------------------------------------------------------------------------
231
232#include <boost/asio/static_thread_pool.hpp>
233#include <iostream>
234#include <string>
235
236using boost::asio::bind_executor;
237using boost::asio::static_thread_pool;
238
239void reader(queue_front<std::string> out)
240{
241 std::string line;
242 while (std::getline(is&: std::cin, str&: line))
243 out.push(t: line);
244}
245
246void filter(queue_back<std::string> in, queue_front<std::string> out)
247{
248 std::string line;
249 while (in.pop(t&: line))
250 if (line.length() > 5)
251 out.push(t: line);
252}
253
254void upper(queue_back<std::string> in, queue_front<std::string> out)
255{
256 std::string line;
257 while (in.pop(t&: line))
258 {
259 std::string new_line;
260 for (char c : line)
261 new_line.push_back(c: std::toupper(c: c));
262 out.push(t: new_line);
263 }
264}
265
266void writer(queue_back<std::string> in)
267{
268 std::size_t count = 0;
269 std::string line;
270 while (in.pop(t&: line))
271 std::cout << count++ << ": " << line << std::endl;
272}
273
274int main()
275{
276 static_thread_pool pool(1);
277
278 auto f = pipeline(f: reader, t: filter, t: bind_executor(ex: pool.executor(), t&: upper), t: writer);
279 f.wait();
280}
281

// Source: boost/libs/asio/example/cpp14/executors/pipeline.cpp