• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *                 Copyright Lingxi Li 2015.
3  *              Copyright Andrey Semashev 2016.
4  * Distributed under the Boost Software License, Version 1.0.
5  *    (See accompanying file LICENSE_1_0.txt or copy at
6  *          http://www.boost.org/LICENSE_1_0.txt)
7  */
8 /*!
9  * \file   posix/ipc_reliable_message_queue.cpp
10  * \author Lingxi Li
11  * \author Andrey Semashev
12  * \date   17.11.2015
13  *
14  * \brief  This file is a part of the Boost.Log library implementation, see the library documentation
15  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16  *
17  * This file provides an interprocess message queue implementation on POSIX platforms.
18  */
19 
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <cerrno>
23 #include <cstring>
24 #include <new>
25 #include <string>
26 #include <stdexcept>
27 #include <algorithm>
28 #include <unistd.h>
29 #if defined(BOOST_HAS_SCHED_YIELD)
30 #include <sched.h>
31 #elif defined(BOOST_HAS_PTHREAD_YIELD)
32 #include <pthread.h>
33 #elif defined(BOOST_HAS_NANOSLEEP)
34 #include <time.h>
35 #endif
36 #include <boost/assert.hpp>
37 #include <boost/static_assert.hpp>
38 #include <boost/cstdint.hpp>
39 #include <boost/atomic/atomic.hpp>
40 #include <boost/atomic/capabilities.hpp>
41 #include <boost/throw_exception.hpp>
42 #include <boost/log/exceptions.hpp>
43 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
44 #include <boost/log/support/exception.hpp>
45 #include <boost/log/detail/pause.hpp>
46 #include <boost/exception/info.hpp>
47 #include <boost/exception/enable_error_info.hpp>
48 #include <boost/interprocess/creation_tags.hpp>
49 #include <boost/interprocess/exceptions.hpp>
50 #include <boost/interprocess/permissions.hpp>
51 #include <boost/interprocess/mapped_region.hpp>
52 #include <boost/interprocess/shared_memory_object.hpp>
53 #include <boost/align/align_up.hpp>
54 #include "ipc_sync_wrappers.hpp"
55 #include "murmur3.hpp"
56 #include "bit_tools.hpp"
57 #include <boost/log/detail/header.hpp>
58 
59 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
60 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
61 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
62 #endif
63 
64 namespace boost {
65 
66 BOOST_LOG_OPEN_NAMESPACE
67 
68 namespace ipc {
69 
70 //! Message queue implementation data
71 struct reliable_message_queue::implementation
72 {
73 private:
74     //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
75     struct block_header
76     {
77         // Element data alignment, in bytes
78         enum { data_alignment = 32u };
79 
80         //! Size of the element data, in bytes
81         size_type m_size;
82 
83         //! Returns the block header overhead, in bytes
get_header_overheadboost::ipc::reliable_message_queue::implementation::block_header84         static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
85         {
86             return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
87         }
88 
89         //! Returns a pointer to the element data
get_databoost::ipc::reliable_message_queue::implementation::block_header90         void* get_data() const BOOST_NOEXCEPT
91         {
92             return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
93         }
94     };
95 
96     //! Header of the message queue. Placed at the beginning of the shared memory segment.
97     struct header
98     {
99         // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
100         enum { abi_version = 0 };
101 
102         // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
103 
104         //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
105         uint32_t m_abi_tag;
106         //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
107         unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
108         //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0).
109         boost::atomic< uint32_t > m_ref_count;
110         //! Number of allocation blocks in the queue.
111         const uint32_t m_capacity;
112         //! Size of an allocation block, in bytes.
113         const size_type m_block_size;
114         //! Mutex for protecting queue data structures.
115         boost::log::ipc::aux::interprocess_mutex m_mutex;
116         //! Condition variable used to block readers when the queue is empty.
117         boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue;
118         //! Condition variable used to block writers when the queue is full.
119         boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
120         //! The current number of allocated blocks in the queue.
121         uint32_t m_size;
122         //! The current writing position (allocation block index).
123         uint32_t m_put_pos;
124         //! The current reading position (allocation block index).
125         uint32_t m_get_pos;
126 
headerboost::ipc::reliable_message_queue::implementation::header127         header(uint32_t capacity, size_type block_size) :
128             m_abi_tag(get_abi_tag()),
129             m_capacity(capacity),
130             m_block_size(block_size),
131             m_size(0u),
132             m_put_pos(0u),
133             m_get_pos(0u)
134         {
135             // Must be initialized last. m_ref_count is zero-initialized initially.
136             m_ref_count.fetch_add(1u, boost::memory_order_release);
137         }
138 
139         //! Returns the header structure ABI tag
get_abi_tagboost::ipc::reliable_message_queue::implementation::header140         static uint32_t get_abi_tag() BOOST_NOEXCEPT
141         {
142             // This FOURCC identifies the queue type
143             boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
144 
145             // This FOURCC identifies the queue implementation
146             hash.mix(boost::log::aux::make_fourcc('p', 't', 'h', 'r'));
147             hash.mix(abi_version);
148 
149             // We will use these constants to align pointers
150             hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
151             hash.mix(block_header::data_alignment);
152 
153             // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
154             // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
155 
156 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
157             hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
158             hash.mix(static_cast< uint32_t >(offsetof(header, name)))
159 
160             BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
161             BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
162             BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count);
163             BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
164             BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
165             BOOST_LOG_MIX_HEADER_MEMBER(m_mutex);
166             BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue);
167             BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue);
168             BOOST_LOG_MIX_HEADER_MEMBER(m_size);
169             BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
170             BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
171 
172 #undef BOOST_LOG_MIX_HEADER_MEMBER
173 
174             return hash.finalize();
175         }
176 
177         //! Returns an element header at the specified index
get_blockboost::ipc::reliable_message_queue::implementation::header178         block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
179         {
180             BOOST_ASSERT(index < m_capacity);
181             unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
182             p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
183             return reinterpret_cast< block_header* >(p);
184         }
185 
186         BOOST_DELETED_FUNCTION(header(header const&))
187         BOOST_DELETED_FUNCTION(header& operator=(header const&))
188     };
189 
190 private:
191     //! Shared memory object
192     boost::interprocess::shared_memory_object m_shared_memory;
193     //! Shared memory mapping into the process address space
194     boost::interprocess::mapped_region m_region;
195     //! Queue overflow handling policy
196     const overflow_policy m_overflow_policy;
197     //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
198     size_type m_block_size_mask;
199     //! The number of the bit set in block_size (i.e. log base 2 of block_size)
200     uint32_t m_block_size_log2;
201     //! The flag indicates that stop has been requested
202     bool m_stop;
203 
204     //! Queue shared memory object name
205     const object_name m_name;
206 
207 public:
208     //! The constructor creates a new shared memory segment
implementationboost::ipc::reliable_message_queue::implementation209     implementation
210     (
211         open_mode::create_only_tag,
212         object_name const& name,
213         uint32_t capacity,
214         size_type block_size,
215         overflow_policy oflow_policy,
216         permissions const& perms
217     ) :
218         m_shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
219         m_region(),
220         m_overflow_policy(oflow_policy),
221         m_block_size_mask(0u),
222         m_block_size_log2(0u),
223         m_stop(false),
224         m_name(name)
225     {
226         create_region(capacity, block_size);
227     }
228 
229     //! The constructor creates a new shared memory segment or opens the existing one
implementationboost::ipc::reliable_message_queue::implementation230     implementation
231     (
232         open_mode::open_or_create_tag,
233         object_name const& name,
234         uint32_t capacity,
235         size_type block_size,
236         overflow_policy oflow_policy,
237         permissions const& perms
238     ) :
239         m_shared_memory(boost::interprocess::open_or_create, name.c_str(), boost::interprocess::read_write, boost::interprocess::permissions(perms.get_native())),
240         m_region(),
241         m_overflow_policy(oflow_policy),
242         m_block_size_mask(0u),
243         m_block_size_log2(0u),
244         m_stop(false),
245         m_name(name)
246     {
247         boost::interprocess::offset_t shmem_size = 0;
248         if (!m_shared_memory.get_size(shmem_size) || shmem_size == 0)
249             create_region(capacity, block_size);
250         else
251             adopt_region(shmem_size);
252     }
253 
254     //! The constructor opens the existing shared memory segment
implementationboost::ipc::reliable_message_queue::implementation255     implementation
256     (
257         open_mode::open_only_tag,
258         object_name const& name,
259         overflow_policy oflow_policy
260     ) :
261         m_shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write),
262         m_region(),
263         m_overflow_policy(oflow_policy),
264         m_block_size_mask(0u),
265         m_block_size_log2(0u),
266         m_stop(false),
267         m_name(name)
268     {
269         boost::interprocess::offset_t shmem_size = 0;
270         if (!m_shared_memory.get_size(shmem_size))
271             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment not found");
272 
273         adopt_region(shmem_size);
274     }
275 
~implementationboost::ipc::reliable_message_queue::implementation276     ~implementation()
277     {
278         close_region();
279     }
280 
nameboost::ipc::reliable_message_queue::implementation281     object_name const& name() const BOOST_NOEXCEPT
282     {
283         return m_name;
284     }
285 
capacityboost::ipc::reliable_message_queue::implementation286     uint32_t capacity() const BOOST_NOEXCEPT
287     {
288         return get_header()->m_capacity;
289     }
290 
block_sizeboost::ipc::reliable_message_queue::implementation291     size_type block_size() const BOOST_NOEXCEPT
292     {
293         return get_header()->m_block_size;
294     }
295 
sendboost::ipc::reliable_message_queue::implementation296     operation_result send(void const* message_data, size_type message_size)
297     {
298         const uint32_t block_count = estimate_block_count(message_size);
299 
300         header* const hdr = get_header();
301 
302         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
303             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
304 
305         if (m_stop)
306             return aborted;
307 
308         lock_queue();
309         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
310 
311         while (true)
312         {
313             if (m_stop)
314                 return aborted;
315 
316             if ((hdr->m_capacity - hdr->m_size) >= block_count)
317                 break;
318 
319             const overflow_policy oflow_policy = m_overflow_policy;
320             if (oflow_policy == fail_on_overflow)
321                 return no_space;
322             else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
323                 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
324 
325             hdr->m_nonfull_queue.wait(hdr->m_mutex);
326         }
327 
328         enqueue_message(message_data, message_size, block_count);
329 
330         return succeeded;
331     }
332 
try_sendboost::ipc::reliable_message_queue::implementation333     bool try_send(void const* message_data, size_type message_size)
334     {
335         const uint32_t block_count = estimate_block_count(message_size);
336 
337         header* const hdr = get_header();
338 
339         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
340             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
341 
342         if (m_stop)
343             return false;
344 
345         lock_queue();
346         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
347 
348         if (m_stop)
349             return false;
350 
351         if ((hdr->m_capacity - hdr->m_size) < block_count)
352             return false;
353 
354         enqueue_message(message_data, message_size, block_count);
355 
356         return true;
357     }
358 
receiveboost::ipc::reliable_message_queue::implementation359     operation_result receive(receive_handler handler, void* state)
360     {
361         if (m_stop)
362             return aborted;
363 
364         lock_queue();
365         header* const hdr = get_header();
366         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
367 
368         while (true)
369         {
370             if (m_stop)
371                 return aborted;
372 
373             if (hdr->m_size > 0u)
374                 break;
375 
376             hdr->m_nonempty_queue.wait(hdr->m_mutex);
377         }
378 
379         dequeue_message(handler, state);
380 
381         return succeeded;
382     }
383 
try_receiveboost::ipc::reliable_message_queue::implementation384     bool try_receive(receive_handler handler, void* state)
385     {
386         if (m_stop)
387             return false;
388 
389         lock_queue();
390         header* const hdr = get_header();
391         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
392 
393         if (hdr->m_size == 0u)
394             return false;
395 
396         dequeue_message(handler, state);
397 
398         return true;
399     }
400 
stop_localboost::ipc::reliable_message_queue::implementation401     void stop_local()
402     {
403         if (m_stop)
404             return;
405 
406         lock_queue();
407         header* const hdr = get_header();
408         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
409 
410         m_stop = true;
411 
412         hdr->m_nonempty_queue.notify_all();
413         hdr->m_nonfull_queue.notify_all();
414     }
415 
reset_localboost::ipc::reliable_message_queue::implementation416     void reset_local()
417     {
418         m_stop = false;
419     }
420 
clearboost::ipc::reliable_message_queue::implementation421     void clear()
422     {
423         lock_queue();
424         header* const hdr = get_header();
425         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex);
426         clear_queue();
427     }
428 
429 private:
get_headerboost::ipc::reliable_message_queue::implementation430     header* get_header() const BOOST_NOEXCEPT
431     {
432         return static_cast< header* >(m_region.get_address());
433     }
434 
estimate_region_sizeboost::ipc::reliable_message_queue::implementation435     static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
436     {
437         return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
438     }
439 
create_regionboost::ipc::reliable_message_queue::implementation440     void create_region(uint32_t capacity, size_type block_size)
441     {
442         const std::size_t shmem_size = estimate_region_size(capacity, block_size);
443         m_shared_memory.truncate(shmem_size);
444         boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
445 
446         new (m_region.get_address()) header(capacity, block_size);
447 
448         init_block_size(block_size);
449     }
450 
adopt_regionboost::ipc::reliable_message_queue::implementation451     void adopt_region(std::size_t shmem_size)
452     {
453         if (shmem_size < sizeof(header))
454             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
455 
456         boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region);
457 
458         // Wait until the mapped region becomes initialized
459         header* const hdr = get_header();
460         BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 200u, spin_loops = 16u, spins = 16u;
461         for (unsigned int i = 0; i < wait_loops; ++i)
462         {
463             uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire);
464             while (ref_count > 0u)
465             {
466                 if (hdr->m_ref_count.compare_exchange_weak(ref_count, ref_count + 1u, boost::memory_order_acq_rel, boost::memory_order_acquire))
467                     goto done;
468             }
469 
470             if (i < spin_loops)
471             {
472                 for (unsigned int j = 0; j < spins; ++j)
473                 {
474                     boost::log::aux::pause();
475                 }
476             }
477             else
478             {
479 #if defined(BOOST_HAS_SCHED_YIELD)
480                 sched_yield();
481 #elif defined(BOOST_HAS_PTHREAD_YIELD)
482                 pthread_yield();
483 #elif defined(BOOST_HAS_NANOSLEEP)
484                 timespec ts = {};
485                 ts.tv_sec = 0;
486                 ts.tv_nsec = 1000;
487                 nanosleep(&ts, NULL);
488 #else
489                 usleep(1);
490 #endif
491             }
492         }
493 
494         BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
495 
496     done:
497         try
498         {
499             // Check that the queue layout matches the current process ABI
500             if (hdr->m_abi_tag != header::get_abi_tag())
501                 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
502 
503             if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
504                 BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
505 
506             init_block_size(hdr->m_block_size);
507         }
508         catch (...)
509         {
510             close_region();
511             throw;
512         }
513     }
514 
close_regionboost::ipc::reliable_message_queue::implementation515     void close_region() BOOST_NOEXCEPT
516     {
517         header* const hdr = get_header();
518 
519         if (hdr->m_ref_count.fetch_sub(1u, boost::memory_order_acq_rel) == 1u)
520         {
521             boost::interprocess::shared_memory_object::remove(m_shared_memory.get_name());
522 
523             hdr->~header();
524 
525             boost::interprocess::mapped_region().swap(m_region);
526             boost::interprocess::shared_memory_object().swap(m_shared_memory);
527 
528             m_block_size_mask = 0u;
529             m_block_size_log2 = 0u;
530         }
531     }
532 
init_block_sizeboost::ipc::reliable_message_queue::implementation533     void init_block_size(size_type block_size)
534     {
535         m_block_size_mask = block_size - 1u;
536 
537         uint32_t block_size_log2 = 0u;
538         if ((block_size & 0x0000ffff) == 0u)
539         {
540             block_size >>= 16u;
541             block_size_log2 += 16u;
542         }
543         if ((block_size & 0x000000ff) == 0u)
544         {
545             block_size >>= 8u;
546             block_size_log2 += 8u;
547         }
548         if ((block_size & 0x0000000f) == 0u)
549         {
550             block_size >>= 4u;
551             block_size_log2 += 4u;
552         }
553         if ((block_size & 0x00000003) == 0u)
554         {
555             block_size >>= 2u;
556             block_size_log2 += 2u;
557         }
558         if ((block_size & 0x00000001) == 0u)
559         {
560             ++block_size_log2;
561         }
562         m_block_size_log2 = block_size_log2;
563     }
564 
lock_queueboost::ipc::reliable_message_queue::implementation565     void lock_queue()
566     {
567         header* const hdr = get_header();
568 
569 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
570         try
571         {
572 #endif
573             hdr->m_mutex.lock();
574 #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST)
575         }
576         catch (boost::log::ipc::aux::lock_owner_dead&)
577         {
578             // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock
579             try
580             {
581                 clear_queue();
582                 hdr->m_mutex.recover();
583             }
584             catch (...)
585             {
586                 hdr->m_mutex.unlock();
587                 throw;
588             }
589         }
590 #endif
591     }
592 
clear_queueboost::ipc::reliable_message_queue::implementation593     void clear_queue()
594     {
595         header* const hdr = get_header();
596         hdr->m_size = 0u;
597         hdr->m_put_pos = 0u;
598         hdr->m_get_pos = 0u;
599         hdr->m_nonfull_queue.notify_all();
600     }
601 
602     //! Returns the number of allocation blocks that are required to store user's payload of the specified size
estimate_block_countboost::ipc::reliable_message_queue::implementation603     uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
604     {
605         // ceil((size + get_header_overhead()) / block_size)
606         return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
607     }
608 
609     //! Puts the message to the back of the queue
enqueue_messageboost::ipc::reliable_message_queue::implementation610     void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
611     {
612         header* const hdr = get_header();
613 
614         const uint32_t capacity = hdr->m_capacity;
615         const size_type block_size = hdr->m_block_size;
616         uint32_t pos = hdr->m_put_pos;
617 
618         block_header* block = hdr->get_block(pos);
619         block->m_size = message_size;
620 
621         size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
622         std::memcpy(block->get_data(), message_data, write_size);
623 
624         pos += block_count;
625         if (BOOST_UNLIKELY(pos >= capacity))
626         {
627             // Write the rest of the message at the beginning of the queue
628             pos -= capacity;
629             message_data = static_cast< const unsigned char* >(message_data) + write_size;
630             write_size = message_size - write_size;
631             if (write_size > 0u)
632                 std::memcpy(hdr->get_block(0u), message_data, write_size);
633         }
634 
635         hdr->m_put_pos = pos;
636 
637         const uint32_t old_queue_size = hdr->m_size;
638         hdr->m_size = old_queue_size + block_count;
639         if (old_queue_size == 0u)
640             hdr->m_nonempty_queue.notify_one();
641     }
642 
643     //! Retrieves the next message and invokes the handler to store the message contents
dequeue_messageboost::ipc::reliable_message_queue::implementation644     void dequeue_message(receive_handler handler, void* state)
645     {
646         header* const hdr = get_header();
647 
648         const uint32_t capacity = hdr->m_capacity;
649         const size_type block_size = hdr->m_block_size;
650         uint32_t pos = hdr->m_get_pos;
651 
652         block_header* block = hdr->get_block(pos);
653         size_type message_size = block->m_size;
654         uint32_t block_count = estimate_block_count(message_size);
655 
656         BOOST_ASSERT(block_count <= hdr->m_size);
657 
658         size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
659         handler(state, block->get_data(), read_size);
660 
661         pos += block_count;
662         if (BOOST_UNLIKELY(pos >= capacity))
663         {
664             // Read the tail of the message
665             pos -= capacity;
666             read_size = message_size - read_size;
667             if (read_size > 0u)
668                 handler(state, hdr->get_block(0u), read_size);
669         }
670 
671         hdr->m_get_pos = pos;
672         hdr->m_size -= block_count;
673 
674         hdr->m_nonfull_queue.notify_all();
675     }
676 };
677 
create(object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy,permissions const & perms)678 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
679 {
680     BOOST_ASSERT(m_impl == NULL);
681     if (!boost::log::aux::is_power_of_2(block_size))
682         BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
683     try
684     {
685         m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
686     }
687     catch (boost::exception& e)
688     {
689         e << boost::log::ipc::object_name_info(name);
690         throw;
691     }
692     catch (boost::interprocess::interprocess_exception& e)
693     {
694         BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
695     }
696 }
697 
open_or_create(object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy,permissions const & perms)698 BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
699 {
700     BOOST_ASSERT(m_impl == NULL);
701     if (!boost::log::aux::is_power_of_2(block_size))
702         BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
703     try
704     {
705         m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
706     }
707     catch (boost::exception& e)
708     {
709         e << boost::log::ipc::object_name_info(name);
710         throw;
711     }
712     catch (boost::interprocess::interprocess_exception& e)
713     {
714         BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
715     }
716 }
717 
open(object_name const & name,overflow_policy oflow_policy,permissions const &)718 BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const&)
719 {
720     BOOST_ASSERT(m_impl == NULL);
721     try
722     {
723         m_impl = new implementation(open_mode::open_only, name, oflow_policy);
724     }
725     catch (boost::exception& e)
726     {
727         e << boost::log::ipc::object_name_info(name);
728         throw;
729     }
730     catch (boost::interprocess::interprocess_exception& e)
731     {
732         BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name));
733     }
734 }
735 
clear()736 BOOST_LOG_API void reliable_message_queue::clear()
737 {
738     BOOST_ASSERT(m_impl != NULL);
739     try
740     {
741         m_impl->clear();
742     }
743     catch (boost::exception& e)
744     {
745         e << boost::log::ipc::object_name_info(m_impl->name());
746         throw;
747     }
748 }
749 
name() const750 BOOST_LOG_API object_name const& reliable_message_queue::name() const
751 {
752     BOOST_ASSERT(m_impl != NULL);
753     return m_impl->name();
754 }
755 
capacity() const756 BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
757 {
758     BOOST_ASSERT(m_impl != NULL);
759     return m_impl->capacity();
760 }
761 
block_size() const762 BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
763 {
764     BOOST_ASSERT(m_impl != NULL);
765     return m_impl->block_size();
766 }
767 
stop_local()768 BOOST_LOG_API void reliable_message_queue::stop_local()
769 {
770     BOOST_ASSERT(m_impl != NULL);
771     try
772     {
773         m_impl->stop_local();
774     }
775     catch (boost::exception& e)
776     {
777         e << boost::log::ipc::object_name_info(m_impl->name());
778         throw;
779     }
780 }
781 
reset_local()782 BOOST_LOG_API void reliable_message_queue::reset_local()
783 {
784     BOOST_ASSERT(m_impl != NULL);
785     try
786     {
787         m_impl->reset_local();
788     }
789     catch (boost::exception& e)
790     {
791         e << boost::log::ipc::object_name_info(m_impl->name());
792         throw;
793     }
794 }
795 
do_close()796 BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
797 {
798     delete m_impl;
799     m_impl = NULL;
800 }
801 
send(void const * message_data,size_type message_size)802 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
803 {
804     BOOST_ASSERT(m_impl != NULL);
805     try
806     {
807         return m_impl->send(message_data, message_size);
808     }
809     catch (boost::exception& e)
810     {
811         e << boost::log::ipc::object_name_info(m_impl->name());
812         throw;
813     }
814 }
815 
try_send(void const * message_data,size_type message_size)816 BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
817 {
818     BOOST_ASSERT(m_impl != NULL);
819     try
820     {
821         return m_impl->try_send(message_data, message_size);
822     }
823     catch (boost::exception& e)
824     {
825         e << boost::log::ipc::object_name_info(m_impl->name());
826         throw;
827     }
828 }
829 
do_receive(receive_handler handler,void * state)830 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
831 {
832     BOOST_ASSERT(m_impl != NULL);
833     try
834     {
835         return m_impl->receive(handler, state);
836     }
837     catch (boost::exception& e)
838     {
839         e << boost::log::ipc::object_name_info(m_impl->name());
840         throw;
841     }
842 }
843 
do_try_receive(receive_handler handler,void * state)844 BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
845 {
846     BOOST_ASSERT(m_impl != NULL);
847     try
848     {
849         return m_impl->try_receive(handler, state);
850     }
851     catch (boost::exception& e)
852     {
853         e << boost::log::ipc::object_name_info(m_impl->name());
854         throw;
855     }
856 }
857 
858 //! Fixed buffer receive handler
fixed_buffer_receive_handler(void * state,const void * data,size_type size)859 BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
860 {
861     fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
862     if (BOOST_UNLIKELY(size > p->size))
863         BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
864 
865     std::memcpy(p->data, data, size);
866     p->data += size;
867     p->size -= size;
868 }
869 
remove(object_name const & name)870 BOOST_LOG_API void reliable_message_queue::remove(object_name const& name)
871 {
872     boost::interprocess::shared_memory_object::remove(name.c_str());
873 }
874 
875 } // namespace ipc
876 
877 BOOST_LOG_CLOSE_NAMESPACE // namespace log
878 
879 } // namespace boost
880 
881 #include <boost/log/detail/footer.hpp>
882