• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *                Copyright Lingxi Li 2015.
3  *             Copyright Andrey Semashev 2016.
4  * Distributed under the Boost Software License, Version 1.0.
5  *    (See accompanying file LICENSE_1_0.txt or copy at
6  *          http://www.boost.org/LICENSE_1_0.txt)
7  */
8 /*!
9  * \file   ipc_reliable_message_queue_win.hpp
10  * \author Lingxi Li
11  * \author Andrey Semashev
12  * \date   28.10.2015
13  *
14  * \brief  This header is the Boost.Log library implementation, see the library documentation
15  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
16  *
17  * This file provides an interprocess message queue implementation on Windows platforms.
18  */
19 
20 #include <boost/log/detail/config.hpp>
21 #include <cstddef>
22 #include <cstring>
23 #include <new>
24 #include <limits>
25 #include <string>
26 #include <algorithm>
27 #include <stdexcept>
28 #include <boost/assert.hpp>
29 #include <boost/static_assert.hpp>
30 #include <boost/cstdint.hpp>
31 #include <boost/atomic/atomic.hpp>
32 #include <boost/atomic/capabilities.hpp>
33 #include <boost/log/exceptions.hpp>
34 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
35 #include <boost/log/support/exception.hpp>
36 #include <boost/log/detail/pause.hpp>
37 #include <boost/exception/info.hpp>
38 #include <boost/exception/enable_error_info.hpp>
39 #include <boost/align/align_up.hpp>
40 #include <boost/winapi/thread.hpp> // SwitchToThread
41 #include "windows/ipc_sync_wrappers.hpp"
42 #include "windows/mapped_shared_memory.hpp"
43 #include "windows/utf_code_conversion.hpp"
44 #include "murmur3.hpp"
45 #include "bit_tools.hpp"
46 #include <windows.h>
47 #include <boost/log/detail/header.hpp>
48 
49 #if BOOST_ATOMIC_INT32_LOCK_FREE != 2
50 // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory
51 #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform
52 #endif
53 
54 //! A suffix used in names of interprocess objects created by the queue.
55 //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types.
56 #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651"
57 
58 namespace boost {
59 
60 BOOST_LOG_OPEN_NAMESPACE
61 
62 namespace ipc {
63 
64 //! Message queue implementation data
65 struct reliable_message_queue::implementation
66 {
67 private:
68     //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment.
69     struct block_header
70     {
71         // Element data alignment, in bytes
72         enum { data_alignment = 32u };
73 
74         //! Size of the element data, in bytes
75         size_type m_size;
76 
77         //! Returns the block header overhead, in bytes
get_header_overheadboost::ipc::reliable_message_queue::implementation::block_header78         static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT
79         {
80             return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment));
81         }
82 
83         //! Returns a pointer to the element data
get_databoost::ipc::reliable_message_queue::implementation::block_header84         void* get_data() const BOOST_NOEXCEPT
85         {
86             return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead();
87         }
88     };
89 
90     //! Header of the message queue. Placed at the beginning of the shared memory segment.
91     struct header
92     {
93         // Increment this constant whenever you change the binary layout of the queue (apart from this header structure)
94         enum { abi_version = 0 };
95 
96         // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!!
97 
98         //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment.
99         uint32_t m_abi_tag;
100         //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability.
101         unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)];
102         //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0).
103         boost::atomic< uint32_t > m_initialized;
104         //! Number of allocation blocks in the queue.
105         const uint32_t m_capacity;
106         //! Size of an allocation block, in bytes.
107         const size_type m_block_size;
108         //! Shared state of the mutex for protecting queue data structures.
109         boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state;
110         //! Shared state of the condition variable used to block writers when the queue is full.
111         boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state;
112         //! The current number of allocated blocks in the queue.
113         uint32_t m_size;
114         //! The current writing position (allocation block index).
115         uint32_t m_put_pos;
116         //! The current reading position (allocation block index).
117         uint32_t m_get_pos;
118 
headerboost::ipc::reliable_message_queue::implementation::header119         header(uint32_t capacity, size_type block_size) :
120             m_abi_tag(get_abi_tag()),
121             m_capacity(capacity),
122             m_block_size(block_size),
123             m_size(0u),
124             m_put_pos(0u),
125             m_get_pos(0u)
126         {
127             // Must be initialized last. m_initialized is zero-initialized initially.
128             m_initialized.fetch_add(1u, boost::memory_order_release);
129         }
130 
131         //! Returns the header structure ABI tag
get_abi_tagboost::ipc::reliable_message_queue::implementation::header132         static uint32_t get_abi_tag() BOOST_NOEXCEPT
133         {
134             // This FOURCC identifies the queue type
135             boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q'));
136 
137             // This FOURCC identifies the queue implementation
138             hash.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5'));
139             hash.mix(abi_version);
140 
141             // We will use these constants to align pointers
142             hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE);
143             hash.mix(block_header::data_alignment);
144 
145             // The members in the sequence below must be enumerated in the same order as they are declared in the header structure.
146             // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header.
147 
148 #define BOOST_LOG_MIX_HEADER_MEMBER(name)\
149             hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\
150             hash.mix(static_cast< uint32_t >(offsetof(header, name)))
151 
152             BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag);
153             BOOST_LOG_MIX_HEADER_MEMBER(m_padding);
154             BOOST_LOG_MIX_HEADER_MEMBER(m_initialized);
155             BOOST_LOG_MIX_HEADER_MEMBER(m_capacity);
156             BOOST_LOG_MIX_HEADER_MEMBER(m_block_size);
157             BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state);
158             BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state);
159             BOOST_LOG_MIX_HEADER_MEMBER(m_size);
160             BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos);
161             BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos);
162 
163 #undef BOOST_LOG_MIX_HEADER_MEMBER
164 
165             return hash.finalize();
166         }
167 
168         //! Returns an element header at the specified index
get_blockboost::ipc::reliable_message_queue::implementation::header169         block_header* get_block(uint32_t index) const BOOST_NOEXCEPT
170         {
171             BOOST_ASSERT(index < m_capacity);
172             unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE);
173             p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index);
174             return reinterpret_cast< block_header* >(p);
175         }
176 
177         BOOST_DELETED_FUNCTION(header(header const&))
178         BOOST_DELETED_FUNCTION(header& operator=(header const&))
179     };
180 
181 private:
182     //! Shared memory object and mapping
183     boost::log::ipc::aux::mapped_shared_memory m_shared_memory;
184     //! Queue overflow handling policy
185     const overflow_policy m_overflow_policy;
186     //! The mask for selecting bits that constitute size values from 0 to (block_size - 1)
187     size_type m_block_size_mask;
188     //! The number of the bit set in block_size (i.e. log base 2 of block_size)
189     uint32_t m_block_size_log2;
190 
191     //! Mutex for protecting queue data structures.
192     boost::log::ipc::aux::interprocess_mutex m_mutex;
193     //! Event used to block readers when the queue is empty.
194     boost::log::ipc::aux::interprocess_event m_nonempty_queue;
195     //! Condition variable used to block writers when the queue is full.
196     boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue;
197     //! The event indicates that stop has been requested
198     boost::log::ipc::aux::auto_handle m_stop;
199 
200     //! The queue name, as specified by the user
201     const object_name m_name;
202 
203 public:
204     //! The constructor creates a new shared memory segment
implementationboost::ipc::reliable_message_queue::implementation205     implementation
206     (
207         open_mode::create_only_tag,
208         object_name const& name,
209         uint32_t capacity,
210         size_type block_size,
211         overflow_policy oflow_policy,
212         permissions const& perms
213     ) :
214         m_overflow_policy(oflow_policy),
215         m_block_size_mask(0u),
216         m_block_size_log2(0u),
217         m_name(name)
218     {
219         const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
220         const std::size_t shmem_size = estimate_region_size(capacity, block_size);
221         m_shared_memory.create(wname.c_str(), shmem_size, perms);
222         m_shared_memory.map();
223 
224         create_queue(wname, capacity, block_size, perms);
225     }
226 
227     //! The constructor creates a new shared memory segment or opens the existing one
implementationboost::ipc::reliable_message_queue::implementation228     implementation
229     (
230         open_mode::open_or_create_tag,
231         object_name const& name,
232         uint32_t capacity,
233         size_type block_size,
234         overflow_policy oflow_policy,
235         permissions const& perms
236     ) :
237         m_overflow_policy(oflow_policy),
238         m_block_size_mask(0u),
239         m_block_size_log2(0u),
240         m_name(name)
241     {
242         const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
243         const std::size_t shmem_size = estimate_region_size(capacity, block_size);
244         const bool created = m_shared_memory.create_or_open(wname.c_str(), shmem_size, perms);
245         m_shared_memory.map();
246 
247         if (created)
248             create_queue(wname, capacity, block_size, perms);
249         else
250             adopt_queue(wname, m_shared_memory.size(), perms);
251     }
252 
253     //! The constructor opens the existing shared memory segment
implementationboost::ipc::reliable_message_queue::implementation254     implementation
255     (
256         open_mode::open_only_tag,
257         object_name const& name,
258         overflow_policy oflow_policy,
259         permissions const& perms
260     ) :
261         m_overflow_policy(oflow_policy),
262         m_block_size_mask(0u),
263         m_block_size_log2(0u),
264         m_name(name)
265     {
266         const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str());
267         m_shared_memory.open(wname.c_str());
268         m_shared_memory.map();
269 
270         adopt_queue(wname, m_shared_memory.size(), perms);
271     }
272 
nameboost::ipc::reliable_message_queue::implementation273     object_name const& name() const BOOST_NOEXCEPT
274     {
275         return m_name;
276     }
277 
capacityboost::ipc::reliable_message_queue::implementation278     uint32_t capacity() const BOOST_NOEXCEPT
279     {
280         return get_header()->m_capacity;
281     }
282 
block_sizeboost::ipc::reliable_message_queue::implementation283     size_type block_size() const BOOST_NOEXCEPT
284     {
285         return get_header()->m_block_size;
286     }
287 
sendboost::ipc::reliable_message_queue::implementation288     operation_result send(void const* message_data, size_type message_size)
289     {
290         const uint32_t block_count = estimate_block_count(message_size);
291 
292         header* const hdr = get_header();
293 
294         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
295             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
296 
297         if (!lock_queue())
298             return aborted;
299 
300         boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
301 
302         while (true)
303         {
304             if ((hdr->m_capacity - hdr->m_size) >= block_count)
305                 break;
306 
307             const overflow_policy oflow_policy = m_overflow_policy;
308             if (oflow_policy == fail_on_overflow)
309                 return no_space;
310             else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow))
311                 BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full");
312 
313             if (!m_nonfull_queue.wait(unlock, m_stop.get()))
314                 return aborted;
315         }
316 
317         enqueue_message(message_data, message_size, block_count);
318 
319         return succeeded;
320     }
321 
try_sendboost::ipc::reliable_message_queue::implementation322     bool try_send(void const* message_data, size_type message_size)
323     {
324         const uint32_t block_count = estimate_block_count(message_size);
325 
326         header* const hdr = get_header();
327 
328         if (BOOST_UNLIKELY(block_count > hdr->m_capacity))
329             BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity");
330 
331         if (!lock_queue())
332             return false;
333 
334         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
335 
336         if ((hdr->m_capacity - hdr->m_size) < block_count)
337             return false;
338 
339         enqueue_message(message_data, message_size, block_count);
340 
341         return true;
342     }
343 
receiveboost::ipc::reliable_message_queue::implementation344     operation_result receive(receive_handler handler, void* state)
345     {
346         if (!lock_queue())
347             return aborted;
348 
349         boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex);
350 
351         header* const hdr = get_header();
352 
353         while (true)
354         {
355             if (hdr->m_size > 0u)
356                 break;
357 
358             m_mutex.unlock();
359             unlock.disengage();
360 
361             if (!m_nonempty_queue.wait(m_stop.get()) || !lock_queue())
362                 return aborted;
363 
364             unlock.engage(m_mutex);
365         }
366 
367         dequeue_message(handler, state);
368 
369         return succeeded;
370     }
371 
try_receiveboost::ipc::reliable_message_queue::implementation372     bool try_receive(receive_handler handler, void* state)
373     {
374         if (!lock_queue())
375             return false;
376 
377         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
378 
379         header* const hdr = get_header();
380         if (hdr->m_size == 0u)
381             return false;
382 
383         dequeue_message(handler, state);
384 
385         return true;
386     }
387 
stop_localboost::ipc::reliable_message_queue::implementation388     void stop_local()
389     {
390         BOOST_VERIFY(boost::winapi::SetEvent(m_stop.get()) != 0);
391     }
392 
reset_localboost::ipc::reliable_message_queue::implementation393     void reset_local()
394     {
395         BOOST_VERIFY(boost::winapi::ResetEvent(m_stop.get()) != 0);
396     }
397 
clearboost::ipc::reliable_message_queue::implementation398     void clear()
399     {
400         m_mutex.lock();
401         boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex);
402         clear_queue();
403     }
404 
405 private:
get_headerboost::ipc::reliable_message_queue::implementation406     header* get_header() const BOOST_NOEXCEPT
407     {
408         return static_cast< header* >(m_shared_memory.address());
409     }
410 
estimate_region_sizeboost::ipc::reliable_message_queue::implementation411     static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT
412     {
413         return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size);
414     }
415 
create_stop_eventboost::ipc::reliable_message_queue::implementation416     void create_stop_event()
417     {
418 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
419         boost::winapi::HANDLE_ h = boost::winapi::CreateEventExW
420         (
421             NULL, // permissions
422             NULL, // name
423             boost::winapi::CREATE_EVENT_MANUAL_RESET_,
424             boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_
425         );
426 #else
427         boost::winapi::HANDLE_ h = boost::winapi::CreateEventW
428         (
429             NULL, // permissions
430             true, // manual reset
431             false, // initial state
432             NULL // name
433         );
434 #endif
435         if (BOOST_UNLIKELY(h == NULL))
436         {
437             boost::winapi::DWORD_ err = boost::winapi::GetLastError();
438             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an stop event object", (err));
439         }
440 
441         m_stop.init(h);
442     }
443 
create_queueboost::ipc::reliable_message_queue::implementation444     void create_queue(std::wstring const& name, uint32_t capacity, size_type block_size, permissions const& perms)
445     {
446         // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized
447         header* const hdr = get_header();
448         m_mutex.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state, perms);
449         m_nonempty_queue.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str(), false, perms);
450         m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
451         create_stop_event();
452 
453         new (hdr) header(capacity, block_size);
454 
455         init_block_size(block_size);
456     }
457 
adopt_queueboost::ipc::reliable_message_queue::implementation458     void adopt_queue(std::wstring const& name, std::size_t shmem_size, permissions const& perms)
459     {
460         if (shmem_size < sizeof(header))
461             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small");
462 
463         // Wait until the mapped region becomes initialized
464         header* const hdr = get_header();
465         BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 1000u, spin_loops = 16u, spins = 16u;
466         for (unsigned int i = 0; i < wait_loops; ++i)
467         {
468             uint32_t initialized = hdr->m_initialized.load(boost::memory_order_acquire);
469             if (initialized)
470             {
471                 goto done;
472             }
473 
474             if (i < spin_loops)
475             {
476                 for (unsigned int j = 0; j < spins; ++j)
477                 {
478                     boost::log::aux::pause();
479                 }
480             }
481             else
482             {
483                 boost::winapi::SwitchToThread();
484             }
485         }
486 
487         BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long");
488 
489     done:
490         // Check that the queue layout matches the current process ABI
491         if (hdr->m_abi_tag != header::get_abi_tag())
492             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible");
493 
494         if (!boost::log::aux::is_power_of_2(hdr->m_block_size))
495             BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2");
496 
497         m_mutex.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state);
498         m_nonempty_queue.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str());
499         m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms);
500         create_stop_event();
501 
502         init_block_size(hdr->m_block_size);
503     }
504 
init_block_sizeboost::ipc::reliable_message_queue::implementation505     void init_block_size(size_type block_size)
506     {
507         m_block_size_mask = block_size - 1u;
508 
509         uint32_t block_size_log2 = 0u;
510         if ((block_size & 0x0000ffff) == 0u)
511         {
512             block_size >>= 16u;
513             block_size_log2 += 16u;
514         }
515         if ((block_size & 0x000000ff) == 0u)
516         {
517             block_size >>= 8u;
518             block_size_log2 += 8u;
519         }
520         if ((block_size & 0x0000000f) == 0u)
521         {
522             block_size >>= 4u;
523             block_size_log2 += 4u;
524         }
525         if ((block_size & 0x00000003) == 0u)
526         {
527             block_size >>= 2u;
528             block_size_log2 += 2u;
529         }
530         if ((block_size & 0x00000001) == 0u)
531         {
532             ++block_size_log2;
533         }
534         m_block_size_log2 = block_size_log2;
535     }
536 
lock_queueboost::ipc::reliable_message_queue::implementation537     bool lock_queue()
538     {
539         return m_mutex.lock(m_stop.get());
540     }
541 
clear_queueboost::ipc::reliable_message_queue::implementation542     void clear_queue()
543     {
544         header* const hdr = get_header();
545         hdr->m_size = 0u;
546         hdr->m_put_pos = 0u;
547         hdr->m_get_pos = 0u;
548         m_nonfull_queue.notify_all();
549     }
550 
551     //! Returns the number of allocation blocks that are required to store user's payload of the specified size
estimate_block_countboost::ipc::reliable_message_queue::implementation552     uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT
553     {
554         // ceil((size + get_header_overhead()) / block_size)
555         return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2);
556     }
557 
558     //! Puts the message to the back of the queue
enqueue_messageboost::ipc::reliable_message_queue::implementation559     void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count)
560     {
561         header* const hdr = get_header();
562 
563         const uint32_t capacity = hdr->m_capacity;
564         const size_type block_size = hdr->m_block_size;
565         uint32_t pos = hdr->m_put_pos;
566 
567         block_header* block = hdr->get_block(pos);
568         block->m_size = message_size;
569 
570         size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
571         std::memcpy(block->get_data(), message_data, write_size);
572 
573         pos += block_count;
574         if (BOOST_UNLIKELY(pos >= capacity))
575         {
576             // Write the rest of the message at the beginning of the queue
577             pos -= capacity;
578             message_data = static_cast< const unsigned char* >(message_data) + write_size;
579             write_size = message_size - write_size;
580             if (write_size > 0u)
581                 std::memcpy(hdr->get_block(0u), message_data, write_size);
582         }
583 
584         hdr->m_put_pos = pos;
585 
586         const uint32_t old_queue_size = hdr->m_size;
587         hdr->m_size = old_queue_size + block_count;
588         if (old_queue_size == 0u)
589             m_nonempty_queue.set();
590     }
591 
592     //! Retrieves the next message and invokes the handler to store the message contents
dequeue_messageboost::ipc::reliable_message_queue::implementation593     void dequeue_message(receive_handler handler, void* state)
594     {
595         header* const hdr = get_header();
596 
597         const uint32_t capacity = hdr->m_capacity;
598         const size_type block_size = hdr->m_block_size;
599         uint32_t pos = hdr->m_get_pos;
600 
601         block_header* block = hdr->get_block(pos);
602         size_type message_size = block->m_size;
603         uint32_t block_count = estimate_block_count(message_size);
604 
605         BOOST_ASSERT(block_count <= hdr->m_size);
606 
607         size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size);
608         handler(state, block->get_data(), read_size);
609 
610         pos += block_count;
611         if (BOOST_UNLIKELY(pos >= capacity))
612         {
613             // Read the tail of the message
614             pos -= capacity;
615             read_size = message_size - read_size;
616             if (read_size > 0u)
617                 handler(state, hdr->get_block(0u), read_size);
618         }
619 
620         hdr->m_get_pos = pos;
621         hdr->m_size -= block_count;
622 
623         m_nonfull_queue.notify_all();
624     }
625 };
626 
create(object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy,permissions const & perms)627 BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
628 {
629     BOOST_ASSERT(m_impl == NULL);
630     if (!boost::log::aux::is_power_of_2(block_size))
631         BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
632     try
633     {
634         m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
635     }
636     catch (boost::exception& e)
637     {
638         e << boost::log::ipc::object_name_info(name);
639         throw;
640     }
641 }
642 
open_or_create(object_name const & name,uint32_t capacity,size_type block_size,overflow_policy oflow_policy,permissions const & perms)643 BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms)
644 {
645     BOOST_ASSERT(m_impl == NULL);
646     if (!boost::log::aux::is_power_of_2(block_size))
647         BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2"));
648     try
649     {
650         m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms);
651     }
652     catch (boost::exception& e)
653     {
654         e << boost::log::ipc::object_name_info(name);
655         throw;
656     }
657 }
658 
open(object_name const & name,overflow_policy oflow_policy,permissions const & perms)659 BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const& perms)
660 {
661     BOOST_ASSERT(m_impl == NULL);
662     try
663     {
664         m_impl = new implementation(open_mode::open_only, name, oflow_policy, perms);
665     }
666     catch (boost::exception& e)
667     {
668         e << boost::log::ipc::object_name_info(name);
669         throw;
670     }
671 }
672 
clear()673 BOOST_LOG_API void reliable_message_queue::clear()
674 {
675     BOOST_ASSERT(m_impl != NULL);
676     try
677     {
678         m_impl->clear();
679     }
680     catch (boost::exception& e)
681     {
682         e << boost::log::ipc::object_name_info(m_impl->name());
683         throw;
684     }
685 }
686 
name() const687 BOOST_LOG_API object_name const& reliable_message_queue::name() const
688 {
689     BOOST_ASSERT(m_impl != NULL);
690     return m_impl->name();
691 }
692 
capacity() const693 BOOST_LOG_API uint32_t reliable_message_queue::capacity() const
694 {
695     BOOST_ASSERT(m_impl != NULL);
696     return m_impl->capacity();
697 }
698 
block_size() const699 BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const
700 {
701     BOOST_ASSERT(m_impl != NULL);
702     return m_impl->block_size();
703 }
704 
stop_local()705 BOOST_LOG_API void reliable_message_queue::stop_local()
706 {
707     BOOST_ASSERT(m_impl != NULL);
708     try
709     {
710         m_impl->stop_local();
711     }
712     catch (boost::exception& e)
713     {
714         e << boost::log::ipc::object_name_info(m_impl->name());
715         throw;
716     }
717 }
718 
reset_local()719 BOOST_LOG_API void reliable_message_queue::reset_local()
720 {
721     BOOST_ASSERT(m_impl != NULL);
722     try
723     {
724         m_impl->reset_local();
725     }
726     catch (boost::exception& e)
727     {
728         e << boost::log::ipc::object_name_info(m_impl->name());
729         throw;
730     }
731 }
732 
do_close()733 BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT
734 {
735     delete m_impl;
736     m_impl = NULL;
737 }
738 
send(void const * message_data,size_type message_size)739 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size)
740 {
741     BOOST_ASSERT(m_impl != NULL);
742     try
743     {
744         return m_impl->send(message_data, message_size);
745     }
746     catch (boost::exception& e)
747     {
748         e << boost::log::ipc::object_name_info(m_impl->name());
749         throw;
750     }
751 }
752 
try_send(void const * message_data,size_type message_size)753 BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size)
754 {
755     BOOST_ASSERT(m_impl != NULL);
756     try
757     {
758         return m_impl->try_send(message_data, message_size);
759     }
760     catch (boost::exception& e)
761     {
762         e << boost::log::ipc::object_name_info(m_impl->name());
763         throw;
764     }
765 }
766 
do_receive(receive_handler handler,void * state)767 BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state)
768 {
769     BOOST_ASSERT(m_impl != NULL);
770     try
771     {
772         return m_impl->receive(handler, state);
773     }
774     catch (boost::exception& e)
775     {
776         e << boost::log::ipc::object_name_info(m_impl->name());
777         throw;
778     }
779 }
780 
do_try_receive(receive_handler handler,void * state)781 BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state)
782 {
783     BOOST_ASSERT(m_impl != NULL);
784     try
785     {
786         return m_impl->try_receive(handler, state);
787     }
788     catch (boost::exception& e)
789     {
790         e << boost::log::ipc::object_name_info(m_impl->name());
791         throw;
792     }
793 }
794 
795 //! Fixed buffer receive handler
fixed_buffer_receive_handler(void * state,const void * data,size_type size)796 BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size)
797 {
798     fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state);
799     if (BOOST_UNLIKELY(size > p->size))
800         BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message"));
801 
802     std::memcpy(p->data, data, size);
803     p->data += size;
804     p->size -= size;
805 }
806 
remove(object_name const &)807 BOOST_LOG_API void reliable_message_queue::remove(object_name const&)
808 {
809     // System objects are reference counted on Windows, nothing to do here
810 }
811 
812 } // namespace ipc
813 
814 BOOST_LOG_CLOSE_NAMESPACE // namespace log
815 
816 } // namespace boost
817 
818 #include <boost/log/detail/footer.hpp>
819