• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //  Copyright (C) 2011-2013 Tim Blechmann
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See
4 //  accompanying file LICENSE_1_0.txt or copy at
5 //  http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include <boost/lockfree/spsc_queue.hpp>
8 #include <boost/thread.hpp>
9 
10 #define BOOST_TEST_MAIN
11 #ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12 #include <boost/test/included/unit_test.hpp>
13 #else
14 #include <boost/test/unit_test.hpp>
15 #endif
16 
17 #include <iostream>
18 #include <memory>
19 
20 #include "test_helpers.hpp"
21 #include "test_common.hpp"
22 
23 using namespace boost;
24 using namespace boost::lockfree;
25 using namespace std;
26 
27 #ifndef BOOST_LOCKFREE_STRESS_TEST
28 static const boost::uint32_t nodes_per_thread = 100000;
29 #else
30 static const boost::uint32_t nodes_per_thread = 100000000;
31 #endif
32 
33 struct spsc_queue_tester
34 {
35     spsc_queue<int, capacity<128> > sf;
36 
37     boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
38 
39 // In VxWorks one RTP just supports 65535 objects
40 #ifndef __VXWORKS__
41     static_hashed_set<int, 1<<16 > working_set;
42 #else
43     static_hashed_set<int, 1<<15 > working_set;
44 #endif
45 
spsc_queue_testerspsc_queue_tester46     spsc_queue_tester(void):
47         spsc_queue_cnt(0), received_nodes(0)
48     {}
49 
addspsc_queue_tester50     void add(void)
51     {
52         for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
53             int id = generate_id<int>();
54             working_set.insert(id);
55 
56             while (sf.push(id) == false)
57             {}
58 
59             ++spsc_queue_cnt;
60         }
61         running = false;
62     }
63 
get_elementspsc_queue_tester64     bool get_element(void)
65     {
66         int data;
67         bool success = sf.pop(data);
68 
69         if (success) {
70             ++received_nodes;
71             --spsc_queue_cnt;
72             bool erased = working_set.erase(data);
73             assert(erased);
74             return true;
75         } else
76             return false;
77     }
78 
79     boost::lockfree::detail::atomic<bool> running;
80 
getspsc_queue_tester81     void get(void)
82     {
83         for(;;) {
84             bool success = get_element();
85             if (!running && !success)
86                 break;
87         }
88 
89         while ( get_element() );
90     }
91 
runspsc_queue_tester92     void run(void)
93     {
94         running = true;
95 
96         BOOST_REQUIRE(sf.empty());
97 
98         boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
99         boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
100         cout << "reader and writer threads created" << endl;
101 
102         writer.join();
103         cout << "writer threads joined. waiting for readers to finish" << endl;
104 
105         reader.join();
106 
107         BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
108         BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
109         BOOST_REQUIRE(sf.empty());
110         BOOST_REQUIRE(working_set.count_nodes() == 0);
111     }
112 };
113 
// Exercises the element-at-a-time push/pop path of spsc_queue with one
// writer and one reader thread.
BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
{
    // The fixture is created on the heap and owned by a shared_ptr
    // (presumably to keep its working set off the stack -- TODO confirm).
    boost::shared_ptr<spsc_queue_tester> tester(new spsc_queue_tester);
    tester->run();
}
119 
120 struct spsc_queue_tester_buffering
121 {
122     spsc_queue<int, capacity<128> > sf;
123 
124     boost::lockfree::detail::atomic<long> spsc_queue_cnt;
125 
126 // In VxWorks one RTP just supports 65535 objects
127 #ifndef __VXWORKS__
128     static_hashed_set<int, 1<<16 > working_set;
129 #else
130     static_hashed_set<int, 1<<15 > working_set;
131 #endif
132 
133     boost::lockfree::detail::atomic<size_t> received_nodes;
134 
spsc_queue_tester_bufferingspsc_queue_tester_buffering135     spsc_queue_tester_buffering(void):
136         spsc_queue_cnt(0), received_nodes(0)
137     {}
138 
139     static const size_t buf_size = 5;
140 
addspsc_queue_tester_buffering141     void add(void)
142     {
143         boost::array<int, buf_size> input_buffer;
144         for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
145             for (size_t i = 0; i != buf_size; ++i) {
146                 int id = generate_id<int>();
147                 working_set.insert(id);
148                 input_buffer[i] = id;
149             }
150 
151             size_t pushed = 0;
152 
153             do {
154                 pushed += sf.push(input_buffer.c_array() + pushed,
155                                   input_buffer.size()    - pushed);
156             } while (pushed != buf_size);
157 
158             spsc_queue_cnt+=buf_size;
159         }
160         running = false;
161     }
162 
get_elementsspsc_queue_tester_buffering163     bool get_elements(void)
164     {
165         boost::array<int, buf_size> output_buffer;
166 
167         size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
168 
169         if (popd) {
170             received_nodes += popd;
171             spsc_queue_cnt -= popd;
172 
173             for (size_t i = 0; i != popd; ++i) {
174                 bool erased = working_set.erase(output_buffer[i]);
175                 assert(erased);
176             }
177 
178             return true;
179         } else
180             return false;
181     }
182 
183     boost::lockfree::detail::atomic<bool> running;
184 
getspsc_queue_tester_buffering185     void get(void)
186     {
187         for(;;) {
188             bool success = get_elements();
189             if (!running && !success)
190                 break;
191         }
192 
193         while ( get_elements() );
194     }
195 
runspsc_queue_tester_buffering196     void run(void)
197     {
198         running = true;
199 
200         boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
201         boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
202         cout << "reader and writer threads created" << endl;
203 
204         writer.join();
205         cout << "writer threads joined. waiting for readers to finish" << endl;
206 
207         reader.join();
208 
209         BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
210         BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
211         BOOST_REQUIRE(sf.empty());
212         BOOST_REQUIRE(working_set.count_nodes() == 0);
213     }
214 };
215 
216 
// Exercises the batched (array) push/pop path of spsc_queue with one writer
// and one reader thread.
BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
{
    // The fixture is created on the heap and owned by a shared_ptr
    // (presumably to keep its working set off the stack -- TODO confirm).
    boost::shared_ptr<spsc_queue_tester_buffering> tester(new spsc_queue_tester_buffering);
    tester->run();
}
222 
223