1 // Copyright (C) 2011-2013 Tim Blechmann
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See
4 // accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 #include <boost/lockfree/spsc_queue.hpp>
8 #include <boost/thread.hpp>
9
10 #define BOOST_TEST_MAIN
11 #ifdef BOOST_LOCKFREE_INCLUDE_TESTS
12 #include <boost/test/included/unit_test.hpp>
13 #else
14 #include <boost/test/unit_test.hpp>
15 #endif
16
17 #include <iostream>
18 #include <memory>
19
20 #include "test_helpers.hpp"
21 #include "test_common.hpp"
22
23 using namespace boost;
24 using namespace boost::lockfree;
25 using namespace std;
26
27 #ifndef BOOST_LOCKFREE_STRESS_TEST
28 static const boost::uint32_t nodes_per_thread = 100000;
29 #else
30 static const boost::uint32_t nodes_per_thread = 100000000;
31 #endif
32
33 struct spsc_queue_tester
34 {
35 spsc_queue<int, capacity<128> > sf;
36
37 boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
38
39 // In VxWorks one RTP just supports 65535 objects
40 #ifndef __VXWORKS__
41 static_hashed_set<int, 1<<16 > working_set;
42 #else
43 static_hashed_set<int, 1<<15 > working_set;
44 #endif
45
spsc_queue_testerspsc_queue_tester46 spsc_queue_tester(void):
47 spsc_queue_cnt(0), received_nodes(0)
48 {}
49
addspsc_queue_tester50 void add(void)
51 {
52 for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
53 int id = generate_id<int>();
54 working_set.insert(id);
55
56 while (sf.push(id) == false)
57 {}
58
59 ++spsc_queue_cnt;
60 }
61 running = false;
62 }
63
get_elementspsc_queue_tester64 bool get_element(void)
65 {
66 int data;
67 bool success = sf.pop(data);
68
69 if (success) {
70 ++received_nodes;
71 --spsc_queue_cnt;
72 bool erased = working_set.erase(data);
73 assert(erased);
74 return true;
75 } else
76 return false;
77 }
78
79 boost::lockfree::detail::atomic<bool> running;
80
getspsc_queue_tester81 void get(void)
82 {
83 for(;;) {
84 bool success = get_element();
85 if (!running && !success)
86 break;
87 }
88
89 while ( get_element() );
90 }
91
runspsc_queue_tester92 void run(void)
93 {
94 running = true;
95
96 BOOST_REQUIRE(sf.empty());
97
98 boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
99 boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
100 cout << "reader and writer threads created" << endl;
101
102 writer.join();
103 cout << "writer threads joined. waiting for readers to finish" << endl;
104
105 reader.join();
106
107 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
108 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
109 BOOST_REQUIRE(sf.empty());
110 BOOST_REQUIRE(working_set.count_nodes() == 0);
111 }
112 };
113
// Runs the element-wise push/pop test on a heap-allocated tester.
BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
{
    // NOTE(review): presumably heap-allocated because the tester embeds a
    // large static_hashed_set - confirm before moving it to the stack.
    boost::shared_ptr<spsc_queue_tester> tester(new spsc_queue_tester);
    tester->run();
}
119
120 struct spsc_queue_tester_buffering
121 {
122 spsc_queue<int, capacity<128> > sf;
123
124 boost::lockfree::detail::atomic<long> spsc_queue_cnt;
125
126 // In VxWorks one RTP just supports 65535 objects
127 #ifndef __VXWORKS__
128 static_hashed_set<int, 1<<16 > working_set;
129 #else
130 static_hashed_set<int, 1<<15 > working_set;
131 #endif
132
133 boost::lockfree::detail::atomic<size_t> received_nodes;
134
spsc_queue_tester_bufferingspsc_queue_tester_buffering135 spsc_queue_tester_buffering(void):
136 spsc_queue_cnt(0), received_nodes(0)
137 {}
138
139 static const size_t buf_size = 5;
140
addspsc_queue_tester_buffering141 void add(void)
142 {
143 boost::array<int, buf_size> input_buffer;
144 for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
145 for (size_t i = 0; i != buf_size; ++i) {
146 int id = generate_id<int>();
147 working_set.insert(id);
148 input_buffer[i] = id;
149 }
150
151 size_t pushed = 0;
152
153 do {
154 pushed += sf.push(input_buffer.c_array() + pushed,
155 input_buffer.size() - pushed);
156 } while (pushed != buf_size);
157
158 spsc_queue_cnt+=buf_size;
159 }
160 running = false;
161 }
162
get_elementsspsc_queue_tester_buffering163 bool get_elements(void)
164 {
165 boost::array<int, buf_size> output_buffer;
166
167 size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
168
169 if (popd) {
170 received_nodes += popd;
171 spsc_queue_cnt -= popd;
172
173 for (size_t i = 0; i != popd; ++i) {
174 bool erased = working_set.erase(output_buffer[i]);
175 assert(erased);
176 }
177
178 return true;
179 } else
180 return false;
181 }
182
183 boost::lockfree::detail::atomic<bool> running;
184
getspsc_queue_tester_buffering185 void get(void)
186 {
187 for(;;) {
188 bool success = get_elements();
189 if (!running && !success)
190 break;
191 }
192
193 while ( get_elements() );
194 }
195
runspsc_queue_tester_buffering196 void run(void)
197 {
198 running = true;
199
200 boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
201 boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
202 cout << "reader and writer threads created" << endl;
203
204 writer.join();
205 cout << "writer threads joined. waiting for readers to finish" << endl;
206
207 reader.join();
208
209 BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
210 BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
211 BOOST_REQUIRE(sf.empty());
212 BOOST_REQUIRE(working_set.count_nodes() == 0);
213 }
214 };
215
216
// Runs the batched push/pop test on a heap-allocated tester.
BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
{
    // NOTE(review): presumably heap-allocated because the tester embeds a
    // large static_hashed_set - confirm before moving it to the stack.
    boost::shared_ptr<spsc_queue_tester_buffering> tester(new spsc_queue_tester_buffering);
    tester->run();
}
222
223