1/*
2 * Copyright Andrey Semashev 2007 - 2015.
3 * Distributed under the Boost Software License, Version 1.0.
4 * (See accompanying file LICENSE_1_0.txt or copy at
5 * http://www.boost.org/LICENSE_1_0.txt)
6 */
7/*!
8 * \file async_frontend.hpp
9 * \author Andrey Semashev
10 * \date 14.07.2009
11 *
12 * The header contains implementation of asynchronous sink frontend.
13 */
14
15#ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
16#define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
17
18#include <exception> // std::terminate
19#include <boost/log/detail/config.hpp>
20
21#ifdef BOOST_HAS_PRAGMA_ONCE
22#pragma once
23#endif
24
25#if defined(BOOST_LOG_NO_THREADS)
26#error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
27#endif
28
29#include <boost/memory_order.hpp>
30#include <boost/atomic/atomic.hpp>
31#include <boost/smart_ptr/shared_ptr.hpp>
32#include <boost/smart_ptr/make_shared_object.hpp>
33#include <boost/preprocessor/control/if.hpp>
34#include <boost/preprocessor/comparison/equal.hpp>
35#include <boost/thread/locks.hpp>
36#include <boost/thread/recursive_mutex.hpp>
37#include <boost/thread/thread.hpp>
38#include <boost/thread/condition_variable.hpp>
39#include <boost/log/exceptions.hpp>
40#include <boost/log/detail/locking_ptr.hpp>
41#include <boost/log/detail/parameter_tools.hpp>
42#include <boost/log/core/record_view.hpp>
43#include <boost/log/sinks/basic_sink_frontend.hpp>
44#include <boost/log/sinks/frontend_requirements.hpp>
45#include <boost/log/sinks/unbounded_fifo_queue.hpp>
46#include <boost/log/keywords/start_thread.hpp>
47#include <boost/log/detail/header.hpp>
48
49namespace boost {
50
51BOOST_LOG_OPEN_NAMESPACE
52
53namespace sinks {
54
55#ifndef BOOST_LOG_DOXYGEN_PASS
56
57#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
58 template< typename T0 >\
59 explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
60 base_type(true),\
61 queue_base_type(arg0),\
62 m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
63 m_ActiveOperation(idle),\
64 m_StopRequested(false),\
65 m_FlushRequested(false)\
66 {\
67 if (arg0[keywords::start_thread | true])\
68 start_feeding_thread();\
69 }\
70 template< typename T0 >\
71 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
72 base_type(true),\
73 queue_base_type(arg0),\
74 m_pBackend(backend),\
75 m_ActiveOperation(idle),\
76 m_StopRequested(false),\
77 m_FlushRequested(false)\
78 {\
79 if (arg0[keywords::start_thread | true])\
80 start_feeding_thread();\
81 }
82
83#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
84 template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
85 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
86 base_type(true),\
87 queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
88 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
89 m_ActiveOperation(idle),\
90 m_StopRequested(false),\
91 m_FlushRequested(false)\
92 {\
93 if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
94 start_feeding_thread();\
95 }\
96 template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
97 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
98 base_type(true),\
99 queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
100 m_pBackend(backend),\
101 m_ActiveOperation(idle),\
102 m_StopRequested(false),\
103 m_FlushRequested(false)\
104 {\
105 if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
106 start_feeding_thread();\
107 }
108
109#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
110 BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
111
112#endif // BOOST_LOG_DOXYGEN_PASS
113
114/*!
115 * \brief Asynchronous logging sink frontend
116 *
117 * The frontend starts a separate thread on construction. All logging records are passed
118 * to the backend in this dedicated thread.
119 *
120 * The user can prevent spawning the internal thread by specifying \c start_thread parameter
121 * with the value of \c false on construction. In this case log records will be buffered
122 * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
123 * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
124 * parameter.
125 */
126template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
127class asynchronous_sink :
128 public aux::make_sink_frontend_base< SinkBackendT >::type,
129 public QueueingStrategyT
130{
131 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
132 typedef QueueingStrategyT queue_base_type;
133
134private:
135 //! Backend synchronization mutex type
136 typedef boost::recursive_mutex backend_mutex_type;
137 //! Frontend synchronization mutex type
138 typedef typename base_type::mutex_type frontend_mutex_type;
139
140 //! Operation bit mask
141 enum operation
142 {
143 idle = 0u,
144 feeding_records = 1u,
145 flushing = 3u
146 };
147
148 //! Function object to run the log record feeding thread
149 class run_func
150 {
151 public:
152 typedef void result_type;
153
154 private:
155 asynchronous_sink* m_self;
156
157 public:
158 explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
159 {
160 }
161
162 result_type operator()() const
163 {
164 m_self->run();
165 }
166 };
167
168 //! A scope guard that implements active operation management
169 class scoped_feeding_operation
170 {
171 private:
172 asynchronous_sink& m_self;
173
174 public:
175 //! Initializing constructor
176 explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
177 {
178 }
179 //! Destructor
180 ~scoped_feeding_operation()
181 {
182 m_self.complete_feeding_operation();
183 }
184
185 BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
186 BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
187 };
188
189 //! A scope guard that resets a flag on destructor
190 class scoped_flag
191 {
192 private:
193 frontend_mutex_type& m_Mutex;
194 condition_variable_any& m_Cond;
195 boost::atomic< bool >& m_Flag;
196
197 public:
198 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
199 m_Mutex(mut), m_Cond(cond), m_Flag(f)
200 {
201 }
202 ~scoped_flag()
203 {
204 try
205 {
206 lock_guard< frontend_mutex_type > lock(m_Mutex);
207 m_Flag.store(v: false, order: boost::memory_order_relaxed);
208 m_Cond.notify_all();
209 }
210 catch (...)
211 {
212 }
213 }
214
215 BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
216 BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
217 };
218
219public:
220 //! Sink implementation type
221 typedef SinkBackendT sink_backend_type;
222 //! \cond
223 static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
224 //! \endcond
225
226#ifndef BOOST_LOG_DOXYGEN_PASS
227
228 //! A pointer type that locks the backend until it's destroyed
229 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
230
231#else // BOOST_LOG_DOXYGEN_PASS
232
233 //! A pointer type that locks the backend until it's destroyed
234 typedef implementation_defined locked_backend_ptr;
235
236#endif // BOOST_LOG_DOXYGEN_PASS
237
238private:
239 //! Synchronization mutex
240 backend_mutex_type m_BackendMutex;
241 //! Pointer to the backend
242 const shared_ptr< sink_backend_type > m_pBackend;
243
244 //! Dedicated record feeding thread
245 thread m_DedicatedFeedingThread;
246 //! Condition variable to implement blocking operations
247 condition_variable_any m_BlockCond;
248
249 //! Currently active operation
250 operation m_ActiveOperation;
251 //! The flag indicates that the feeding loop has to be stopped
252 boost::atomic< bool > m_StopRequested;
253 //! The flag indicates that queue flush has been requested
254 boost::atomic< bool > m_FlushRequested;
255
256public:
257 /*!
258 * Default constructor. Constructs the sink backend instance.
259 * Requires the backend to be default-constructible.
260 *
261 * \param start_thread If \c true, the frontend creates a thread to feed
262 * log records to the backend. Otherwise no thread is
263 * started and it is assumed that the user will call
264 * \c run, \c feed_records or \c flush himself.
265 */
266 explicit asynchronous_sink(bool start_thread = true) :
267 base_type(true),
268 m_pBackend(boost::make_shared< sink_backend_type >()),
269 m_ActiveOperation(idle),
270 m_StopRequested(false),
271 m_FlushRequested(false)
272 {
273 if (start_thread)
274 start_feeding_thread();
275 }
276 /*!
277 * Constructor attaches user-constructed backend instance
278 *
279 * \param backend Pointer to the backend instance.
280 * \param start_thread If \c true, the frontend creates a thread to feed
281 * log records to the backend. Otherwise no thread is
282 * started and it is assumed that the user will call
283 * \c run, \c feed_records or \c flush himself.
284 *
285 * \pre \a backend is not \c NULL.
286 */
287 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
288 base_type(true),
289 m_pBackend(backend),
290 m_ActiveOperation(idle),
291 m_StopRequested(false),
292 m_FlushRequested(false)
293 {
294 if (start_thread)
295 start_feeding_thread();
296 }
297
298 /*!
299 * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
300 * Refer to the backend documentation for the list of supported parameters.
301 *
302 * The frontend uses the following named parameters:
303 *
304 * \li start_thread - If \c true, the frontend creates a thread to feed
305 * log records to the backend. Otherwise no thread is
306 * started and it is assumed that the user will call
307 * \c run, \c feed_records or \c flush himself.
308 */
309#ifndef BOOST_LOG_DOXYGEN_PASS
310 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
311#else
312 template< typename... Args >
313 explicit asynchronous_sink(Args&&... args);
314#endif
315
316 /*!
317 * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
318 */
319 ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
320 {
321 try
322 {
323 boost::this_thread::disable_interruption no_interrupts;
324 stop();
325 }
326 catch (...)
327 {
328 std::terminate();
329 }
330 }
331
332 /*!
333 * Locking accessor to the attached backend
334 */
335 locked_backend_ptr locked_backend()
336 {
337 return locked_backend_ptr(m_pBackend, m_BackendMutex);
338 }
339
340 /*!
341 * Enqueues the log record to the backend
342 */
343 void consume(record_view const& rec) BOOST_OVERRIDE
344 {
345 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
346 {
347 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
348 // Wait until flush is done
349 while (m_FlushRequested.load(order: boost::memory_order_acquire))
350 m_BlockCond.wait(lock);
351 }
352 queue_base_type::enqueue(rec);
353 }
354
355 /*!
356 * The method attempts to pass logging record to the backend
357 */
358 bool try_consume(record_view const& rec) BOOST_OVERRIDE
359 {
360 if (!m_FlushRequested.load(order: boost::memory_order_acquire))
361 {
362 return queue_base_type::try_enqueue(rec);
363 }
364 else
365 return false;
366 }
367
368 /*!
369 * The method starts record feeding loop and effectively blocks until either of this happens:
370 *
371 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
372 * \li an exception is thrown while processing a log record in the backend, and the exception is
373 * not terminated by the exception handler, if one is installed
374 *
375 * \pre The sink frontend must be constructed without spawning a dedicated thread
376 */
377 void run()
378 {
379 // First check that no other thread is running
380 {
381 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
382 if (start_feeding_operation(lock, op: feeding_records))
383 return;
384 }
385
386 scoped_feeding_operation guard(*this);
387
388 // Now start the feeding loop
389 while (true)
390 {
391 do_feed_records();
392 if (!m_StopRequested.load(order: boost::memory_order_acquire))
393 {
394 // Block until new record is available
395 record_view rec;
396 if (queue_base_type::dequeue_ready(rec))
397 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
398 }
399 else
400 break;
401 }
402 }
403
404 /*!
405 * The method softly interrupts record feeding loop. This method must be called when \c run,
406 * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
407 * interruption, calling \c stop will not interrupt the record processing in the middle.
408 * Instead, the sink frontend will attempt to finish its business with the record in progress
409 * and return afterwards. This method can be called either if the sink was created with
410 * an internal dedicated thread, or if the feeding loop was initiated by user.
411 *
412 * If no record feeding operation is in progress, calling \c stop marks the sink frontend
413 * so that the next feeding operation stops immediately.
414 *
415 * \note Returning from this method does not guarantee that there are no records left buffered
416 * in the sink frontend. It is possible that log records keep coming during and after this
417 * method is called. At some point of execution of this method log records stop being processed,
418 * and all records that come after this point are put into the queue. These records will be
419 * processed upon further calls to \c run or \c feed_records.
420 *
421 * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
422 * as \c false on frontend construction), this method does not guarantee that upon return the thread
423 * has returned from the record feeding loop or that it won't enter it in the future. The method
424 * only ensures that the record feeding thread will eventually return from the feeding loop. It is
425 * user's responsibility to synchronize with the user's record feeding thread.
426 */
427 void stop()
428 {
429 boost::thread feeding_thread;
430 {
431 lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
432
433 m_StopRequested.store(v: true, order: boost::memory_order_release);
434 queue_base_type::interrupt_dequeue();
435
436 m_DedicatedFeedingThread.swap(x&: feeding_thread);
437 }
438
439 if (feeding_thread.joinable())
440 feeding_thread.join();
441 }
442
443 /*!
444 * The method feeds log records that may have been buffered to the backend and returns
445 *
446 * \pre The sink frontend must be constructed without spawning a dedicated thread
447 */
448 void feed_records()
449 {
450 // First check that no other thread is running
451 {
452 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
453 if (start_feeding_operation(lock, op: feeding_records))
454 return;
455 }
456
457 scoped_feeding_operation guard(*this);
458
459 // Now start the feeding loop
460 do_feed_records();
461 }
462
463 /*!
464 * The method feeds all log records that may have been buffered to the backend and returns.
465 * Unlike \c feed_records, in case of ordering queueing the method also feeds records
466 * that were enqueued during the ordering window, attempting to drain the queue completely.
467 */
468 void flush() BOOST_OVERRIDE
469 {
470 {
471 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
472 if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
473 {
474 // There is already a thread feeding records, let it do the job
475 m_FlushRequested.store(v: true, order: boost::memory_order_release);
476 queue_base_type::interrupt_dequeue();
477 while (!m_StopRequested.load(order: boost::memory_order_acquire) && m_FlushRequested.load(order: boost::memory_order_acquire))
478 m_BlockCond.wait(lock);
479
480 // The condition may have been signalled when the feeding operation was finishing.
481 // In that case records may not have been flushed, and we do the flush ourselves.
482 if (m_ActiveOperation != idle)
483 return;
484 }
485
486 m_ActiveOperation = flushing;
487 m_FlushRequested.store(v: true, order: boost::memory_order_relaxed);
488 }
489
490 scoped_feeding_operation guard(*this);
491
492 do_feed_records();
493 }
494
495private:
496#ifndef BOOST_LOG_DOXYGEN_PASS
497 //! The method spawns record feeding thread
498 void start_feeding_thread()
499 {
500 boost::thread(run_func(this)).swap(x&: m_DedicatedFeedingThread);
501 }
502
503 //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
504 bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
505 {
506 while (m_ActiveOperation != idle)
507 {
508 if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
509 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
510
511 if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
512 {
513 m_StopRequested.store(v: false, order: boost::memory_order_relaxed);
514 return true;
515 }
516
517 m_BlockCond.wait(lock);
518 }
519
520 m_ActiveOperation = op;
521
522 return false;
523 }
524
525 //! Completes record feeding operation
526 void complete_feeding_operation() BOOST_NOEXCEPT
527 {
528 try
529 {
530 lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
531 m_ActiveOperation = idle;
532 m_StopRequested.store(v: false, order: boost::memory_order_relaxed);
533 m_BlockCond.notify_all();
534 }
535 catch (...)
536 {
537 }
538 }
539
540 //! The record feeding loop
541 void do_feed_records()
542 {
543 while (!m_StopRequested.load(order: boost::memory_order_acquire))
544 {
545 record_view rec;
546 bool dequeued = false;
547 if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
548 dequeued = queue_base_type::try_dequeue_ready(rec);
549 else
550 dequeued = queue_base_type::try_dequeue(rec);
551
552 if (dequeued)
553 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
554 else
555 break;
556 }
557
558 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
559 {
560 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
561 base_type::flush_backend(m_BackendMutex, *m_pBackend);
562 }
563 }
564#endif // BOOST_LOG_DOXYGEN_PASS
565};
566
567#undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
568#undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
569#undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
570
571} // namespace sinks
572
573BOOST_LOG_CLOSE_NAMESPACE // namespace log
574
575} // namespace boost
576
577#include <boost/log/detail/footer.hpp>
578
579#endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
580

source code of boost/libs/log/include/boost/log/sinks/async_frontend.hpp