• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *               Copyright Lingxi Li 2015.
3  *            Copyright Andrey Semashev 2016.
4  * Distributed under the Boost Software License, Version 1.0.
5  *    (See accompanying file LICENSE_1_0.txt or copy at
6  *          http://www.boost.org/LICENSE_1_0.txt)
7  */
8 /*!
9  * \file   util_ipc_reliable_mq.cpp
10  * \author Lingxi Li
11  * \author Andrey Semashev
12  * \date   19.10.2015
13  *
14  * \brief  The test verifies that \c ipc::reliable_message_queue works.
15  */
16 
17 #if !defined(BOOST_LOG_WITHOUT_IPC)
18 
19 #define BOOST_TEST_MODULE util_ipc_reliable_mq
20 
21 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
22 #include <boost/log/utility/ipc/object_name.hpp>
23 #include <boost/log/utility/permissions.hpp>
24 #include <boost/log/utility/open_mode.hpp>
25 #include <boost/log/exceptions.hpp>
26 #include <boost/test/unit_test.hpp>
27 #include <cstddef>
28 #include <cstring>
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <stdexcept>
33 #include <boost/move/utility_core.hpp>
34 #if !defined(BOOST_LOG_NO_THREADS)
35 #include <algorithm>
36 #include <boost/ref.hpp>
37 #include <boost/atomic/fences.hpp>
38 #include <boost/thread/thread.hpp>
39 #include <boost/chrono/duration.hpp>
40 #endif
41 #include "char_definitions.hpp"
42 
43 typedef boost::log::ipc::reliable_message_queue queue_t;
44 typedef queue_t::size_type size_type;
45 
46 const boost::log::ipc::object_name ipc_queue_name(boost::log::ipc::object_name::session, "boost_log_test_ipc_reliable_mq");
47 const unsigned int capacity = 512;
48 const size_type block_size = 1024;
49 const char message1[] = "Hello, world!";
50 const char message2[] = "Hello, the brand new world!";
51 
BOOST_AUTO_TEST_CASE(basic_functionality)52 BOOST_AUTO_TEST_CASE(basic_functionality)
53 {
54     // Default constructor.
55     {
56         queue_t queue;
57         BOOST_CHECK(!queue.is_open());
58     }
59 
60     // Do a remove in case if a previous test failed
61     queue_t::remove(ipc_queue_name);
62 
63     // Opening a non-existing queue
64     try
65     {
66         queue_t queue(boost::log::open_mode::open_only, ipc_queue_name);
67         BOOST_FAIL("Non-existing queue open succeeded, although it shouldn't have");
68     }
69     catch (std::exception&)
70     {
71         BOOST_TEST_PASSPOINT();
72     }
73 
74     // Create constructor and destructor.
75     {
76         queue_t queue(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
77         BOOST_CHECK(equal_strings(queue.name().c_str(), ipc_queue_name.c_str()));
78         BOOST_CHECK(queue.is_open());
79         BOOST_CHECK_EQUAL(queue.capacity(), capacity);
80         BOOST_CHECK_EQUAL(queue.block_size(), block_size);
81     }
82 
83     // Creating a duplicate queue
84     try
85     {
86         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
87         queue_t queue_b(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
88         BOOST_FAIL("Creating a duplicate queue succeeded, although it shouldn't have");
89     }
90     catch (std::exception&)
91     {
92         BOOST_TEST_PASSPOINT();
93     }
94 
95     // Opening an existing queue
96     {
97         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
98         BOOST_CHECK(queue_a.is_open());
99 
100         queue_t queue_b(boost::log::open_mode::open_or_create, ipc_queue_name, capacity * 2u, block_size * 2u); // queue geometry differs from the existing queue
101         BOOST_CHECK(queue_b.is_open());
102         BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
103         BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
104         BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
105 
106         queue_t queue_c(boost::log::open_mode::open_only, ipc_queue_name);
107         BOOST_CHECK(queue_c.is_open());
108         BOOST_CHECK(equal_strings(queue_c.name().c_str(), ipc_queue_name.c_str()));
109         BOOST_CHECK_EQUAL(queue_c.capacity(), capacity);
110         BOOST_CHECK_EQUAL(queue_c.block_size(), block_size);
111     }
112     // Closing a queue
113     {
114         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
115         BOOST_CHECK(queue_a.is_open());
116         queue_a.close();
117         BOOST_CHECK(!queue_a.is_open());
118         // Duplicate close()
119         queue_a.close();
120         BOOST_CHECK(!queue_a.is_open());
121     }
122     // Move constructor.
123     {
124         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
125         queue_t queue_b(boost::move(queue_a));
126         BOOST_CHECK(!queue_a.is_open());
127         BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
128         BOOST_CHECK(queue_b.is_open());
129         BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
130         BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
131     }
132     // Move assignment operator.
133     {
134         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
135         queue_t queue_b;
136         queue_b = boost::move(queue_a);
137         BOOST_CHECK(!queue_a.is_open());
138         BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
139         BOOST_CHECK(queue_b.is_open());
140         BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
141         BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
142     }
143     // Member and non-member swaps.
144     {
145         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
146         queue_a.swap(queue_a);
147         BOOST_CHECK(queue_a.is_open());
148         BOOST_CHECK(equal_strings(queue_a.name().c_str(), ipc_queue_name.c_str()));
149         BOOST_CHECK_EQUAL(queue_a.capacity(), capacity);
150         BOOST_CHECK_EQUAL(queue_a.block_size(), block_size);
151 
152         queue_t queue_b;
153         swap(queue_a, queue_b);
154         BOOST_CHECK(!queue_a.is_open());
155         BOOST_CHECK(queue_b.is_open());
156         BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
157         BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
158         BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
159     }
160 }
161 
BOOST_AUTO_TEST_CASE(message_passing)162 BOOST_AUTO_TEST_CASE(message_passing)
163 {
164     // try_send() and try_receive()
165     {
166         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
167         queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
168         BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
169         BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
170         char buffer[block_size] = {};
171         size_type message_size = 0u;
172         BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
173         BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
174         BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
175         BOOST_CHECK(!queue_b.try_receive(buffer, sizeof(buffer), message_size));
176 
177         BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
178         std::string msg;
179         BOOST_CHECK(queue_b.try_receive(msg));
180         BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
181         BOOST_CHECK_EQUAL(msg, message2);
182 
183         BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
184         std::vector< unsigned char > buf;
185         BOOST_CHECK(queue_b.try_receive(buf));
186         BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
187         BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
188     }
189 
190     // send() and receive() without blocking
191     {
192         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
193         queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
194         BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
195         char buffer[block_size] = {};
196         size_type message_size = 0u;
197         BOOST_CHECK(queue_b.receive(buffer, sizeof(buffer), message_size) == queue_t::succeeded);
198         BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
199         BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
200 
201         BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
202         std::string msg;
203         BOOST_CHECK(queue_b.receive(msg) == queue_t::succeeded);
204         BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
205         BOOST_CHECK_EQUAL(msg, message2);
206 
207         BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
208         std::vector< unsigned char > buf;
209         BOOST_CHECK(queue_b.receive(buf) == queue_t::succeeded);
210         BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
211         BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
212     }
213 
214     // send() with an error code on overflow
215     {
216         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::fail_on_overflow);
217         BOOST_TEST_PASSPOINT();
218         BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
219         BOOST_TEST_PASSPOINT();
220 
221         queue_t::operation_result res = queue_a.send(message1, sizeof(message1) - 1u);
222         BOOST_CHECK_EQUAL(res, queue_t::no_space);
223     }
224 
225     // send() with an exception on overflow
226     {
227         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::throw_on_overflow);
228         BOOST_TEST_PASSPOINT();
229         BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
230         BOOST_TEST_PASSPOINT();
231         try
232         {
233             queue_a.send(message1, sizeof(message1) - 1u);
234             BOOST_FAIL("Owerflowing the queue succeeded, although it shouldn't have");
235         }
236         catch (boost::log::capacity_limit_reached&)
237         {
238             BOOST_TEST_PASSPOINT();
239         }
240     }
241 
242     // send() and receive() for messages larger than block_size. The message size and queue capacity below are such
243     // that the last enqueued message is expected to be split in the queue storage.
244     {
245         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 5u, block_size);
246         queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
247 
248         const size_type message_size = block_size * 3u / 2u;
249         std::vector< unsigned char > send_data;
250         send_data.resize(message_size);
251         for (unsigned int i = 0; i < message_size; ++i)
252             send_data[i] = static_cast< unsigned char >(i & 0xFF);
253 
254         BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
255 
256         for (unsigned int i = 0; i < 3; ++i)
257         {
258             BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
259 
260             std::vector< unsigned char > receive_data;
261             BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
262             BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
263         }
264 
265         std::vector< unsigned char > receive_data;
266         BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
267         BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
268     }
269 
270     // clear()
271     {
272         queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
273         queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
274         BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
275         BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
276 
277         queue_a.clear();
278 
279         BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
280         char buffer[block_size] = {};
281         size_type message_size = 0u;
282         BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
283         BOOST_CHECK_EQUAL(message_size, sizeof(message2) - 1u);
284         BOOST_CHECK(std::memcmp(buffer, message2, message_size) == 0);
285     }
286 }
287 
288 #if !defined(BOOST_LOG_NO_THREADS)
289 
290 namespace {
291 
292 const unsigned int message_count = 100000;
293 
multithreaded_message_passing_feeding_thread(const char * message,unsigned int & failure_count)294 void multithreaded_message_passing_feeding_thread(const char* message, unsigned int& failure_count)
295 {
296     const size_type len = static_cast< size_type >(std::strlen(message));
297     queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
298     for (unsigned int i = 0; i < message_count; ++i)
299     {
300         failure_count += queue.send(message, len) != queue_t::succeeded;
301     }
302 
303     boost::atomic_thread_fence(boost::memory_order_release);
304 }
305 
306 } // namespace
307 
BOOST_AUTO_TEST_CASE(multithreaded_message_passing)308 BOOST_AUTO_TEST_CASE(multithreaded_message_passing)
309 {
310     unsigned int failure_count1 = 0, failure_count2 = 0, failure_count3 = 0;
311     boost::atomic_thread_fence(boost::memory_order_release);
312 
313     boost::thread thread1(&multithreaded_message_passing_feeding_thread, "Thread 1", boost::ref(failure_count1));
314     boost::thread thread2(&multithreaded_message_passing_feeding_thread, "Thread 2", boost::ref(failure_count2));
315     boost::thread thread3(&multithreaded_message_passing_feeding_thread, "Thread 3", boost::ref(failure_count3));
316 
317     BOOST_TEST_PASSPOINT();
318 
319     queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
320     unsigned int receive_failures = 0, receive_corruptions = 0;
321     unsigned int message_count1 = 0, message_count2 = 0, message_count3 = 0;
322     std::string msg;
323 
324     BOOST_TEST_PASSPOINT();
325 
326     for (unsigned int i = 0; i < message_count * 3u; ++i)
327     {
328         msg.clear();
329         if (queue.receive(msg) == queue_t::succeeded)
330         {
331             if (msg == "Thread 1")
332                 ++message_count1;
333             else if (msg == "Thread 2")
334                 ++message_count2;
335             else if (msg == "Thread 3")
336                 ++message_count3;
337             else
338                 ++receive_corruptions;
339         }
340         else
341             ++receive_failures;
342     }
343 
344     BOOST_TEST_PASSPOINT();
345     thread1.join();
346 
347     BOOST_TEST_PASSPOINT();
348     thread2.join();
349 
350     BOOST_TEST_PASSPOINT();
351     thread3.join();
352 
353     boost::atomic_thread_fence(boost::memory_order_acquire);
354 
355     BOOST_CHECK_EQUAL(failure_count1, 0u);
356     BOOST_CHECK_EQUAL(message_count1, message_count);
357     BOOST_CHECK_EQUAL(failure_count2, 0u);
358     BOOST_CHECK_EQUAL(message_count2, message_count);
359     BOOST_CHECK_EQUAL(failure_count3, 0u);
360     BOOST_CHECK_EQUAL(message_count3, message_count);
361     BOOST_CHECK_EQUAL(receive_failures, 0u);
362     BOOST_CHECK_EQUAL(receive_corruptions, 0u);
363 }
364 
365 namespace {
366 
stop_reset_feeding_thread(queue_t & queue,queue_t::operation_result * results,unsigned int count)367 void stop_reset_feeding_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
368 {
369     for (unsigned int i = 0; i < count; ++i)
370     {
371         results[i] = queue.send(message1, sizeof(message1) - 1u);
372         if (results[i] != queue_t::succeeded)
373             break;
374     }
375 
376     boost::atomic_thread_fence(boost::memory_order_release);
377 }
378 
stop_reset_reading_thread(queue_t & queue,queue_t::operation_result * results,unsigned int count)379 void stop_reset_reading_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
380 {
381     std::string msg;
382     for (unsigned int i = 0; i < count; ++i)
383     {
384         msg.clear();
385         results[i] = queue.receive(msg);
386         if (results[i] != queue_t::succeeded)
387             break;
388     }
389 
390     boost::atomic_thread_fence(boost::memory_order_release);
391 }
392 
393 } // namespace
394 
BOOST_AUTO_TEST_CASE(stop_reset_local)395 BOOST_AUTO_TEST_CASE(stop_reset_local)
396 {
397     queue_t feeder_queue(boost::log::open_mode::open_or_create, ipc_queue_name, 1u, block_size);
398     queue_t::operation_result feeder_results[3];
399     queue_t reader_queue(boost::log::open_mode::open_only, ipc_queue_name);
400     queue_t::operation_result reader_results[3];
401 
402     std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
403     std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
404     boost::atomic_thread_fence(boost::memory_order_release);
405 
406     BOOST_TEST_PASSPOINT();
407 
408     // Case 1: Let the feeder block and then we unblock it with stop_local()
409     boost::thread feeder_thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 3);
410     boost::thread reader_thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 1);
411 
412     BOOST_TEST_PASSPOINT();
413 
414     reader_thread.join();
415     BOOST_TEST_PASSPOINT();
416     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
417 
418     BOOST_TEST_PASSPOINT();
419 
420     feeder_queue.stop_local();
421     BOOST_TEST_PASSPOINT();
422     feeder_thread.join();
423 
424     boost::atomic_thread_fence(boost::memory_order_acquire);
425 
426     BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
427     BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
428     BOOST_CHECK_EQUAL(feeder_results[2], queue_t::aborted);
429     BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
430 
431     // Reset the aborted queue
432     feeder_queue.reset_local();
433     feeder_queue.clear();
434 
435     std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
436     std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
437     boost::atomic_thread_fence(boost::memory_order_release);
438 
439     BOOST_TEST_PASSPOINT();
440 
441     // Case 2: Let the reader block and then we unblock it with stop_local()
442     boost::thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 1).swap(feeder_thread);
443     boost::thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 2).swap(reader_thread);
444 
445     BOOST_TEST_PASSPOINT();
446 
447     feeder_thread.join();
448     BOOST_TEST_PASSPOINT();
449     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
450 
451     BOOST_TEST_PASSPOINT();
452 
453     reader_queue.stop_local();
454     BOOST_TEST_PASSPOINT();
455     reader_thread.join();
456 
457     boost::atomic_thread_fence(boost::memory_order_acquire);
458 
459     BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
460     BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
461     BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
462     BOOST_CHECK_EQUAL(reader_results[1], queue_t::aborted);
463 }
464 
465 #endif // !defined(BOOST_LOG_NO_THREADS)
466 
467 #else // !defined(BOOST_LOG_WITHOUT_IPC)
468 
main()469 int main()
470 {
471     return 0;
472 }
473 
474 #endif // !defined(BOOST_LOG_WITHOUT_IPC)
475