• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 #include "utils/Buffer.h"
10 #include "utils/WorkQueue.h"
11 
12 #include <gtest/gtest.h>
13 #include <iostream>
14 #include <memory>
15 #include <mutex>
16 #include <thread>
17 #include <vector>
18 
19 using namespace pzstd;
20 
21 namespace {
22 struct Popper {
23   WorkQueue<int>* queue;
24   int* results;
25   std::mutex* mutex;
26 
operator ()__anon7c9f79f80111::Popper27   void operator()() {
28     int result;
29     while (queue->pop(result)) {
30       std::lock_guard<std::mutex> lock(*mutex);
31       results[result] = result;
32     }
33   }
34 };
35 }
36 
TEST(WorkQueue,SingleThreaded)37 TEST(WorkQueue, SingleThreaded) {
38   WorkQueue<int> queue;
39   int result;
40 
41   queue.push(5);
42   EXPECT_TRUE(queue.pop(result));
43   EXPECT_EQ(5, result);
44 
45   queue.push(1);
46   queue.push(2);
47   EXPECT_TRUE(queue.pop(result));
48   EXPECT_EQ(1, result);
49   EXPECT_TRUE(queue.pop(result));
50   EXPECT_EQ(2, result);
51 
52   queue.push(1);
53   queue.push(2);
54   queue.finish();
55   EXPECT_TRUE(queue.pop(result));
56   EXPECT_EQ(1, result);
57   EXPECT_TRUE(queue.pop(result));
58   EXPECT_EQ(2, result);
59   EXPECT_FALSE(queue.pop(result));
60 
61   queue.waitUntilFinished();
62 }
63 
TEST(WorkQueue,SPSC)64 TEST(WorkQueue, SPSC) {
65   WorkQueue<int> queue;
66   const int max = 100;
67 
68   for (int i = 0; i < 10; ++i) {
69     queue.push(int{i});
70   }
71 
72   std::thread thread([ &queue, max ] {
73     int result;
74     for (int i = 0;; ++i) {
75       if (!queue.pop(result)) {
76         EXPECT_EQ(i, max);
77         break;
78       }
79       EXPECT_EQ(i, result);
80     }
81   });
82 
83   std::this_thread::yield();
84   for (int i = 10; i < max; ++i) {
85     queue.push(int{i});
86   }
87   queue.finish();
88 
89   thread.join();
90 }
91 
TEST(WorkQueue,SPMC)92 TEST(WorkQueue, SPMC) {
93   WorkQueue<int> queue;
94   std::vector<int> results(50, -1);
95   std::mutex mutex;
96   std::vector<std::thread> threads;
97   for (int i = 0; i < 5; ++i) {
98     threads.emplace_back(Popper{&queue, results.data(), &mutex});
99   }
100 
101   for (int i = 0; i < 50; ++i) {
102     queue.push(int{i});
103   }
104   queue.finish();
105 
106   for (auto& thread : threads) {
107     thread.join();
108   }
109 
110   for (int i = 0; i < 50; ++i) {
111     EXPECT_EQ(i, results[i]);
112   }
113 }
114 
TEST(WorkQueue,MPMC)115 TEST(WorkQueue, MPMC) {
116   WorkQueue<int> queue;
117   std::vector<int> results(100, -1);
118   std::mutex mutex;
119   std::vector<std::thread> popperThreads;
120   for (int i = 0; i < 4; ++i) {
121     popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
122   }
123 
124   std::vector<std::thread> pusherThreads;
125   for (int i = 0; i < 2; ++i) {
126     auto min = i * 50;
127     auto max = (i + 1) * 50;
128     pusherThreads.emplace_back(
129         [ &queue, min, max ] {
130           for (int i = min; i < max; ++i) {
131             queue.push(int{i});
132           }
133         });
134   }
135 
136   for (auto& thread : pusherThreads) {
137     thread.join();
138   }
139   queue.finish();
140 
141   for (auto& thread : popperThreads) {
142     thread.join();
143   }
144 
145   for (int i = 0; i < 100; ++i) {
146     EXPECT_EQ(i, results[i]);
147   }
148 }
149 
TEST(WorkQueue,BoundedSizeWorks)150 TEST(WorkQueue, BoundedSizeWorks) {
151   WorkQueue<int> queue(1);
152   int result;
153   queue.push(5);
154   queue.pop(result);
155   queue.push(5);
156   queue.pop(result);
157   queue.push(5);
158   queue.finish();
159   queue.pop(result);
160   EXPECT_EQ(5, result);
161 }
162 
TEST(WorkQueue,BoundedSizePushAfterFinish)163 TEST(WorkQueue, BoundedSizePushAfterFinish) {
164   WorkQueue<int> queue(1);
165   int result;
166   queue.push(5);
167   std::thread pusher([&queue] {
168     queue.push(6);
169   });
170   // Dirtily try and make sure that pusher has run.
171   std::this_thread::sleep_for(std::chrono::seconds(1));
172   queue.finish();
173   EXPECT_TRUE(queue.pop(result));
174   EXPECT_EQ(5, result);
175   EXPECT_FALSE(queue.pop(result));
176 
177   pusher.join();
178 }
179 
TEST(WorkQueue,SetMaxSize)180 TEST(WorkQueue, SetMaxSize) {
181   WorkQueue<int> queue(2);
182   int result;
183   queue.push(5);
184   queue.push(6);
185   queue.setMaxSize(1);
186   std::thread pusher([&queue] {
187     queue.push(7);
188   });
189   // Dirtily try and make sure that pusher has run.
190   std::this_thread::sleep_for(std::chrono::seconds(1));
191   queue.finish();
192   EXPECT_TRUE(queue.pop(result));
193   EXPECT_EQ(5, result);
194   EXPECT_TRUE(queue.pop(result));
195   EXPECT_EQ(6, result);
196   EXPECT_FALSE(queue.pop(result));
197 
198   pusher.join();
199 }
200 
TEST(WorkQueue,BoundedSizeMPMC)201 TEST(WorkQueue, BoundedSizeMPMC) {
202   WorkQueue<int> queue(10);
203   std::vector<int> results(200, -1);
204   std::mutex mutex;
205   std::cerr << "Creating popperThreads" << std::endl;
206   std::vector<std::thread> popperThreads;
207   for (int i = 0; i < 4; ++i) {
208     popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
209   }
210 
211   std::cerr << "Creating pusherThreads" << std::endl;
212   std::vector<std::thread> pusherThreads;
213   for (int i = 0; i < 2; ++i) {
214     auto min = i * 100;
215     auto max = (i + 1) * 100;
216     pusherThreads.emplace_back(
217         [ &queue, min, max ] {
218           for (int i = min; i < max; ++i) {
219             queue.push(int{i});
220           }
221         });
222   }
223 
224   std::cerr << "Joining pusherThreads" << std::endl;
225   for (auto& thread : pusherThreads) {
226     thread.join();
227   }
228   std::cerr << "Finishing queue" << std::endl;
229   queue.finish();
230 
231   std::cerr << "Joining popperThreads" << std::endl;
232   for (auto& thread : popperThreads) {
233     thread.join();
234   }
235 
236   std::cerr << "Inspecting results" << std::endl;
237   for (int i = 0; i < 200; ++i) {
238     EXPECT_EQ(i, results[i]);
239   }
240 }
241 
TEST(WorkQueue,FailedPush)242 TEST(WorkQueue, FailedPush) {
243   WorkQueue<std::unique_ptr<int>> queue;
244   std::unique_ptr<int> x(new int{5});
245   EXPECT_TRUE(queue.push(std::move(x)));
246   EXPECT_EQ(nullptr, x);
247   queue.finish();
248   x.reset(new int{6});
249   EXPECT_FALSE(queue.push(std::move(x)));
250   EXPECT_NE(nullptr, x);
251   EXPECT_EQ(6, *x);
252 }
253 
TEST(BufferWorkQueue,SizeCalculatedCorrectly)254 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
255   {
256     BufferWorkQueue queue;
257     queue.finish();
258     EXPECT_EQ(0, queue.size());
259   }
260   {
261     BufferWorkQueue queue;
262     queue.push(Buffer(10));
263     queue.finish();
264     EXPECT_EQ(10, queue.size());
265   }
266   {
267     BufferWorkQueue queue;
268     queue.push(Buffer(10));
269     queue.push(Buffer(5));
270     queue.finish();
271     EXPECT_EQ(15, queue.size());
272   }
273   {
274     BufferWorkQueue queue;
275     queue.push(Buffer(10));
276     queue.push(Buffer(5));
277     queue.finish();
278     Buffer buffer;
279     queue.pop(buffer);
280     EXPECT_EQ(5, queue.size());
281   }
282 }
283