1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10
11 #include <boost/interprocess/ipc/message_queue.hpp>
12 #include <boost/interprocess/managed_external_buffer.hpp>
13 #include <boost/interprocess/managed_heap_memory.hpp>
14 #include <boost/interprocess/containers/map.hpp>
15 #include <boost/interprocess/containers/set.hpp>
16 #include <boost/interprocess/allocators/node_allocator.hpp>
17 #include <boost/interprocess/detail/os_thread_functions.hpp>
18 // intrusive/detail
19 #include <boost/intrusive/detail/minimal_pair_header.hpp>
20 #include <boost/intrusive/detail/minimal_less_equal_header.hpp>
21
22 #include <boost/move/unique_ptr.hpp>
23
24 #include <cstddef>
25 #include <memory>
26 #include <iostream>
27 #include <vector>
28 #include <stdexcept>
29 #include <limits>
30
31 #include "get_process_id_name.hpp"
32
33 ////////////////////////////////////////////////////////////////////////////////
34 // //
35 // This example tests the process shared message queue. //
36 // //
37 ////////////////////////////////////////////////////////////////////////////////
38
39 using namespace boost::interprocess;
40
41 //This test inserts messages with different priority and marks them with a
42 //time-stamp to check if receiver obtains highest priority messages first and
43 //messages with same priority are received in fifo order
test_priority_order()44 bool test_priority_order()
45 {
46 message_queue::remove(test::get_process_id_name());
47 {
48 message_queue mq1
49 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
50 mq2
51 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
52
53 //We test that the queue is ordered by priority and in the
54 //same priority, is a FIFO
55 message_queue::size_type recvd = 0;
56 unsigned int priority = 0;
57 std::size_t tstamp;
58 unsigned int priority_prev;
59 std::size_t tstamp_prev;
60
61 //We will send 100 message with priority 0-9
62 //The message will contain the timestamp of the message
63 for(std::size_t i = 0; i < 100; ++i){
64 tstamp = i;
65 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
66 }
67
68 priority_prev = (std::numeric_limits<unsigned int>::max)();
69 tstamp_prev = 0;
70
71 //Receive all messages and test those are ordered
72 //by priority and by FIFO in the same priority
73 for(std::size_t i = 0; i < 100; ++i){
74 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
75 if(priority > priority_prev)
76 return false;
77 if(priority == priority_prev &&
78 tstamp <= tstamp_prev){
79 return false;
80 }
81 priority_prev = priority;
82 tstamp_prev = tstamp;
83 }
84
85 //Now retry it with different priority order
86 for(std::size_t i = 0; i < 100; ++i){
87 tstamp = i;
88 mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
89 }
90
91 priority_prev = (std::numeric_limits<unsigned int>::max)();
92 tstamp_prev = 0;
93
94 //Receive all messages and test those are ordered
95 //by priority and by FIFO in the same priority
96 for(std::size_t i = 0; i < 100; ++i){
97 mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
98 if(priority > priority_prev)
99 return false;
100 if(priority == priority_prev &&
101 tstamp <= tstamp_prev){
102 return false;
103 }
104 priority_prev = priority;
105 tstamp_prev = tstamp;
106 }
107 }
108 message_queue::remove(test::get_process_id_name());
109 return true;
110 }
111
112 //[message_queue_test_test_serialize_db
113 //This test creates a in memory data-base using Interprocess machinery and
114 //serializes it through a message queue. Then rebuilds the data-base in
115 //another buffer and checks it against the original data-base
test_serialize_db()116 bool test_serialize_db()
117 {
118 //Typedef data to create a Interprocess map
119 typedef std::pair<const std::size_t, std::size_t> MyPair;
120 typedef std::less<std::size_t> MyLess;
121 typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
122 node_allocator_t;
123 typedef map<std::size_t,
124 std::size_t,
125 std::less<std::size_t>,
126 node_allocator_t>
127 MyMap;
128
129 //Some constants
130 const std::size_t BufferSize = 65536;
131 const std::size_t MaxMsgSize = 100;
132
133 //Allocate a memory buffer to hold the destiny database using vector<char>
134 std::vector<char> buffer_destiny(BufferSize, 0);
135
136 message_queue::remove(test::get_process_id_name());
137 {
138 //Create the message-queues
139 message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
140
141 //Open previously created message-queue simulating other process
142 message_queue mq2(open_only, test::get_process_id_name());
143
144 //A managed heap memory to create the origin database
145 managed_heap_memory db_origin(buffer_destiny.size());
146
147 //Construct the map in the first buffer
148 MyMap *map1 = db_origin.construct<MyMap>("MyMap")
149 (MyLess(),
150 db_origin.get_segment_manager());
151 if(!map1)
152 return false;
153
154 //Fill map1 until is full
155 try{
156 std::size_t i = 0;
157 while(1){
158 (*map1)[i] = i;
159 ++i;
160 }
161 }
162 catch(boost::interprocess::bad_alloc &){}
163
164 //Data control data sending through the message queue
165 std::size_t sent = 0;
166 message_queue::size_type recvd = 0;
167 message_queue::size_type total_recvd = 0;
168 unsigned int priority;
169
170 //Send whole first buffer through the mq1, read it
171 //through mq2 to the second buffer
172 while(1){
173 //Send a fragment of buffer1 through mq1
174 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
175 MaxMsgSize : (db_origin.get_size() - sent);
176 mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
177 , bytes_to_send
178 , 0);
179 sent += bytes_to_send;
180 //Receive the fragment through mq2 to buffer_destiny
181 mq2.receive( &buffer_destiny[total_recvd]
182 , BufferSize - recvd
183 , recvd
184 , priority);
185 total_recvd += recvd;
186
187 //Check if we have received all the buffer
188 if(total_recvd == BufferSize){
189 break;
190 }
191 }
192
193 //The buffer will contain a copy of the original database
194 //so let's interpret the buffer with managed_external_buffer
195 managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
196
197 //Let's find the map
198 std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
199 MyMap *map2 = ret.first;
200
201 //Check if we have found it
202 if(!map2){
203 return false;
204 }
205
206 //Check if it is a single variable (not an array)
207 if(ret.second != 1){
208 return false;
209 }
210
211 //Now let's compare size
212 if(map1->size() != map2->size()){
213 return false;
214 }
215
216 //Now let's compare all db values
217 MyMap::size_type num_elements = map1->size();
218 for(std::size_t i = 0; i < num_elements; ++i){
219 if((*map1)[i] != (*map2)[i]){
220 return false;
221 }
222 }
223
224 //Destroy maps from db-s
225 db_origin.destroy_ptr(map1);
226 db_destiny.destroy_ptr(map2);
227 }
228 message_queue::remove(test::get_process_id_name());
229 return true;
230 }
231 //]
232
233 static const int MsgSize = 10;
234 static const int NumMsg = 1000;
235 static char msgsend [10];
236 static char msgrecv [10];
237
238 static boost::interprocess::message_queue *pmessage_queue;
239
receiver()240 void receiver()
241 {
242 boost::interprocess::message_queue::size_type recvd_size;
243 unsigned int priority;
244 int nummsg = NumMsg;
245
246 while(nummsg--){
247 pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
248 }
249 }
250
test_buffer_overflow()251 bool test_buffer_overflow()
252 {
253 boost::interprocess::message_queue::remove(test::get_process_id_name());
254 {
255 boost::movelib::unique_ptr<boost::interprocess::message_queue>
256 ptr(new boost::interprocess::message_queue
257 (create_only, test::get_process_id_name(), 10, 10));
258 pmessage_queue = ptr.get();
259
260 //Launch the receiver thread
261 boost::interprocess::ipcdetail::OS_thread_t thread;
262 boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
263 boost::interprocess::ipcdetail::thread_yield();
264
265 int nummsg = NumMsg;
266
267 while(nummsg--){
268 pmessage_queue->send(msgsend, MsgSize, 0);
269 }
270
271 boost::interprocess::ipcdetail::thread_join(thread);
272 }
273 boost::interprocess::message_queue::remove(test::get_process_id_name());
274 return true;
275 }
276
277
278 //////////////////////////////////////////////////////////////////////////////
279 //
280 // test_multi_sender_receiver is based on Alexander (aalutov's)
281 // testcase for ticket #9221. Many thanks.
282 //
283 //////////////////////////////////////////////////////////////////////////////
284
285 static boost::interprocess::message_queue *global_queue = 0;
286 //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
287 static const int MULTI_NUM_MSG_PER_SENDER = 10000;
288 //Message queue message capacity
289 static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
290 //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
291 static const int MULTI_THREAD_COUNT = 10;
292
multisend()293 static void multisend()
294 {
295 char buff;
296 for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
297 global_queue->send(&buff, 1, 0);
298 }
299 global_queue->send(&buff, 0, 0);
300 //std::cout<<"writer thread complete"<<std::endl;
301 }
302
multireceive()303 static void multireceive()
304 {
305 char buff;
306 size_t size;
307 int received_msgs = 0;
308 unsigned int priority;
309 do {
310 global_queue->receive(&buff, 1, size, priority);
311 ++received_msgs;
312 } while (size > 0);
313 --received_msgs;
314 //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
315 }
316
317
test_multi_sender_receiver()318 bool test_multi_sender_receiver()
319 {
320 bool ret = true;
321 //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
322 try {
323 boost::interprocess::message_queue::remove(test::get_process_id_name());
324 boost::interprocess::message_queue mq
325 (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
326 global_queue = &mq;
327 std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
328
329 //Launch senders receiver thread
330 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
331 boost::interprocess::ipcdetail::thread_launch
332 (threads[i], &multisend);
333 }
334
335 for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
336 boost::interprocess::ipcdetail::thread_launch
337 (threads[MULTI_THREAD_COUNT+i], &multireceive);
338 }
339
340 for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
341 boost::interprocess::ipcdetail::thread_join(threads[i]);
342 //std::cout << "Joined thread " << i << std::endl;
343 }
344 }
345 catch (std::exception &e) {
346 std::cout << "error " << e.what() << std::endl;
347 ret = false;
348 }
349 boost::interprocess::message_queue::remove(test::get_process_id_name());
350 return ret;
351 }
352
353
main()354 int main ()
355 {
356 if(!test_priority_order()){
357 return 1;
358 }
359
360 if(!test_serialize_db()){
361 return 1;
362 }
363
364 if(!test_buffer_overflow()){
365 return 1;
366 }
367
368 if(!test_multi_sender_receiver()){
369 return 1;
370 }
371
372 return 0;
373 }
374
375