• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10 
11 #include <boost/interprocess/ipc/message_queue.hpp>
12 #include <boost/interprocess/managed_external_buffer.hpp>
13 #include <boost/interprocess/managed_heap_memory.hpp>
14 #include <boost/interprocess/containers/map.hpp>
15 #include <boost/interprocess/containers/set.hpp>
16 #include <boost/interprocess/allocators/node_allocator.hpp>
17 #include <boost/interprocess/detail/os_thread_functions.hpp>
18 // intrusive/detail
19 #include <boost/intrusive/detail/minimal_pair_header.hpp>
20 #include <boost/intrusive/detail/minimal_less_equal_header.hpp>
21 
22 #include <boost/move/unique_ptr.hpp>
23 
24 #include <cstddef>
25 #include <memory>
26 #include <iostream>
27 #include <vector>
28 #include <stdexcept>
29 #include <limits>
30 
31 #include "get_process_id_name.hpp"
32 
33 ////////////////////////////////////////////////////////////////////////////////
34 //                                                                            //
35 //  This example tests the process shared message queue.                      //
36 //                                                                            //
37 ////////////////////////////////////////////////////////////////////////////////
38 
39 using namespace boost::interprocess;
40 
41 //This test inserts messages with different priority and marks them with a
42 //time-stamp to check if receiver obtains highest priority messages first and
43 //messages with same priority are received in fifo order
test_priority_order()44 bool test_priority_order()
45 {
46    message_queue::remove(test::get_process_id_name());
47    {
48       message_queue mq1
49          (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
50          mq2
51          (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
52 
53       //We test that the queue is ordered by priority and in the
54       //same priority, is a FIFO
55       message_queue::size_type recvd = 0;
56       unsigned int priority = 0;
57       std::size_t tstamp;
58       unsigned int priority_prev;
59       std::size_t  tstamp_prev;
60 
61       //We will send 100 message with priority 0-9
62       //The message will contain the timestamp of the message
63       for(std::size_t i = 0; i < 100; ++i){
64          tstamp = i;
65          mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
66       }
67 
68       priority_prev = (std::numeric_limits<unsigned int>::max)();
69       tstamp_prev = 0;
70 
71       //Receive all messages and test those are ordered
72       //by priority and by FIFO in the same priority
73       for(std::size_t i = 0; i < 100; ++i){
74          mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
75          if(priority > priority_prev)
76             return false;
77          if(priority == priority_prev &&
78             tstamp   <= tstamp_prev){
79             return false;
80          }
81          priority_prev  = priority;
82          tstamp_prev    = tstamp;
83       }
84 
85       //Now retry it with different priority order
86       for(std::size_t i = 0; i < 100; ++i){
87          tstamp = i;
88          mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
89       }
90 
91       priority_prev = (std::numeric_limits<unsigned int>::max)();
92       tstamp_prev = 0;
93 
94       //Receive all messages and test those are ordered
95       //by priority and by FIFO in the same priority
96       for(std::size_t i = 0; i < 100; ++i){
97          mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
98          if(priority > priority_prev)
99             return false;
100          if(priority == priority_prev &&
101             tstamp   <= tstamp_prev){
102             return false;
103          }
104          priority_prev  = priority;
105          tstamp_prev    = tstamp;
106       }
107    }
108    message_queue::remove(test::get_process_id_name());
109    return true;
110 }
111 
112 //[message_queue_test_test_serialize_db
113 //This test creates a in memory data-base using Interprocess machinery and
114 //serializes it through a message queue. Then rebuilds the data-base in
115 //another buffer and checks it against the original data-base
test_serialize_db()116 bool test_serialize_db()
117 {
118    //Typedef data to create a Interprocess map
119    typedef std::pair<const std::size_t, std::size_t> MyPair;
120    typedef std::less<std::size_t>   MyLess;
121    typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
122       node_allocator_t;
123    typedef map<std::size_t,
124                std::size_t,
125                std::less<std::size_t>,
126                node_allocator_t>
127                MyMap;
128 
129    //Some constants
130    const std::size_t BufferSize  = 65536;
131    const std::size_t MaxMsgSize  = 100;
132 
133    //Allocate a memory buffer to hold the destiny database using vector<char>
134    std::vector<char> buffer_destiny(BufferSize, 0);
135 
136    message_queue::remove(test::get_process_id_name());
137    {
138       //Create the message-queues
139       message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
140 
141       //Open previously created message-queue simulating other process
142       message_queue mq2(open_only, test::get_process_id_name());
143 
144       //A managed heap memory to create the origin database
145       managed_heap_memory db_origin(buffer_destiny.size());
146 
147       //Construct the map in the first buffer
148       MyMap *map1 = db_origin.construct<MyMap>("MyMap")
149                                        (MyLess(),
150                                        db_origin.get_segment_manager());
151       if(!map1)
152          return false;
153 
154       //Fill map1 until is full
155       try{
156          std::size_t i = 0;
157          while(1){
158             (*map1)[i] = i;
159             ++i;
160          }
161       }
162       catch(boost::interprocess::bad_alloc &){}
163 
164       //Data control data sending through the message queue
165       std::size_t sent = 0;
166       message_queue::size_type recvd = 0;
167       message_queue::size_type total_recvd = 0;
168       unsigned int priority;
169 
170       //Send whole first buffer through the mq1, read it
171       //through mq2 to the second buffer
172       while(1){
173          //Send a fragment of buffer1 through mq1
174        std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
175                                        MaxMsgSize : (db_origin.get_size() - sent);
176          mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
177                , bytes_to_send
178                , 0);
179          sent += bytes_to_send;
180          //Receive the fragment through mq2 to buffer_destiny
181        mq2.receive( &buffer_destiny[total_recvd]
182                 , BufferSize - recvd
183                   , recvd
184                   , priority);
185          total_recvd += recvd;
186 
187          //Check if we have received all the buffer
188          if(total_recvd == BufferSize){
189             break;
190          }
191       }
192 
193       //The buffer will contain a copy of the original database
194       //so let's interpret the buffer with managed_external_buffer
195       managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
196 
197       //Let's find the map
198       std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
199       MyMap *map2 = ret.first;
200 
201       //Check if we have found it
202       if(!map2){
203          return false;
204       }
205 
206       //Check if it is a single variable (not an array)
207       if(ret.second != 1){
208          return false;
209       }
210 
211       //Now let's compare size
212       if(map1->size() != map2->size()){
213          return false;
214       }
215 
216       //Now let's compare all db values
217      MyMap::size_type num_elements = map1->size();
218      for(std::size_t i = 0; i < num_elements; ++i){
219          if((*map1)[i] != (*map2)[i]){
220             return false;
221          }
222       }
223 
224       //Destroy maps from db-s
225       db_origin.destroy_ptr(map1);
226       db_destiny.destroy_ptr(map2);
227    }
228    message_queue::remove(test::get_process_id_name());
229    return true;
230 }
231 //]
232 
233 static const int MsgSize = 10;
234 static const int NumMsg  = 1000;
235 static char msgsend [10];
236 static char msgrecv [10];
237 
238 static boost::interprocess::message_queue *pmessage_queue;
239 
receiver()240 void receiver()
241 {
242    boost::interprocess::message_queue::size_type recvd_size;
243    unsigned int priority;
244    int nummsg = NumMsg;
245 
246    while(nummsg--){
247       pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
248    }
249 }
250 
test_buffer_overflow()251 bool test_buffer_overflow()
252 {
253    boost::interprocess::message_queue::remove(test::get_process_id_name());
254    {
255       boost::movelib::unique_ptr<boost::interprocess::message_queue>
256          ptr(new boost::interprocess::message_queue
257                (create_only, test::get_process_id_name(), 10, 10));
258       pmessage_queue = ptr.get();
259 
260       //Launch the receiver thread
261       boost::interprocess::ipcdetail::OS_thread_t thread;
262       boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
263       boost::interprocess::ipcdetail::thread_yield();
264 
265       int nummsg = NumMsg;
266 
267       while(nummsg--){
268          pmessage_queue->send(msgsend, MsgSize, 0);
269       }
270 
271       boost::interprocess::ipcdetail::thread_join(thread);
272    }
273    boost::interprocess::message_queue::remove(test::get_process_id_name());
274    return true;
275 }
276 
277 
278 //////////////////////////////////////////////////////////////////////////////
279 //
280 // test_multi_sender_receiver is based on Alexander (aalutov's)
281 // testcase for ticket #9221. Many thanks.
282 //
283 //////////////////////////////////////////////////////////////////////////////
284 
285 static boost::interprocess::message_queue *global_queue = 0;
286 //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
287 static const int MULTI_NUM_MSG_PER_SENDER = 10000;
288 //Message queue message capacity
289 static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
290 //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
291 static const int MULTI_THREAD_COUNT = 10;
292 
multisend()293 static void multisend()
294 {
295    char buff;
296    for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
297       global_queue->send(&buff, 1, 0);
298    }
299    global_queue->send(&buff, 0, 0);
300    //std::cout<<"writer thread complete"<<std::endl;
301 }
302 
multireceive()303 static void multireceive()
304 {
305    char buff;
306    size_t size;
307    int received_msgs = 0;
308    unsigned int priority;
309    do {
310       global_queue->receive(&buff, 1, size, priority);
311       ++received_msgs;
312    } while (size > 0);
313    --received_msgs;
314    //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
315 }
316 
317 
test_multi_sender_receiver()318 bool test_multi_sender_receiver()
319 {
320    bool ret = true;
321    //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
322    try {
323       boost::interprocess::message_queue::remove(test::get_process_id_name());
324       boost::interprocess::message_queue mq
325          (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
326       global_queue = &mq;
327       std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
328 
329       //Launch senders receiver thread
330       for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
331          boost::interprocess::ipcdetail::thread_launch
332             (threads[i], &multisend);
333       }
334 
335       for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
336          boost::interprocess::ipcdetail::thread_launch
337             (threads[MULTI_THREAD_COUNT+i], &multireceive);
338       }
339 
340       for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
341          boost::interprocess::ipcdetail::thread_join(threads[i]);
342          //std::cout << "Joined thread " << i << std::endl;
343       }
344    }
345    catch (std::exception &e) {
346       std::cout << "error " << e.what() << std::endl;
347       ret = false;
348    }
349    boost::interprocess::message_queue::remove(test::get_process_id_name());
350    return ret;
351 }
352 
353 
main()354 int main ()
355 {
356    if(!test_priority_order()){
357       return 1;
358    }
359 
360    if(!test_serialize_db()){
361       return 1;
362    }
363 
364    if(!test_buffer_overflow()){
365       return 1;
366    }
367 
368    if(!test_multi_sender_receiver()){
369       return 1;
370    }
371 
372    return 0;
373 }
374 
375