1 /*
2 * Copyright Lingxi Li 2015.
3 * Copyright Andrey Semashev 2016.
4 * Distributed under the Boost Software License, Version 1.0.
5 * (See accompanying file LICENSE_1_0.txt or copy at
6 * http://www.boost.org/LICENSE_1_0.txt)
7 */
8 /*!
9 * \file util_ipc_reliable_mq.cpp
10 * \author Lingxi Li
11 * \author Andrey Semashev
12 * \date 19.10.2015
13 *
14 * \brief The test verifies that \c ipc::reliable_message_queue works.
15 */
16
17 #if !defined(BOOST_LOG_WITHOUT_IPC)
18
19 #define BOOST_TEST_MODULE util_ipc_reliable_mq
20
21 #include <boost/log/utility/ipc/reliable_message_queue.hpp>
22 #include <boost/log/utility/ipc/object_name.hpp>
23 #include <boost/log/utility/permissions.hpp>
24 #include <boost/log/utility/open_mode.hpp>
25 #include <boost/log/exceptions.hpp>
26 #include <boost/test/unit_test.hpp>
27 #include <cstddef>
28 #include <cstring>
29 #include <string>
30 #include <vector>
31 #include <iostream>
32 #include <stdexcept>
33 #include <boost/move/utility_core.hpp>
34 #if !defined(BOOST_LOG_NO_THREADS)
35 #include <algorithm>
36 #include <boost/ref.hpp>
37 #include <boost/atomic/fences.hpp>
38 #include <boost/thread/thread.hpp>
39 #include <boost/chrono/duration.hpp>
40 #endif
41 #include "char_definitions.hpp"
42
43 typedef boost::log::ipc::reliable_message_queue queue_t;
44 typedef queue_t::size_type size_type;
45
46 const boost::log::ipc::object_name ipc_queue_name(boost::log::ipc::object_name::session, "boost_log_test_ipc_reliable_mq");
47 const unsigned int capacity = 512;
48 const size_type block_size = 1024;
49 const char message1[] = "Hello, world!";
50 const char message2[] = "Hello, the brand new world!";
51
BOOST_AUTO_TEST_CASE(basic_functionality)52 BOOST_AUTO_TEST_CASE(basic_functionality)
53 {
54 // Default constructor.
55 {
56 queue_t queue;
57 BOOST_CHECK(!queue.is_open());
58 }
59
60 // Do a remove in case if a previous test failed
61 queue_t::remove(ipc_queue_name);
62
63 // Opening a non-existing queue
64 try
65 {
66 queue_t queue(boost::log::open_mode::open_only, ipc_queue_name);
67 BOOST_FAIL("Non-existing queue open succeeded, although it shouldn't have");
68 }
69 catch (std::exception&)
70 {
71 BOOST_TEST_PASSPOINT();
72 }
73
74 // Create constructor and destructor.
75 {
76 queue_t queue(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
77 BOOST_CHECK(equal_strings(queue.name().c_str(), ipc_queue_name.c_str()));
78 BOOST_CHECK(queue.is_open());
79 BOOST_CHECK_EQUAL(queue.capacity(), capacity);
80 BOOST_CHECK_EQUAL(queue.block_size(), block_size);
81 }
82
83 // Creating a duplicate queue
84 try
85 {
86 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
87 queue_t queue_b(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
88 BOOST_FAIL("Creating a duplicate queue succeeded, although it shouldn't have");
89 }
90 catch (std::exception&)
91 {
92 BOOST_TEST_PASSPOINT();
93 }
94
95 // Opening an existing queue
96 {
97 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
98 BOOST_CHECK(queue_a.is_open());
99
100 queue_t queue_b(boost::log::open_mode::open_or_create, ipc_queue_name, capacity * 2u, block_size * 2u); // queue geometry differs from the existing queue
101 BOOST_CHECK(queue_b.is_open());
102 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
103 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
104 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
105
106 queue_t queue_c(boost::log::open_mode::open_only, ipc_queue_name);
107 BOOST_CHECK(queue_c.is_open());
108 BOOST_CHECK(equal_strings(queue_c.name().c_str(), ipc_queue_name.c_str()));
109 BOOST_CHECK_EQUAL(queue_c.capacity(), capacity);
110 BOOST_CHECK_EQUAL(queue_c.block_size(), block_size);
111 }
112 // Closing a queue
113 {
114 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
115 BOOST_CHECK(queue_a.is_open());
116 queue_a.close();
117 BOOST_CHECK(!queue_a.is_open());
118 // Duplicate close()
119 queue_a.close();
120 BOOST_CHECK(!queue_a.is_open());
121 }
122 // Move constructor.
123 {
124 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
125 queue_t queue_b(boost::move(queue_a));
126 BOOST_CHECK(!queue_a.is_open());
127 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
128 BOOST_CHECK(queue_b.is_open());
129 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
130 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
131 }
132 // Move assignment operator.
133 {
134 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
135 queue_t queue_b;
136 queue_b = boost::move(queue_a);
137 BOOST_CHECK(!queue_a.is_open());
138 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
139 BOOST_CHECK(queue_b.is_open());
140 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
141 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
142 }
143 // Member and non-member swaps.
144 {
145 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
146 queue_a.swap(queue_a);
147 BOOST_CHECK(queue_a.is_open());
148 BOOST_CHECK(equal_strings(queue_a.name().c_str(), ipc_queue_name.c_str()));
149 BOOST_CHECK_EQUAL(queue_a.capacity(), capacity);
150 BOOST_CHECK_EQUAL(queue_a.block_size(), block_size);
151
152 queue_t queue_b;
153 swap(queue_a, queue_b);
154 BOOST_CHECK(!queue_a.is_open());
155 BOOST_CHECK(queue_b.is_open());
156 BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
157 BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
158 BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
159 }
160 }
161
BOOST_AUTO_TEST_CASE(message_passing)162 BOOST_AUTO_TEST_CASE(message_passing)
163 {
164 // try_send() and try_receive()
165 {
166 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
167 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
168 BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
169 BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
170 char buffer[block_size] = {};
171 size_type message_size = 0u;
172 BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
173 BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
174 BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
175 BOOST_CHECK(!queue_b.try_receive(buffer, sizeof(buffer), message_size));
176
177 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
178 std::string msg;
179 BOOST_CHECK(queue_b.try_receive(msg));
180 BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
181 BOOST_CHECK_EQUAL(msg, message2);
182
183 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
184 std::vector< unsigned char > buf;
185 BOOST_CHECK(queue_b.try_receive(buf));
186 BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
187 BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
188 }
189
190 // send() and receive() without blocking
191 {
192 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
193 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
194 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
195 char buffer[block_size] = {};
196 size_type message_size = 0u;
197 BOOST_CHECK(queue_b.receive(buffer, sizeof(buffer), message_size) == queue_t::succeeded);
198 BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
199 BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
200
201 BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
202 std::string msg;
203 BOOST_CHECK(queue_b.receive(msg) == queue_t::succeeded);
204 BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
205 BOOST_CHECK_EQUAL(msg, message2);
206
207 BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
208 std::vector< unsigned char > buf;
209 BOOST_CHECK(queue_b.receive(buf) == queue_t::succeeded);
210 BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
211 BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
212 }
213
214 // send() with an error code on overflow
215 {
216 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::fail_on_overflow);
217 BOOST_TEST_PASSPOINT();
218 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
219 BOOST_TEST_PASSPOINT();
220
221 queue_t::operation_result res = queue_a.send(message1, sizeof(message1) - 1u);
222 BOOST_CHECK_EQUAL(res, queue_t::no_space);
223 }
224
225 // send() with an exception on overflow
226 {
227 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::throw_on_overflow);
228 BOOST_TEST_PASSPOINT();
229 BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
230 BOOST_TEST_PASSPOINT();
231 try
232 {
233 queue_a.send(message1, sizeof(message1) - 1u);
234 BOOST_FAIL("Owerflowing the queue succeeded, although it shouldn't have");
235 }
236 catch (boost::log::capacity_limit_reached&)
237 {
238 BOOST_TEST_PASSPOINT();
239 }
240 }
241
242 // send() and receive() for messages larger than block_size. The message size and queue capacity below are such
243 // that the last enqueued message is expected to be split in the queue storage.
244 {
245 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 5u, block_size);
246 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
247
248 const size_type message_size = block_size * 3u / 2u;
249 std::vector< unsigned char > send_data;
250 send_data.resize(message_size);
251 for (unsigned int i = 0; i < message_size; ++i)
252 send_data[i] = static_cast< unsigned char >(i & 0xFF);
253
254 BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
255
256 for (unsigned int i = 0; i < 3; ++i)
257 {
258 BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
259
260 std::vector< unsigned char > receive_data;
261 BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
262 BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
263 }
264
265 std::vector< unsigned char > receive_data;
266 BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
267 BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
268 }
269
270 // clear()
271 {
272 queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
273 queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
274 BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
275 BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
276
277 queue_a.clear();
278
279 BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
280 char buffer[block_size] = {};
281 size_type message_size = 0u;
282 BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
283 BOOST_CHECK_EQUAL(message_size, sizeof(message2) - 1u);
284 BOOST_CHECK(std::memcmp(buffer, message2, message_size) == 0);
285 }
286 }
287
288 #if !defined(BOOST_LOG_NO_THREADS)
289
290 namespace {
291
292 const unsigned int message_count = 100000;
293
multithreaded_message_passing_feeding_thread(const char * message,unsigned int & failure_count)294 void multithreaded_message_passing_feeding_thread(const char* message, unsigned int& failure_count)
295 {
296 const size_type len = static_cast< size_type >(std::strlen(message));
297 queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
298 for (unsigned int i = 0; i < message_count; ++i)
299 {
300 failure_count += queue.send(message, len) != queue_t::succeeded;
301 }
302
303 boost::atomic_thread_fence(boost::memory_order_release);
304 }
305
306 } // namespace
307
BOOST_AUTO_TEST_CASE(multithreaded_message_passing)308 BOOST_AUTO_TEST_CASE(multithreaded_message_passing)
309 {
310 unsigned int failure_count1 = 0, failure_count2 = 0, failure_count3 = 0;
311 boost::atomic_thread_fence(boost::memory_order_release);
312
313 boost::thread thread1(&multithreaded_message_passing_feeding_thread, "Thread 1", boost::ref(failure_count1));
314 boost::thread thread2(&multithreaded_message_passing_feeding_thread, "Thread 2", boost::ref(failure_count2));
315 boost::thread thread3(&multithreaded_message_passing_feeding_thread, "Thread 3", boost::ref(failure_count3));
316
317 BOOST_TEST_PASSPOINT();
318
319 queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
320 unsigned int receive_failures = 0, receive_corruptions = 0;
321 unsigned int message_count1 = 0, message_count2 = 0, message_count3 = 0;
322 std::string msg;
323
324 BOOST_TEST_PASSPOINT();
325
326 for (unsigned int i = 0; i < message_count * 3u; ++i)
327 {
328 msg.clear();
329 if (queue.receive(msg) == queue_t::succeeded)
330 {
331 if (msg == "Thread 1")
332 ++message_count1;
333 else if (msg == "Thread 2")
334 ++message_count2;
335 else if (msg == "Thread 3")
336 ++message_count3;
337 else
338 ++receive_corruptions;
339 }
340 else
341 ++receive_failures;
342 }
343
344 BOOST_TEST_PASSPOINT();
345 thread1.join();
346
347 BOOST_TEST_PASSPOINT();
348 thread2.join();
349
350 BOOST_TEST_PASSPOINT();
351 thread3.join();
352
353 boost::atomic_thread_fence(boost::memory_order_acquire);
354
355 BOOST_CHECK_EQUAL(failure_count1, 0u);
356 BOOST_CHECK_EQUAL(message_count1, message_count);
357 BOOST_CHECK_EQUAL(failure_count2, 0u);
358 BOOST_CHECK_EQUAL(message_count2, message_count);
359 BOOST_CHECK_EQUAL(failure_count3, 0u);
360 BOOST_CHECK_EQUAL(message_count3, message_count);
361 BOOST_CHECK_EQUAL(receive_failures, 0u);
362 BOOST_CHECK_EQUAL(receive_corruptions, 0u);
363 }
364
365 namespace {
366
stop_reset_feeding_thread(queue_t & queue,queue_t::operation_result * results,unsigned int count)367 void stop_reset_feeding_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
368 {
369 for (unsigned int i = 0; i < count; ++i)
370 {
371 results[i] = queue.send(message1, sizeof(message1) - 1u);
372 if (results[i] != queue_t::succeeded)
373 break;
374 }
375
376 boost::atomic_thread_fence(boost::memory_order_release);
377 }
378
stop_reset_reading_thread(queue_t & queue,queue_t::operation_result * results,unsigned int count)379 void stop_reset_reading_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
380 {
381 std::string msg;
382 for (unsigned int i = 0; i < count; ++i)
383 {
384 msg.clear();
385 results[i] = queue.receive(msg);
386 if (results[i] != queue_t::succeeded)
387 break;
388 }
389
390 boost::atomic_thread_fence(boost::memory_order_release);
391 }
392
393 } // namespace
394
BOOST_AUTO_TEST_CASE(stop_reset_local)395 BOOST_AUTO_TEST_CASE(stop_reset_local)
396 {
397 queue_t feeder_queue(boost::log::open_mode::open_or_create, ipc_queue_name, 1u, block_size);
398 queue_t::operation_result feeder_results[3];
399 queue_t reader_queue(boost::log::open_mode::open_only, ipc_queue_name);
400 queue_t::operation_result reader_results[3];
401
402 std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
403 std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
404 boost::atomic_thread_fence(boost::memory_order_release);
405
406 BOOST_TEST_PASSPOINT();
407
408 // Case 1: Let the feeder block and then we unblock it with stop_local()
409 boost::thread feeder_thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 3);
410 boost::thread reader_thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 1);
411
412 BOOST_TEST_PASSPOINT();
413
414 reader_thread.join();
415 BOOST_TEST_PASSPOINT();
416 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
417
418 BOOST_TEST_PASSPOINT();
419
420 feeder_queue.stop_local();
421 BOOST_TEST_PASSPOINT();
422 feeder_thread.join();
423
424 boost::atomic_thread_fence(boost::memory_order_acquire);
425
426 BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
427 BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
428 BOOST_CHECK_EQUAL(feeder_results[2], queue_t::aborted);
429 BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
430
431 // Reset the aborted queue
432 feeder_queue.reset_local();
433 feeder_queue.clear();
434
435 std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
436 std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
437 boost::atomic_thread_fence(boost::memory_order_release);
438
439 BOOST_TEST_PASSPOINT();
440
441 // Case 2: Let the reader block and then we unblock it with stop_local()
442 boost::thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 1).swap(feeder_thread);
443 boost::thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 2).swap(reader_thread);
444
445 BOOST_TEST_PASSPOINT();
446
447 feeder_thread.join();
448 BOOST_TEST_PASSPOINT();
449 boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
450
451 BOOST_TEST_PASSPOINT();
452
453 reader_queue.stop_local();
454 BOOST_TEST_PASSPOINT();
455 reader_thread.join();
456
457 boost::atomic_thread_fence(boost::memory_order_acquire);
458
459 BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
460 BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
461 BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
462 BOOST_CHECK_EQUAL(reader_results[1], queue_t::aborted);
463 }
464
465 #endif // !defined(BOOST_LOG_NO_THREADS)
466
467 #else // !defined(BOOST_LOG_WITHOUT_IPC)
468
main()469 int main()
470 {
471 return 0;
472 }
473
474 #endif // !defined(BOOST_LOG_WITHOUT_IPC)
475