1/*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8/*!
9 * \file posix/ipc_reliable_message_queue.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 17.11.2015
13 *
 * \brief This file is part of the Boost.Log library implementation, see the library documentation
15 * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16 *
17 * This file provides an interprocess message queue implementation on POSIX platforms.
18 */
19
20#include <boost/log/detail/config.hpp>
21#include <cstddef>
22#include <cerrno>
23#include <cstring>
24#include <ctime>
25#include <new>
26#include <string>
27#include <stdexcept>
28#include <algorithm>
29#include <unistd.h>
30#if defined(BOOST_HAS_SCHED_YIELD)
31#include <sched.h>
32#elif defined(BOOST_HAS_PTHREAD_YIELD)
33#include <pthread.h>
34#elif defined(BOOST_HAS_NANOSLEEP)
35#include <time.h>
36#endif
37#include <boost/assert.hpp>
38#include <boost/cstdint.hpp>
39#include <boost/memory_order.hpp>
40#include <boost/atomic/atomic.hpp>
41#include <boost/atomic/ipc_atomic.hpp>
42#include <boost/atomic/capabilities.hpp>
43#include <boost/throw_exception.hpp>
44#include <boost/log/exceptions.hpp>
45#include <boost/log/utility/ipc/reliable_message_queue.hpp>
46#include <boost/log/support/exception.hpp>
47#include <boost/log/detail/pause.hpp>
48#include <boost/exception/info.hpp>
49#include <boost/exception/enable_error_info.hpp>
50#include <boost/interprocess/creation_tags.hpp>
51#include <boost/interprocess/exceptions.hpp>
52#include <boost/interprocess/permissions.hpp>
53#include <boost/interprocess/mapped_region.hpp>
54#include <boost/interprocess/shared_memory_object.hpp>
55#include <boost/align/align_up.hpp>
56#include "ipc_sync_wrappers.hpp"
57#include "murmur3.hpp"
58#include "bit_tools.hpp"
59#include <boost/log/detail/header.hpp>
60
61namespace boost {
62
63BOOST_LOG_OPEN_NAMESPACE
64
65namespace ipc {
66
67//! Message queue implementation data
68struct reliable_message_queue::implementation
69{
70private:
71 //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
72 struct block_header
73 {
74 // Element data alignment, in bytes
75 enum { data_alignment = 32u };
76
77 //! Size of the element data, in bytes
78 size_type m_size;
79
80 //! Returns the block header overhead, in bytes
81 static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
82 {
83 return static_cast< size_type >(boost::alignment::align_up(value: sizeof(block_header), alignment: data_alignment));
84 }
85
86 //! Returns a pointer to the element data
87 void* get_data() const BOOST_NOEXCEPT
88 {
89 return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
90 }
91 };
92
93 //! Header of the message queue. Placed at the beginning of the shared memory segment.
94 struct header
95 {
96 // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
97 enum { abi_version = 0 };
98
99 // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
100
101 //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
102 uint32_t m_abi_tag;
103 //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
104 unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
105 //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
106 boost::ipc_atomic< uint32_t > m_ref_count;
107 //! Number of allocation blocks in the queue.
108 const uint32_t m_capacity;
109 //! Size of an allocation block, in bytes.
110 const size_type m_block_size;
111 //! Mutex for protecting queue data structures.
112 boost::log::ipc::aux::interprocess_mutex m_mutex;
113 //! Condition variable used to block readers when the queue is empty.
114 boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue;
115 //! Condition variable used to block writers when the queue is full.
116 boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
117 //! The current number of allocated blocks in the queue.
118 uint32_t m_size;
119 //! The current writing position (allocation block index).
120 uint32_t m_put_pos;
121 //! The current reading position (allocation block index).
122 uint32_t m_get_pos;
123
124 header(uint32_t capacity, size_type block_size) :
125 m_abi_tag(get_abi_tag()),
126 m_capacity(capacity),
127 m_block_size(block_size),
128 m_size(0u),
129 m_put_pos(0u),
130 m_get_pos(0u)
131 {
132 // Must be initialized last. m_ref_count is zero-initialized initially.
133 m_ref_count.opaque_add(v: 1u, order: boost::memory_order_release);
134 }
135
136 //! Returns the header structure ABI tag
137 static uint32_t get_abi_tag() BOOST_NOEXCEPT
138 {
139 // This FOURCC identifies the queue type
140 boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc(c1: 'r', c2: 'e', c3: 'l', c4: 'q'));
141
142 // This FOURCC identifies the queue implementation
143 hash.mix(value: boost::log::aux::make_fourcc(c1: 'p', c2: 't', c3: 'h', c4: 'r'));
144 hash.mix(value: abi_version);
145
146 // We will use these constants to align pointers
147 hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
148 hash.mix(value: block_header::data_alignment);
149
150 // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
151 // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
152
153#define BOOST_LOG_MIX_HEADER_MEMBER(name)\
154 hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
155 hash.mix(static_cast< uint32_t >(offsetof(header, name)))
156
157 BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
158 BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
159 BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count);
160 BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
161 BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
162 BOOST_LOG_MIX_HEADER_MEMBER(m_mutex);
163 BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue);
164 BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue);
165 BOOST_LOG_MIX_HEADER_MEMBER(m_size);
166 BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
167 BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
168
169#undef BOOST_LOG_MIX_HEADER_MEMBER
170
171 return hash.finalize();
172 }
173
174 //! Returns an element header at the specified index
175 block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
176 {
177 BOOST_ASSERT(index < m_capacity);
178 unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(value: sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
179 p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
180 return reinterpret_cast< block_header* >(p);
181 }
182
183 BOOST_DELETED_FUNCTION(header(header const&))
184 BOOST_DELETED_FUNCTION(header& operator=(header const&))
185 };
186
187private:
188 //! Shared memory object
189 boost::interprocess::shared_memory_object m_shared_memory;
190 //! Shared memory mapping into the process address space
191 boost::interprocess::mapped_region m_region;
192 //! Queue overflow handling policy
193 const overflow_policy m_overflow_policy;
194 //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
195 size_type m_block_size_mask;
196 //! The number of the bit set in block_size (i.e. log base 2 of block_size)
197 uint32_t m_block_size_log2;
198 //! The flag indicates that stop has been requested
199 boost::atomic< bool > m_stop;
200
201 //! Queue shared memory object name
202 const object_name m_name;
203
204 //! The total number of loop iterations in \c adopt_region for waiting for the region initialization to complete
205 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops = 200u;
206 //! Threshold of the number of loop iterations in \c adopt_region for using pause instructions for yielding
207 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_pause = 16u;
208 //! Threshold of the number of loop iterations in \c adopt_region for using \c short_yield for yielding
209 static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_short_yield = 64u;
210 //! Timeout, in seconds, for performing shared memory creation/opening loop
211 static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_timeout = 60u;
212 //! The number of short yields to perform during the shared memory creation/opening loop
213 static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_short_yield_loops = 64u;
214
215public:
216 //! The constructor creates a new shared memory segment
217 implementation
218 (
219 open_mode::create_only_tag,
220 object_name const& name,
221 uint32_t capacity,
222 size_type block_size,
223 overflow_policy oflow_policy,
224 permissions const& perms
225 ) :
226 m_shared_memory(),
227 m_region(),
228 m_overflow_policy(oflow_policy),
229 m_block_size_mask(0u),
230 m_block_size_log2(0u),
231 m_stop(false),
232 m_name(name)
233 {
234 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
235
236 boost::interprocess::permissions ipc_perms(perms.get_native());
237 while (true)
238 {
239 try
240 {
241 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
242 m_shared_memory.swap(other&: shared_memory);
243 break;
244 }
245 catch (boost::interprocess::interprocess_exception& e)
246 {
247 // shared_memory_object does not handle EINTR returned from shm_open internally.
248 // https://github.com/boostorg/interprocess/issues/152
249 if (e.get_native_error() != EINTR)
250 throw;
251 }
252 }
253
254 create_region(capacity, block_size);
255 }
256
257 //! The constructor creates a new shared memory segment or opens the existing one
258 implementation
259 (
260 open_mode::open_or_create_tag,
261 object_name const& name,
262 uint32_t capacity,
263 size_type block_size,
264 overflow_policy oflow_policy,
265 permissions const& perms
266 ) :
267 m_shared_memory(),
268 m_region(),
269 m_overflow_policy(oflow_policy),
270 m_block_size_mask(0u),
271 m_block_size_log2(0u),
272 m_stop(false),
273 m_name(name)
274 {
275 BOOST_ASSERT(block_size >= block_header::get_header_overhead());
276
277 // We need to know for certain whether we create the shared memory segment or open an existing one.
278 // This is to ensure that only one thread initializes the segment and all other threads wait until completion.
279 // Since shared_memory_object(open_or_create) constructor does not report whether the segment was actually created,
280 // we have to loop trying to create or open the segment. https://github.com/boostorg/interprocess/issues/151
281 boost::interprocess::permissions ipc_perms(perms.get_native());
282 bool created = false;
283 unsigned int i = 0u;
284 std::time_t start_time = std::time(NULL);
285 while (true)
286 {
287 while (true)
288 {
289 try
290 {
291 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms);
292 m_shared_memory.swap(other&: shared_memory);
293 created = true;
294 goto done;
295 }
296 catch (boost::interprocess::interprocess_exception& e)
297 {
298 if (e.get_error_code() == boost::interprocess::already_exists_error)
299 break;
300
301 // shared_memory_object does not handle EINTR returned from shm_open internally.
302 // https://github.com/boostorg/interprocess/issues/152
303 if (e.get_native_error() != EINTR)
304 throw;
305 }
306 }
307
308 while (true)
309 {
310 try
311 {
312 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
313 m_shared_memory.swap(other&: shared_memory);
314 created = false;
315 goto done;
316 }
317 catch (boost::interprocess::interprocess_exception& e)
318 {
319 if (e.get_error_code() == boost::interprocess::not_found_error)
320 break;
321
322 if (e.get_native_error() != EINTR)
323 throw;
324 }
325 }
326
327 std::time_t now = std::time(NULL);
328 if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
329 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be created or opened: shared memory segment failed to be created or opened until timeout (possible livelock)");
330
331 if (i < region_open_or_create_short_yield_loops)
332 short_yield();
333 else
334 long_yield();
335
336 ++i;
337 }
338
339 done:
340 if (created)
341 create_region(capacity, block_size);
342 else
343 adopt_region();
344 }
345
346 //! The constructor opens the existing shared memory segment
347 implementation
348 (
349 open_mode::open_only_tag,
350 object_name const& name,
351 overflow_policy oflow_policy
352 ) :
353 m_shared_memory(),
354 m_region(),
355 m_overflow_policy(oflow_policy),
356 m_block_size_mask(0u),
357 m_block_size_log2(0u),
358 m_stop(false),
359 m_name(name)
360 {
361 while (true)
362 {
363 try
364 {
365 boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write);
366 m_shared_memory.swap(other&: shared_memory);
367 break;
368 }
369 catch (boost::interprocess::interprocess_exception& e)
370 {
371 // shared_memory_object does not handle EINTR returned from shm_open internally.
372 // https://github.com/boostorg/interprocess/issues/152
373 if (e.get_native_error() != EINTR)
374 throw;
375 }
376 }
377
378 adopt_region();
379 }
380
381 ~implementation()
382 {
383 close_region();
384 }
385
386 object_name const& name() const BOOST_NOEXCEPT
387 {
388 return m_name;
389 }
390
391 uint32_t capacity() const BOOST_NOEXCEPT
392 {
393 return get_header()->m_capacity;
394 }
395
396 size_type block_size() const BOOST_NOEXCEPT
397 {
398 return get_header()->m_block_size;
399 }
400
401 operation_result send(void const* message_data, size_type message_size)
402 {
403 const uint32_t block_count = estimate_block_count(size: message_size);
404
405 header* const hdr = get_header();
406
407 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
408 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
409
410 if (m_stop.load(order: boost::memory_order_relaxed))
411 return aborted;
412
413 lock_queue();
414 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
415
416 while (true)
417 {
418 if (m_stop.load(order: boost::memory_order_relaxed))
419 return aborted;
420
421 if ((hdr->m_capacity - hdr->m_size) >= block_count)
422 break;
423
424 const overflow_policy oflow_policy = m_overflow_policy;
425 if (oflow_policy == fail_on_overflow)
426 return no_space;
427 else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
428 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
429
430 hdr->m_nonfull_queue.wait(mutex&: hdr->m_mutex);
431 }
432
433 enqueue_message(message_data, message_size, block_count);
434
435 return succeeded;
436 }
437
438 bool try_send(void const* message_data, size_type message_size)
439 {
440 const uint32_t block_count = estimate_block_count(size: message_size);
441
442 header* const hdr = get_header();
443
444 if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
445 BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
446
447 if (m_stop.load(order: boost::memory_order_relaxed))
448 return false;
449
450 lock_queue();
451 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
452
453 if (m_stop.load(order: boost::memory_order_relaxed))
454 return false;
455
456 if ((hdr->m_capacity - hdr->m_size) < block_count)
457 return false;
458
459 enqueue_message(message_data, message_size, block_count);
460
461 return true;
462 }
463
464 operation_result receive(receive_handler handler, void* state)
465 {
466 if (m_stop.load(order: boost::memory_order_relaxed))
467 return aborted;
468
469 lock_queue();
470 header* const hdr = get_header();
471 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
472
473 while (true)
474 {
475 if (m_stop.load(order: boost::memory_order_relaxed))
476 return aborted;
477
478 if (hdr->m_size > 0u)
479 break;
480
481 hdr->m_nonempty_queue.wait(mutex&: hdr->m_mutex);
482 }
483
484 dequeue_message(handler, state);
485
486 return succeeded;
487 }
488
489 bool try_receive(receive_handler handler, void* state)
490 {
491 if (m_stop.load(order: boost::memory_order_relaxed))
492 return false;
493
494 lock_queue();
495 header* const hdr = get_header();
496 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
497
498 if (hdr->m_size == 0u)
499 return false;
500
501 dequeue_message(handler, state);
502
503 return true;
504 }
505
506 void stop_local()
507 {
508 if (m_stop.load(order: boost::memory_order_relaxed))
509 return;
510
511 lock_queue();
512 header* const hdr = get_header();
513 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
514
515 m_stop.store(v: true, order: boost::memory_order_relaxed);
516
517 hdr->m_nonempty_queue.notify_all();
518 hdr->m_nonfull_queue.notify_all();
519 }
520
521 void reset_local()
522 {
523 m_stop.store(v: false, order: boost::memory_order_relaxed);
524 }
525
526 void clear()
527 {
528 lock_queue();
529 header* const hdr = get_header();
530 boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
531 clear_queue();
532 }
533
534private:
535 header* get_header() const BOOST_NOEXCEPT
536 {
537 return static_cast< header* >(m_region.get_address());
538 }
539
540 static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
541 {
542 return boost::alignment::align_up(value: sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
543 }
544
545 void create_region(uint32_t capacity, size_type block_size)
546 {
547 const std::size_t shmem_size = estimate_region_size(capacity, block_size);
548 m_shared_memory.truncate(length: shmem_size);
549 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(other&: m_region);
550
551 new (m_region.get_address()) header(capacity, block_size);
552
553 init_block_size(block_size);
554 }
555
556 void adopt_region()
557 {
558 std::size_t shmem_size = 0u;
559 unsigned int i = 0u;
560 std::time_t start_time = std::time(NULL);
561 while (true)
562 {
563 boost::interprocess::offset_t shm_size = 0;
564 const bool get_size_result = m_shared_memory.get_size(size&: shm_size);
565 if (BOOST_LIKELY(get_size_result && shm_size > 0))
566 {
567 shmem_size = static_cast< std::size_t >(shm_size);
568 break;
569 }
570
571 std::time_t now = std::time(NULL);
572 if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout))
573 {
574 if (get_size_result)
575 goto shmem_size_too_small;
576 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size could not be determined until timeout");
577 }
578
579 if (i < region_open_or_create_short_yield_loops)
580 short_yield();
581 else
582 long_yield();
583
584 ++i;
585 }
586
587 if (BOOST_UNLIKELY(shmem_size < sizeof(header)))
588 {
589 shmem_size_too_small:
590 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
591 }
592
593 boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0, shmem_size).swap(other&: m_region);
594
595 // Wait until the mapped region becomes initialized
596 header* const hdr = get_header();
597 for (i = 0u; i < region_init_wait_loops; ++i)
598 {
599 uint32_t ref_count = hdr->m_ref_count.load(order: boost::memory_order_acquire);
600 while (ref_count > 0u)
601 {
602 if (hdr->m_ref_count.compare_exchange_weak(expected&: ref_count, desired: ref_count + 1u, success_order: boost::memory_order_acq_rel, failure_order: boost::memory_order_acquire))
603 goto done;
604 }
605
606 if (i < region_init_wait_loops_pause)
607 boost::log::aux::pause();
608 else if (i < region_init_wait_loops_short_yield)
609 short_yield();
610 else
611 long_yield();
612 }
613
614 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
615
616 done:
617 try
618 {
619 // Check that the queue layout matches the current process ABI
620 if (hdr->m_abi_tag != header::get_abi_tag())
621 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
622
623 if (!boost::log::aux::is_power_of_2(n: hdr->m_block_size))
624 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
625
626 init_block_size(block_size: hdr->m_block_size);
627 }
628 catch (...)
629 {
630 close_region();
631 throw;
632 }
633 }
634
635 void close_region() BOOST_NOEXCEPT
636 {
637 header* const hdr = get_header();
638
639 if (hdr->m_ref_count.fetch_sub(v: 1u, order: boost::memory_order_acq_rel) == 1u)
640 {
641 boost::interprocess::shared_memory_object::remove(filename: m_shared_memory.get_name());
642
643 hdr->~header();
644
645 boost::interprocess::mapped_region().swap(other&: m_region);
646 boost::interprocess::shared_memory_object().swap(other&: m_shared_memory);
647
648 m_block_size_mask = 0u;
649 m_block_size_log2 = 0u;
650 }
651 }
652
653 void init_block_size(size_type block_size)
654 {
655 m_block_size_mask = block_size - 1u;
656
657 uint32_t block_size_log2 = 0u;
658 if ((block_size & 0x0000ffff) == 0u)
659 {
660 block_size >>= 16u;
661 block_size_log2 += 16u;
662 }
663 if ((block_size & 0x000000ff) == 0u)
664 {
665 block_size >>= 8u;
666 block_size_log2 += 8u;
667 }
668 if ((block_size & 0x0000000f) == 0u)
669 {
670 block_size >>= 4u;
671 block_size_log2 += 4u;
672 }
673 if ((block_size & 0x00000003) == 0u)
674 {
675 block_size >>= 2u;
676 block_size_log2 += 2u;
677 }
678 if ((block_size & 0x00000001) == 0u)
679 {
680 ++block_size_log2;
681 }
682 m_block_size_log2 = block_size_log2;
683 }
684
685 void lock_queue()
686 {
687 header* const hdr = get_header();
688
689#if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
690 try
691 {
692#endif
693 hdr->m_mutex.lock();
694#if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
695 }
696 catch (boost::log::ipc::aux::lock_owner_dead&)
697 {
698 // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock
699 try
700 {
701 clear_queue();
702 hdr->m_mutex.recover();
703 }
704 catch (...)
705 {
706 hdr->m_mutex.unlock();
707 throw;
708 }
709 }
710#endif
711 }
712
713 void clear_queue()
714 {
715 header* const hdr = get_header();
716 hdr->m_size = 0u;
717 hdr->m_put_pos = 0u;
718 hdr->m_get_pos = 0u;
719 hdr->m_nonfull_queue.notify_all();
720 }
721
722 //! Returns the number of allocation blocks that are required to store user's payload of the specified size
723 uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
724 {
725 // ceil((size + get_header_overhead()) / block_size)
726 return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
727 }
728
729 //! Puts the message to the back of the queue
730 void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
731 {
732 header* const hdr = get_header();
733
734 const uint32_t capacity = hdr->m_capacity;
735 const size_type block_size = hdr->m_block_size;
736 uint32_t pos = hdr->m_put_pos;
737 BOOST_ASSERT(pos < capacity);
738
739 block_header* block = hdr->get_block(index: pos);
740 block->m_size = message_size;
741
742 size_type write_size = (std::min)(a: static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), b: message_size);
743 std::memcpy(dest: block->get_data(), src: message_data, n: write_size);
744
745 pos += block_count;
746 if (BOOST_UNLIKELY(pos >= capacity))
747 {
748 // Write the rest of the message at the beginning of the queue
749 pos -= capacity;
750 message_data = static_cast< const unsigned char* >(message_data) + write_size;
751 write_size = message_size - write_size;
752 if (write_size > 0u)
753 std::memcpy(dest: hdr->get_block(index: 0u), src: message_data, n: write_size);
754 }
755
756 hdr->m_put_pos = pos;
757
758 const uint32_t old_queue_size = hdr->m_size;
759 hdr->m_size = old_queue_size + block_count;
760 if (old_queue_size == 0u)
761 hdr->m_nonempty_queue.notify_one();
762 }
763
764 //! Retrieves the next message and invokes the handler to store the message contents
765 void dequeue_message(receive_handler handler, void* state)
766 {
767 header* const hdr = get_header();
768
769 const uint32_t capacity = hdr->m_capacity;
770 const size_type block_size = hdr->m_block_size;
771 uint32_t pos = hdr->m_get_pos;
772 BOOST_ASSERT(pos < capacity);
773
774 block_header* block = hdr->get_block(index: pos);
775 size_type message_size = block->m_size;
776 uint32_t block_count = estimate_block_count(size: message_size);
777
778 BOOST_ASSERT(block_count <= hdr->m_size);
779
780 size_type read_size = (std::min)(a: static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), b: message_size);
781 handler(state, block->get_data(), read_size);
782
783 pos += block_count;
784 if (BOOST_UNLIKELY(pos >= capacity))
785 {
786 // Read the tail of the message
787 pos -= capacity;
788 read_size = message_size - read_size;
789 if (read_size > 0u)
790 handler(state, hdr->get_block(index: 0u), read_size);
791 }
792
793 hdr->m_get_pos = pos;
794 hdr->m_size -= block_count;
795
796 hdr->m_nonfull_queue.notify_all();
797 }
798
799 static void short_yield() BOOST_NOEXCEPT
800 {
801#if defined(BOOST_HAS_SCHED_YIELD)
802 sched_yield();
803#elif defined(BOOST_HAS_PTHREAD_YIELD)
804 pthread_yield();
805#else
806 long_yield();
807#endif
808 }
809
810 static void long_yield() BOOST_NOEXCEPT
811 {
812#if defined(BOOST_HAS_NANOSLEEP)
813 timespec ts = {};
814 ts.tv_sec = 0;
815 ts.tv_nsec = 1000;
816 nanosleep(requested_time: &ts, NULL);
817#else
818 usleep(1);
819#endif
820 }
821};
822
823BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
824{
825 BOOST_ASSERT(m_impl == NULL);
826 if (!boost::log::aux::is_power_of_2(n: block_size))
827 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
828 try
829 {
830 m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(value: block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
831 }
832 catch (boost::exception& e)
833 {
834 e << boost::log::ipc::object_name_info(name);
835 throw;
836 }
837 catch (boost::interprocess::interprocess_exception& e)
838 {
839 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
840 }
841}
842
843BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
844{
845 BOOST_ASSERT(m_impl == NULL);
846 if (!boost::log::aux::is_power_of_2(n: block_size))
847 BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
848 try
849 {
850 m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(value: block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
851 }
852 catch (boost::exception& e)
853 {
854 e << boost::log::ipc::object_name_info(name);
855 throw;
856 }
857 catch (boost::interprocess::interprocess_exception& e)
858 {
859 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
860 }
861}
862
863BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const&)
864{
865 BOOST_ASSERT(m_impl == NULL);
866 try
867 {
868 m_impl = new implementation(open_mode::open_only, name, oflow_policy);
869 }
870 catch (boost::exception& e)
871 {
872 e << boost::log::ipc::object_name_info(name);
873 throw;
874 }
875 catch (boost::interprocess::interprocess_exception& e)
876 {
877 BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
878 }
879}
880
881BOOST_LOG_API void reliable_message_queue::clear()
882{
883 BOOST_ASSERT(m_impl != NULL);
884 try
885 {
886 m_impl->clear();
887 }
888 catch (boost::exception& e)
889 {
890 e << boost::log::ipc::object_name_info(m_impl->name());
891 throw;
892 }
893}
894
895BOOST_LOG_API object_name const& reliable_message_queue::name() const
896{
897 BOOST_ASSERT(m_impl != NULL);
898 return m_impl->name();
899}
900
901BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
902{
903 BOOST_ASSERT(m_impl != NULL);
904 return m_impl->capacity();
905}
906
907BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
908{
909 BOOST_ASSERT(m_impl != NULL);
910 return m_impl->block_size();
911}
912
913BOOST_LOG_API void reliable_message_queue::stop_local()
914{
915 BOOST_ASSERT(m_impl != NULL);
916 try
917 {
918 m_impl->stop_local();
919 }
920 catch (boost::exception& e)
921 {
922 e << boost::log::ipc::object_name_info(m_impl->name());
923 throw;
924 }
925}
926
927BOOST_LOG_API void reliable_message_queue::reset_local()
928{
929 BOOST_ASSERT(m_impl != NULL);
930 try
931 {
932 m_impl->reset_local();
933 }
934 catch (boost::exception& e)
935 {
936 e << boost::log::ipc::object_name_info(m_impl->name());
937 throw;
938 }
939}
940
941BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
942{
943 delete m_impl;
944 m_impl = NULL;
945}
946
947BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
948{
949 BOOST_ASSERT(m_impl != NULL);
950 try
951 {
952 return m_impl->send(message_data, message_size);
953 }
954 catch (boost::exception& e)
955 {
956 e << boost::log::ipc::object_name_info(m_impl->name());
957 throw;
958 }
959}
960
961BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
962{
963 BOOST_ASSERT(m_impl != NULL);
964 try
965 {
966 return m_impl->try_send(message_data, message_size);
967 }
968 catch (boost::exception& e)
969 {
970 e << boost::log::ipc::object_name_info(m_impl->name());
971 throw;
972 }
973}
974
975BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
976{
977 BOOST_ASSERT(m_impl != NULL);
978 try
979 {
980 return m_impl->receive(handler, state);
981 }
982 catch (boost::exception& e)
983 {
984 e << boost::log::ipc::object_name_info(m_impl->name());
985 throw;
986 }
987}
988
989BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
990{
991 BOOST_ASSERT(m_impl != NULL);
992 try
993 {
994 return m_impl->try_receive(handler, state);
995 }
996 catch (boost::exception& e)
997 {
998 e << boost::log::ipc::object_name_info(m_impl->name());
999 throw;
1000 }
1001}
1002
1003//! Fixed buffer receive handler
1004BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
1005{
1006 fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
1007 if (BOOST_UNLIKELY(size > p->size))
1008 BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
1009
1010 std::memcpy(dest: p->data, src: data, n: size);
1011 p->data += size;
1012 p->size -= size;
1013}
1014
1015BOOST_LOG_API void reliable_message_queue::remove(object_name const& name)
1016{
1017 boost::interprocess::shared_memory_object::remove(filename: name.c_str());
1018}
1019
1020} // namespace ipc
1021
1022BOOST_LOG_CLOSE_NAMESPACE // namespace log
1023
1024} // namespace boost
1025
1026#include <boost/log/detail/footer.hpp>
1027

source code of boost/libs/log/src/posix/ipc_reliable_message_queue.cpp