1 /*
2 * Copyright (c) 2016-present, Facebook, Inc.
3 * All rights reserved.
4 *
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 */
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
11
12 #include <gtest/gtest.h>
13 #include <iostream>
14 #include <memory>
15 #include <mutex>
16 #include <thread>
17 #include <vector>
18
19 using namespace pzstd;
20
21 namespace {
22 struct Popper {
23 WorkQueue<int>* queue;
24 int* results;
25 std::mutex* mutex;
26
operator ()__anon7c9f79f80111::Popper27 void operator()() {
28 int result;
29 while (queue->pop(result)) {
30 std::lock_guard<std::mutex> lock(*mutex);
31 results[result] = result;
32 }
33 }
34 };
35 }
36
TEST(WorkQueue,SingleThreaded)37 TEST(WorkQueue, SingleThreaded) {
38 WorkQueue<int> queue;
39 int result;
40
41 queue.push(5);
42 EXPECT_TRUE(queue.pop(result));
43 EXPECT_EQ(5, result);
44
45 queue.push(1);
46 queue.push(2);
47 EXPECT_TRUE(queue.pop(result));
48 EXPECT_EQ(1, result);
49 EXPECT_TRUE(queue.pop(result));
50 EXPECT_EQ(2, result);
51
52 queue.push(1);
53 queue.push(2);
54 queue.finish();
55 EXPECT_TRUE(queue.pop(result));
56 EXPECT_EQ(1, result);
57 EXPECT_TRUE(queue.pop(result));
58 EXPECT_EQ(2, result);
59 EXPECT_FALSE(queue.pop(result));
60
61 queue.waitUntilFinished();
62 }
63
TEST(WorkQueue,SPSC)64 TEST(WorkQueue, SPSC) {
65 WorkQueue<int> queue;
66 const int max = 100;
67
68 for (int i = 0; i < 10; ++i) {
69 queue.push(int{i});
70 }
71
72 std::thread thread([ &queue, max ] {
73 int result;
74 for (int i = 0;; ++i) {
75 if (!queue.pop(result)) {
76 EXPECT_EQ(i, max);
77 break;
78 }
79 EXPECT_EQ(i, result);
80 }
81 });
82
83 std::this_thread::yield();
84 for (int i = 10; i < max; ++i) {
85 queue.push(int{i});
86 }
87 queue.finish();
88
89 thread.join();
90 }
91
TEST(WorkQueue,SPMC)92 TEST(WorkQueue, SPMC) {
93 WorkQueue<int> queue;
94 std::vector<int> results(50, -1);
95 std::mutex mutex;
96 std::vector<std::thread> threads;
97 for (int i = 0; i < 5; ++i) {
98 threads.emplace_back(Popper{&queue, results.data(), &mutex});
99 }
100
101 for (int i = 0; i < 50; ++i) {
102 queue.push(int{i});
103 }
104 queue.finish();
105
106 for (auto& thread : threads) {
107 thread.join();
108 }
109
110 for (int i = 0; i < 50; ++i) {
111 EXPECT_EQ(i, results[i]);
112 }
113 }
114
TEST(WorkQueue,MPMC)115 TEST(WorkQueue, MPMC) {
116 WorkQueue<int> queue;
117 std::vector<int> results(100, -1);
118 std::mutex mutex;
119 std::vector<std::thread> popperThreads;
120 for (int i = 0; i < 4; ++i) {
121 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
122 }
123
124 std::vector<std::thread> pusherThreads;
125 for (int i = 0; i < 2; ++i) {
126 auto min = i * 50;
127 auto max = (i + 1) * 50;
128 pusherThreads.emplace_back(
129 [ &queue, min, max ] {
130 for (int i = min; i < max; ++i) {
131 queue.push(int{i});
132 }
133 });
134 }
135
136 for (auto& thread : pusherThreads) {
137 thread.join();
138 }
139 queue.finish();
140
141 for (auto& thread : popperThreads) {
142 thread.join();
143 }
144
145 for (int i = 0; i < 100; ++i) {
146 EXPECT_EQ(i, results[i]);
147 }
148 }
149
TEST(WorkQueue,BoundedSizeWorks)150 TEST(WorkQueue, BoundedSizeWorks) {
151 WorkQueue<int> queue(1);
152 int result;
153 queue.push(5);
154 queue.pop(result);
155 queue.push(5);
156 queue.pop(result);
157 queue.push(5);
158 queue.finish();
159 queue.pop(result);
160 EXPECT_EQ(5, result);
161 }
162
TEST(WorkQueue,BoundedSizePushAfterFinish)163 TEST(WorkQueue, BoundedSizePushAfterFinish) {
164 WorkQueue<int> queue(1);
165 int result;
166 queue.push(5);
167 std::thread pusher([&queue] {
168 queue.push(6);
169 });
170 // Dirtily try and make sure that pusher has run.
171 std::this_thread::sleep_for(std::chrono::seconds(1));
172 queue.finish();
173 EXPECT_TRUE(queue.pop(result));
174 EXPECT_EQ(5, result);
175 EXPECT_FALSE(queue.pop(result));
176
177 pusher.join();
178 }
179
TEST(WorkQueue,SetMaxSize)180 TEST(WorkQueue, SetMaxSize) {
181 WorkQueue<int> queue(2);
182 int result;
183 queue.push(5);
184 queue.push(6);
185 queue.setMaxSize(1);
186 std::thread pusher([&queue] {
187 queue.push(7);
188 });
189 // Dirtily try and make sure that pusher has run.
190 std::this_thread::sleep_for(std::chrono::seconds(1));
191 queue.finish();
192 EXPECT_TRUE(queue.pop(result));
193 EXPECT_EQ(5, result);
194 EXPECT_TRUE(queue.pop(result));
195 EXPECT_EQ(6, result);
196 EXPECT_FALSE(queue.pop(result));
197
198 pusher.join();
199 }
200
TEST(WorkQueue,BoundedSizeMPMC)201 TEST(WorkQueue, BoundedSizeMPMC) {
202 WorkQueue<int> queue(10);
203 std::vector<int> results(200, -1);
204 std::mutex mutex;
205 std::cerr << "Creating popperThreads" << std::endl;
206 std::vector<std::thread> popperThreads;
207 for (int i = 0; i < 4; ++i) {
208 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
209 }
210
211 std::cerr << "Creating pusherThreads" << std::endl;
212 std::vector<std::thread> pusherThreads;
213 for (int i = 0; i < 2; ++i) {
214 auto min = i * 100;
215 auto max = (i + 1) * 100;
216 pusherThreads.emplace_back(
217 [ &queue, min, max ] {
218 for (int i = min; i < max; ++i) {
219 queue.push(int{i});
220 }
221 });
222 }
223
224 std::cerr << "Joining pusherThreads" << std::endl;
225 for (auto& thread : pusherThreads) {
226 thread.join();
227 }
228 std::cerr << "Finishing queue" << std::endl;
229 queue.finish();
230
231 std::cerr << "Joining popperThreads" << std::endl;
232 for (auto& thread : popperThreads) {
233 thread.join();
234 }
235
236 std::cerr << "Inspecting results" << std::endl;
237 for (int i = 0; i < 200; ++i) {
238 EXPECT_EQ(i, results[i]);
239 }
240 }
241
TEST(WorkQueue,FailedPush)242 TEST(WorkQueue, FailedPush) {
243 WorkQueue<std::unique_ptr<int>> queue;
244 std::unique_ptr<int> x(new int{5});
245 EXPECT_TRUE(queue.push(std::move(x)));
246 EXPECT_EQ(nullptr, x);
247 queue.finish();
248 x.reset(new int{6});
249 EXPECT_FALSE(queue.push(std::move(x)));
250 EXPECT_NE(nullptr, x);
251 EXPECT_EQ(6, *x);
252 }
253
TEST(BufferWorkQueue,SizeCalculatedCorrectly)254 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
255 {
256 BufferWorkQueue queue;
257 queue.finish();
258 EXPECT_EQ(0, queue.size());
259 }
260 {
261 BufferWorkQueue queue;
262 queue.push(Buffer(10));
263 queue.finish();
264 EXPECT_EQ(10, queue.size());
265 }
266 {
267 BufferWorkQueue queue;
268 queue.push(Buffer(10));
269 queue.push(Buffer(5));
270 queue.finish();
271 EXPECT_EQ(15, queue.size());
272 }
273 {
274 BufferWorkQueue queue;
275 queue.push(Buffer(10));
276 queue.push(Buffer(5));
277 queue.finish();
278 Buffer buffer;
279 queue.pop(buffer);
280 EXPECT_EQ(5, queue.size());
281 }
282 }
283