1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/queue.h"
18
19 #include <sys/eventfd.h>
20
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29
30 using namespace std::chrono_literals;
31
32 namespace bluetooth {
33 namespace os {
34 namespace {
35
// Sizes shared by the tests below: the capacity handed to most Queue instances, plus derived
// amounts used when staging test data and sizing the dequeue end.
constexpr int kQueueSize{10};
constexpr int kHalfOfQueueSize{kQueueSize / 2};
constexpr int kDoubleOfQueueSize{kQueueSize * 2};
// Capacity used by the tests that run a single-slot queue.
constexpr int kQueueSizeOne{1};
40
41 class QueueTest : public ::testing::Test {
42 protected:
SetUp()43 void SetUp() override {
44 enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45 enqueue_handler_ = new Handler(enqueue_thread_);
46 dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47 dequeue_handler_ = new Handler(dequeue_thread_);
48 }
TearDown()49 void TearDown() override {
50 enqueue_handler_->Clear();
51 delete enqueue_handler_;
52 delete enqueue_thread_;
53 dequeue_handler_->Clear();
54 delete dequeue_handler_;
55 delete dequeue_thread_;
56 enqueue_handler_ = nullptr;
57 enqueue_thread_ = nullptr;
58 dequeue_handler_ = nullptr;
59 dequeue_thread_ = nullptr;
60 }
61
62 Thread* enqueue_thread_;
63 Handler* enqueue_handler_;
64 Thread* dequeue_thread_;
65 Handler* dequeue_handler_;
66
sync_enqueue_handler()67 void sync_enqueue_handler() {
68 ASSERT(enqueue_thread_ != nullptr);
69 ASSERT(enqueue_thread_->GetReactor()->WaitForIdle(2s));
70 }
71 };
72
73 class TestEnqueueEnd {
74 public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)75 explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
76 : count(0), handler_(handler), queue_(queue), delay_(0) {}
77
~TestEnqueueEnd()78 ~TestEnqueueEnd() {}
79
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)80 void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
81 promise_map_ = promise_map;
82 handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
83 }
84
UnregisterEnqueue()85 void UnregisterEnqueue() {
86 std::promise<void> promise;
87 auto future = promise.get_future();
88
89 handler_->Post(
90 common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
91 future.wait();
92 }
93
EnqueueCallbackForTest()94 std::unique_ptr<std::string> EnqueueCallbackForTest() {
95 if (delay_ != 0) {
96 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
97 }
98
99 count++;
100 std::unique_ptr<std::string> data = std::move(buffer_.front());
101 buffer_.pop();
102 std::string copy = *data;
103 if (buffer_.empty()) {
104 queue_->UnregisterEnqueue();
105 }
106
107 auto key = buffer_.size();
108 auto node = promise_map_->extract(key);
109 if (node) {
110 node.mapped().set_value(key);
111 }
112
113 return data;
114 }
115
setDelay(int value)116 void setDelay(int value) {
117 delay_ = value;
118 }
119
120 std::queue<std::unique_ptr<std::string>> buffer_;
121 int count;
122
123 private:
124 Handler* handler_;
125 Queue<std::string>* queue_;
126 std::unordered_map<int, std::promise<int>>* promise_map_;
127 int delay_;
128
handle_register_enqueue()129 void handle_register_enqueue() {
130 queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
131 }
132
handle_unregister_enqueue(std::promise<void> promise)133 void handle_unregister_enqueue(std::promise<void> promise) {
134 queue_->UnregisterEnqueue();
135 promise.set_value();
136 }
137 };
138
139 class TestDequeueEnd {
140 public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)141 explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
142 : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
143
~TestDequeueEnd()144 ~TestDequeueEnd() {}
145
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)146 void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
147 promise_map_ = promise_map;
148 handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
149 }
150
UnregisterDequeue()151 void UnregisterDequeue() {
152 std::promise<void> promise;
153 auto future = promise.get_future();
154
155 handler_->Post(
156 common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
157 future.wait();
158 }
159
DequeueCallbackForTest()160 void DequeueCallbackForTest() {
161 if (delay_ != 0) {
162 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
163 }
164
165 count++;
166 std::unique_ptr<std::string> data = queue_->TryDequeue();
167 buffer_.push(std::move(data));
168
169 if (buffer_.size() == (size_t)capacity_) {
170 queue_->UnregisterDequeue();
171 }
172
173 auto key = buffer_.size();
174 auto node = promise_map_->extract(key);
175 if (node) {
176 node.mapped().set_value(key);
177 }
178 }
179
setDelay(int value)180 void setDelay(int value) {
181 delay_ = value;
182 }
183
184 std::queue<std::unique_ptr<std::string>> buffer_;
185 int count;
186
187 private:
188 Handler* handler_;
189 Queue<std::string>* queue_;
190 std::unordered_map<int, std::promise<int>>* promise_map_;
191 int capacity_;
192 int delay_;
193
handle_register_dequeue()194 void handle_register_dequeue() {
195 queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
196 }
197
handle_unregister_dequeue(std::promise<void> promise)198 void handle_unregister_dequeue(std::promise<void> promise) {
199 queue_->UnregisterDequeue();
200 promise.set_value();
201 }
202 };
203
// Enqueue end level : 0 -> queue is full, 1 -> queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 -> queue isn't empty
206
207 // Test 1 : Queue is empty
208
209 // Enqueue end level : 1
210 // Dequeue end level : 0
211 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stage exactly kQueueSize strings in the enqueue end.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // The promise keyed 0 is fulfilled once the staging buffer has been emptied into the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
232
233 // Enqueue end level : 1
234 // Dequeue end level : 0
235 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Nothing was ever enqueued, so after a short grace period the callback count must still be 0.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  dequeue_end.RegisterDequeue(&dequeue_promises);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(dequeue_end.count, 0);

  dequeue_end.UnregisterDequeue();
}
248
249 // Test 2 : Queue is full
250
251 // Enqueue end level : 0
252 // Dequeue end level : 1
253 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Fill the queue: stage kQueueSize strings and wait until all of them were handed over.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another half queue worth of strings and register again.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // With the queue full the callback must not run: the staged strings and the call count stay put.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize);

  enqueue_end.UnregisterEnqueue();
}
284
285 // Enqueue end level : 0
286 // Dequeue end level : 1
287 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Fill the queue: stage kQueueSize strings and wait until all of them were handed over.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Once the dequeue end is registered, all kQueueSize strings should arrive in its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  all_dequeued.wait();
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
315
316 // Test 3 : Queue is non-empty and non-full
317
318 // Enqueue end level : 1
319 // Dequeue end level : 1
320 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Bring the queue to half capacity and wait until the staged strings were handed over.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage a second half queue worth of strings.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Re-arm the key-0 promise, register again and expect the second batch to be handed over too.
  buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  sync_enqueue_handler();
}
351
352 // Enqueue end level : 1
353 // Dequeue end level : 1
354 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Bring the queue to half capacity and wait until the staged strings were handed over.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Once the dequeue end is registered, the kHalfOfQueueSize strings should arrive in its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  half_dequeued.wait();
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  dequeue_end.UnregisterDequeue();
}
383
384 // Dynamic level test
385
386 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
387
388 // Enqueue end level : 1 -> 0
389 // Dequeue end level : 1
390 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register and wait until only kQueueSize strings remain staged, i.e. kQueueSize were handed over.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto half_handed_over = enqueue_promises.try_emplace(kQueueSize).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  half_handed_over.wait();
  EXPECT_EQ(half_handed_over.get(), kQueueSize);

  // The queue is full now, so no further callbacks may happen during the grace period.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize);

  enqueue_end.UnregisterEnqueue();
}
416
417 // Enqueue end level : 1 -> 0
418 // Dequeue end level : 1
419 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Start the consumer first; it accepts only kHalfOfQueueSize items.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Then start the producer.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto half_still_staged = enqueue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // The dequeue end unregisters itself once its buffer holds kHalfOfQueueSize items.
  half_dequeued.wait();
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  // The producer must stall with kHalfOfQueueSize strings still staged and make no further calls.
  half_still_staged.wait();
  EXPECT_EQ(half_still_staged.get(), kHalfOfQueueSize);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize + kHalfOfQueueSize);

  enqueue_end.UnregisterEnqueue();
}
458
459 // Enqueue end level : 1 -> 0
460 // Dequeue end level : 1
461 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kDoubleOfQueueSize strings in the enqueue end.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Slow every dequeue callback down by 20 ms, then start the consumer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  dequeue_end.setDelay(20);
  auto half_dequeued = dequeue_promises[kHalfOfQueueSize].get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Start the producer.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Once every staged string was handed over, the consumer should already hold at least
  // kQueueSize - 1 items.
  // NOTE(review): buffer_ is read here while the dequeue end is still registered - confirm that
  // this unsynchronized read is acceptable.
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  EXPECT_GE(dequeue_end.buffer_.size(), static_cast<size_t>(kQueueSize - 1));

  dequeue_end.UnregisterDequeue();
}
492
// Enqueue end level : 1 -> 0
// Dequeue end level : 0 -> 1
495 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  // A single-slot queue is full as soon as it is non-empty.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Start the consumer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Start the producer.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string has to travel through the single slot into the dequeue end buffer.
  all_dequeued.wait();
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
524
// Enqueue end level : 0 -> 1
526 // Dequeue end level : 1
527 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // Stage twice the queue capacity so that the queue fills up and kQueueSize strings stay staged.
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Obtain both futures before registering. Once the enqueue end is registered, its callback
  // extracts entries from this map on the enqueue thread, so the test thread must not touch the
  // map again; a late operator[] lookup for an already extracted key would also create a fresh
  // promise that nobody ever fulfils.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
  auto queue_full_future = enqueue_promise_map[kQueueSize].get_future();
  auto buffer_empty_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_full_future.wait();
  EXPECT_EQ(queue_full_future.get(), kQueueSize);

  // Expect kQueueSize strings to stay blocked in the enqueue end buffer while the queue is full.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);

  // Register dequeue so that the queue stops being full.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect the enqueue end to resume and hand over everything that is still staged.
  buffer_empty_future.wait();
  EXPECT_EQ(buffer_empty_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}
561
562 // Enqueue end level : 0 -> 1
563 // Dequeue end level : 1 -> 0
564 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  // A single-slot queue is empty as soon as it is no longer full.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Start the consumer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Start the producer.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string has to travel through the single slot into the dequeue end buffer.
  all_dequeued.wait();
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
593
594 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
595
596 // Enqueue end level : 1
597 // Dequeue end level : 1 -> 0
598 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Bring the queue to half capacity and wait until the staged strings were handed over.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Start the consumer and wait for all kHalfOfQueueSize strings to arrive in its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  half_dequeued.wait();
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  // No further dequeue callbacks may happen during the grace period.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(dequeue_end.count, kHalfOfQueueSize);
}
629
630 // Enqueue end level : 1
631 // Dequeue end level : 1 -> 0
632 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Bring the queue to half capacity and wait until the staged strings were handed over.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another kHalfOfQueueSize strings and register the producer again.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Start the consumer and wait for all kQueueSize strings to arrive in its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  all_dequeued.wait();
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  // No further dequeue callbacks may happen during the grace period.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(dequeue_end.count, kQueueSize);
}
669
670 // Enqueue end level : 1
671 // Dequeue end level : 0 -> 1
672 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Register dequeue. The future is taken before registering: afterwards the dequeue callback
  // extracts entries from this map on the dequeue thread, so the test thread must not touch the
  // map again, and a late operator[] lookup for an already extracted key would create a fresh
  // promise that nobody ever fulfils.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Push kQueueSize data to the enqueue end buffer and register enqueue.
  for (int i = 0; i < kQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect kQueueSize data to move to the dequeue end buffer.
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}
696
TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  // The queue lives on the heap because it is deleted from the dequeue handler at the end.
  auto* queue = new Queue<std::string>(kQueueSize);

  // Producer: unregisters itself on its first invocation and enqueues a copy of the shared string.
  std::string valid = "Valid String";
  auto shared = std::make_shared<std::string>(valid);
  queue->RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::shared_ptr<std::string> payload) {
            q->UnregisterEnqueue();
            return std::make_unique<std::string>(*payload);
          },
          common::Unretained(queue),
          shared));

  // Consumer: unregisters itself on its first invocation and checks the dequeued string.
  queue->RegisterDequeue(
      dequeue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::string expected) {
            q->UnregisterDequeue();
            auto answer = *q->TryDequeue();
            ASSERT_EQ(answer, expected);
          },
          common::Unretained(queue),
          valid));

  // Chain a task through the enqueue handler into the dequeue handler; the inner task deletes
  // the queue and then releases the test thread.
  std::promise<void> done;
  auto done_future = done.get_future();

  enqueue_handler_->Post(common::BindOnce(
      [](os::Handler* consumer_handler, Queue<std::string>* q, std::promise<void>* finished) {
        consumer_handler->Post(common::BindOnce(
            [](Queue<std::string>* q_inner, std::promise<void>* finished_inner) {
              delete q_inner;
              finished_inner->set_value();
            },
            common::Unretained(q),
            common::Unretained(finished)));
      },
      common::Unretained(dequeue_handler_),
      common::Unretained(queue),
      common::Unretained(&done)));
  done_future.wait();
}
744
// Enqueue callback used by the unregister tests: sleeps for 100 ms, increments |*to_increase|
// by one and returns a freshly allocated "Hello".
std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  const auto callback_duration = std::chrono::milliseconds(100);
  std::this_thread::sleep_for(callback_duration);
  ++*to_increase;
  auto result = std::make_unique<std::string>("Hello");
  return result;
}
750
TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(10);
  int* counter = new int(100);
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(counter)));

  // The callback sleeps for 100 ms, so unregistering after 50 ms presumably happens while it is
  // still running. The counter is expected to have been bumped exactly once by the time
  // UnregisterEnqueue() returns.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  EXPECT_EQ(*counter, 101);
  delete counter;
}
760
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)761 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
762 int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
763 std::this_thread::sleep_for(std::chrono::milliseconds(100));
764 (*to_increase)++;
765 if (is_registered->exchange(false)) {
766 queue->UnregisterEnqueue();
767 }
768 return std::make_unique<std::string>("Hello");
769 }
770
// Same scenario as unregister_enqueue_and_wait, except that both the callback and
// the test thread try to unregister. An atomic flag arbitrates: whichever side
// swaps it from true to false is the one that calls UnregisterEnqueue(). The test
// expects exactly one increment of the counter either way.
TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  Queue<std::string> queue(10);
  // Automatic storage instead of new/delete: the counter cannot leak and needs
  // no manual cleanup on any exit path.
  int indicator = 100;
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          &sleep_and_enqueue_callback_and_unregister,
          common::Unretained(&indicator),
          common::Unretained(&queue),
          common::Unretained(&is_registered)));
  // Wait less than the callback's own sleep before trying to unregister.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(indicator, 101);
}
789
// Dequeue callback used by unregister_dequeue_and_wait: blocks the calling thread
// for 100 ms and then increments the integer behind |to_increase| by one. It does
// not take anything out of a queue.
void sleep_and_dequeue_callback(int* to_increase) {
  const std::chrono::milliseconds callback_delay{100};
  std::this_thread::sleep_for(callback_delay);
  ++(*to_increase);
}
794
// Enqueues one item through a self-unregistering enqueue callback, registers a
// slow (100 ms) dequeue callback, waits 50 ms, then calls UnregisterDequeue()
// from the test thread. The test expects the dequeue callback's single increment
// to be visible as soon as UnregisterDequeue() returns.
TEST_F(QueueTest, unregister_dequeue_and_wait) {
  // Automatic storage instead of new/delete: the counter cannot leak and needs
  // no manual cleanup on any exit path.
  int indicator = 100;
  Queue<std::string> queue(10);
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* queue) {
            queue->UnregisterEnqueue();
            return std::make_unique<std::string>("Hello");
          },
          common::Unretained(&queue)));
  // Note: the dequeue callback is registered on enqueue_handler_ as well, so both
  // callbacks are dispatched through the same handler.
  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(&indicator)));
  // Wait less than the callback's own sleep before unregistering.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(indicator, 101);
}
812
813 // Create all threads for death tests in the function that dies
814 class QueueDeathTest : public ::testing::Test {
815 public:
RegisterEnqueueAndDelete()816 void RegisterEnqueueAndDelete() {
817 Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
818 Handler* enqueue_handler = new Handler(enqueue_thread);
819 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
820 queue->RegisterEnqueue(
821 enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
822 delete queue;
823 }
824
RegisterDequeueAndDelete()825 void RegisterDequeueAndDelete() {
826 Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
827 Handler* dequeue_handler = new Handler(dequeue_thread);
828 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
829 queue->RegisterDequeue(
830 dequeue_handler,
831 common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
832 delete queue;
833 }
834 };
835
// Expects the process to die, with output matching "nqueue", when a queue is
// deleted while an enqueue callback is still registered.
TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  EXPECT_DEATH(this->RegisterEnqueueAndDelete(), "nqueue");
}
839
// Expects the process to die, with output matching "equeue", when a queue is
// deleted while a dequeue callback is still registered.
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH(this->RegisterDequeueAndDelete(), "equeue");
}
843
844 class MockIQueueEnqueue : public IQueueEnqueue<int> {
845 public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)846 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
847 EXPECT_FALSE(registered_);
848 registered_ = true;
849 handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
850 }
851
handle_register_enqueue(EnqueueCallback callback)852 void handle_register_enqueue(EnqueueCallback callback) {
853 if (dont_handle_register_enqueue_) {
854 return;
855 }
856 while (registered_) {
857 std::unique_ptr<int> front = callback.Run();
858 queue_.push(*front);
859 }
860 }
861
UnregisterEnqueue()862 void UnregisterEnqueue() override {
863 EXPECT_TRUE(registered_);
864 registered_ = false;
865 }
866
867 bool dont_handle_register_enqueue_ = false;
868 bool registered_ = false;
869 std::queue<int> queue_;
870 };
871
872 class EnqueueBufferTest : public ::testing::Test {
873 protected:
SetUp()874 void SetUp() override {
875 thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
876 handler_ = new Handler(thread_);
877 }
878
TearDown()879 void TearDown() override {
880 handler_->Clear();
881 delete handler_;
882 delete thread_;
883 }
884
SynchronizeHandler()885 void SynchronizeHandler() {
886 std::promise<void> promise;
887 auto future = promise.get_future();
888 handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
889 future.wait();
890 }
891
892 MockIQueueEnqueue enqueue_;
893 EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
894 Thread* thread_;
895 Handler* handler_;
896 };
897
// Enqueues ten ints through the EnqueueBuffer, waits for the handler to drain its
// pending tasks, and checks that the mock received all of them in order and ended
// up unregistered.
TEST_F(EnqueueBufferTest, enqueue) {
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
  }
  SynchronizeHandler();
  // Check the count up front: front()/pop() on an empty std::queue is undefined
  // behaviour, so a missing item must surface as an assertion failure instead.
  ASSERT_EQ(enqueue_.queue_.size(), static_cast<std::size_t>(num_items));
  for (int i = 0; i < num_items; i++) {
    ASSERT_EQ(enqueue_.queue_.front(), i);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}
910
// With the mock told not to consume anything, enqueues ten ints and checks that
// the mock is registered before Clear() and unregistered after it.
TEST_F(EnqueueBufferTest, clear) {
  // Keep the posted task from running the enqueue callback.
  enqueue_.dont_handle_register_enqueue_ = true;
  const int item_count = 10;
  for (int value = 0; value < item_count; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}
921
// Enqueues ten ints into an EnqueueBuffer backed by a real Queue<int>, then
// deletes the buffer and the queue right away without synchronizing with the
// handler. The buffer is deleted before the queue it points at.
TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  auto* queue = new Queue<int>(kQueueSize);
  auto* enqueue_buffer = new EnqueueBuffer<int>(queue);
  const int item_count = 10;
  for (int value = 0; value < item_count; ++value) {
    enqueue_buffer->Enqueue(std::make_unique<int>(value), handler_);
  }

  delete enqueue_buffer;
  delete queue;
}
933
934 } // namespace
935 } // namespace os
936 } // namespace bluetooth
937