• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/queue.h"
18 
19 #include <sys/eventfd.h>
20 
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25 
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29 
30 using namespace std::chrono_literals;
31 
32 namespace bluetooth {
33 namespace os {
34 namespace {
35 
// Queue capacity used by most tests, plus the derived "half" and "double" element counts.
constexpr int kQueueSize{10};
constexpr int kHalfOfQueueSize{kQueueSize / 2};
constexpr int kDoubleOfQueueSize{kQueueSize * 2};
// Capacity of a single-slot queue.
constexpr int kQueueSizeOne{1};
40 
41 class QueueTest : public ::testing::Test {
42  protected:
SetUp()43   void SetUp() override {
44     enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45     enqueue_handler_ = new Handler(enqueue_thread_);
46     dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47     dequeue_handler_ = new Handler(dequeue_thread_);
48   }
TearDown()49   void TearDown() override {
50     enqueue_handler_->Clear();
51     delete enqueue_handler_;
52     delete enqueue_thread_;
53     dequeue_handler_->Clear();
54     delete dequeue_handler_;
55     delete dequeue_thread_;
56     enqueue_handler_ = nullptr;
57     enqueue_thread_ = nullptr;
58     dequeue_handler_ = nullptr;
59     dequeue_thread_ = nullptr;
60   }
61 
62   Thread* enqueue_thread_;
63   Handler* enqueue_handler_;
64   Thread* dequeue_thread_;
65   Handler* dequeue_handler_;
66 
sync_enqueue_handler()67   void sync_enqueue_handler() {
68     ASSERT(enqueue_thread_ != nullptr);
69     ASSERT(enqueue_thread_->GetReactor()->WaitForIdle(2s));
70   }
71 };
72 
73 class TestEnqueueEnd {
74  public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)75   explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
76       : count(0), handler_(handler), queue_(queue), delay_(0) {}
77 
~TestEnqueueEnd()78   ~TestEnqueueEnd() {}
79 
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)80   void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
81     promise_map_ = promise_map;
82     handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
83   }
84 
UnregisterEnqueue()85   void UnregisterEnqueue() {
86     std::promise<void> promise;
87     auto future = promise.get_future();
88 
89     handler_->Post(
90         common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
91     future.wait();
92   }
93 
EnqueueCallbackForTest()94   std::unique_ptr<std::string> EnqueueCallbackForTest() {
95     if (delay_ != 0) {
96       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
97     }
98 
99     count++;
100     std::unique_ptr<std::string> data = std::move(buffer_.front());
101     buffer_.pop();
102     std::string copy = *data;
103     if (buffer_.empty()) {
104       queue_->UnregisterEnqueue();
105     }
106 
107     auto key = buffer_.size();
108     auto node = promise_map_->extract(key);
109     if (node) {
110       node.mapped().set_value(key);
111     }
112 
113     return data;
114   }
115 
setDelay(int value)116   void setDelay(int value) {
117     delay_ = value;
118   }
119 
120   std::queue<std::unique_ptr<std::string>> buffer_;
121   int count;
122 
123  private:
124   Handler* handler_;
125   Queue<std::string>* queue_;
126   std::unordered_map<int, std::promise<int>>* promise_map_;
127   int delay_;
128 
handle_register_enqueue()129   void handle_register_enqueue() {
130     queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
131   }
132 
handle_unregister_enqueue(std::promise<void> promise)133   void handle_unregister_enqueue(std::promise<void> promise) {
134     queue_->UnregisterEnqueue();
135     promise.set_value();
136   }
137 };
138 
139 class TestDequeueEnd {
140  public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)141   explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
142       : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
143 
~TestDequeueEnd()144   ~TestDequeueEnd() {}
145 
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)146   void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
147     promise_map_ = promise_map;
148     handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
149   }
150 
UnregisterDequeue()151   void UnregisterDequeue() {
152     std::promise<void> promise;
153     auto future = promise.get_future();
154 
155     handler_->Post(
156         common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
157     future.wait();
158   }
159 
DequeueCallbackForTest()160   void DequeueCallbackForTest() {
161     if (delay_ != 0) {
162       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
163     }
164 
165     count++;
166     std::unique_ptr<std::string> data = queue_->TryDequeue();
167     buffer_.push(std::move(data));
168 
169     if (buffer_.size() == (size_t)capacity_) {
170       queue_->UnregisterDequeue();
171     }
172 
173     auto key = buffer_.size();
174     auto node = promise_map_->extract(key);
175     if (node) {
176       node.mapped().set_value(key);
177     }
178   }
179 
setDelay(int value)180   void setDelay(int value) {
181     delay_ = value;
182   }
183 
184   std::queue<std::unique_ptr<std::string>> buffer_;
185   int count;
186 
187  private:
188   Handler* handler_;
189   Queue<std::string>* queue_;
190   std::unordered_map<int, std::promise<int>>* promise_map_;
191   int capacity_;
192   int delay_;
193 
handle_register_dequeue()194   void handle_register_dequeue() {
195     queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
196   }
197 
handle_unregister_dequeue(std::promise<void> promise)198   void handle_unregister_dequeue(std::promise<void> promise) {
199     queue_->UnregisterDequeue();
200     promise.set_value();
201   }
202 };
203 
// Enqueue end level : 0 -> queue is full, 1 -> queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 -> queue isn't empty
206 
207 // Test 1 : Queue is empty
208 
209 // Enqueue end level : 1
210 // Dequeue end level : 0
211 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stage exactly kQueueSize strings in the enqueue end's buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // The promise keyed by 0 is fulfilled by the callback that leaves the buffer empty, i.e. once
  // every staged string has been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Short pause before the locals go out of scope.
  constexpr auto kSettleTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kSettleTime);
}
232 
233 // Enqueue end level : 1
234 // Dequeue end level : 0
235 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Nothing was ever enqueued, so after registering and waiting a little the dequeue callback
  // must not have run a single time.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  dequeue_end.RegisterDequeue(&dequeue_promises);
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(dequeue_end.count, 0);

  dequeue_end.UnregisterDequeue();
}
248 
249 // Test 2 : Queue is full
250 
251 // Enqueue end level : 0
252 // Dequeue end level : 1
253 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stages |n| strings ("0", "1", ...) in the enqueue end's buffer.
  const auto stage = [&enqueue_end](int n) {
    for (int i = 0; i < n; ++i) {
      enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
    }
  };

  // Fill the queue completely: stage kQueueSize strings and wait until the buffer is drained.
  stage(kQueueSize);
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage more data and register again while the queue is still full.
  stage(kHalfOfQueueSize);
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // The enqueue callback must not run: the buffer and the invocation count stay unchanged.
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize);

  enqueue_end.UnregisterEnqueue();
}
284 
285 // Enqueue end level : 0
286 // Dequeue end level : 1
287 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Fill the queue completely: stage kQueueSize strings and wait until the buffer is drained.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait until it has received all kQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(all_received.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
315 
316 // Test 3 : Queue is non-empty and non-full
317 
318 // Enqueue end level : 1
319 // Dequeue end level : 1
320 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stages kHalfOfQueueSize strings ("0", "1", ...) in the enqueue end's buffer.
  const auto stage_half = [&enqueue_end]() {
    for (int i = 0; i < kHalfOfQueueSize; ++i) {
      enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
    }
  };

  // First round: bring the queue to half of its capacity.
  stage_half();
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Second round: stage the other half, then register again with a fresh promise for key 0.
  // The queue still has room, so the buffer must drain once more.
  stage_half();
  buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);
  sync_enqueue_handler();
}
351 
352 // Enqueue end level : 1
353 // Dequeue end level : 1
354 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Bring the queue to half of its capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait until it has received those kHalfOfQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  dequeue_end.UnregisterDequeue();
}
383 
384 // Dynamic level test
385 
386 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
387 
388 // Enqueue end level : 1 -> 0
389 // Dequeue end level : 1
390 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register and wait until only kQueueSize strings are left in the buffer, i.e. until
  // kQueueSize strings have been moved into the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto queue_filled = enqueue_promises.try_emplace(kQueueSize).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(queue_filled.get(), kQueueSize);

  // With the queue full, the callback must stop firing: buffer size and count stay put.
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize);

  enqueue_end.UnregisterEnqueue();
}
416 
417 // Enqueue end level : 1 -> 0
418 // Dequeue end level : 1
419 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end first; it unregisters itself after kHalfOfQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Then register the enqueue end, watching for the moment kHalfOfQueueSize strings remain.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto half_remaining = enqueue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // Once the consumer is gone the queue fills up, so the producer must stall with
  // kHalfOfQueueSize strings left after kQueueSize + kHalfOfQueueSize invocations.
  EXPECT_EQ(half_remaining.get(), kHalfOfQueueSize);
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(enqueue_end.count, kQueueSize + kHalfOfQueueSize);

  enqueue_end.UnregisterEnqueue();
}
458 
459 // Enqueue end level : 1 -> 0
460 // Dequeue end level : 1
461 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kDoubleOfQueueSize strings in the enqueue end's buffer.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Make every dequeue callback take 20 ms, then register the dequeue end.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  dequeue_end.setDelay(20);
  auto half_received = dequeue_promises[kHalfOfQueueSize].get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the (undelayed) enqueue end.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // By the time the producer has handed over everything, the slow consumer must already have
  // received at least kQueueSize - 1 elements.
  EXPECT_EQ(buffer_drained.get(), 0);
  EXPECT_GE(dequeue_end.buffer_.size(), static_cast<size_t>(kQueueSize - 1));

  dequeue_end.UnregisterDequeue();
}
492 
// Enqueue end level : 1 -> 0
// Dequeue end level : 0 -> 1
495 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  // With a single slot, every enqueue makes the queue both full and non-empty.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end's buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end, watching for the kQueueSize-th received element.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the enqueue end.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string must travel through the one-slot queue to the dequeue end.
  EXPECT_EQ(all_received.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
524 
// Enqueue end level : 0 -> 1
526 // Dequeue end level : 1
527 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // Stage twice the queue capacity, so the queue fills up with kQueueSize strings still waiting.
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
  enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());

  // Take BOTH futures before the enqueue end is registered. Once registered, the enqueue thread
  // calls extract() on this map from its callback; touching the map from the test thread after
  // that point is a data race, and a late operator[] for an already-extracted key would insert a
  // brand new promise whose future is never fulfilled.
  auto queue_full_future = enqueue_promise_map[kQueueSize].get_future();
  auto buffer_empty_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_full_future.wait();
  EXPECT_EQ(queue_full_future.get(), kQueueSize);

  // Expect kQueueSize strings to stay blocked in the enqueue end buffer while the queue is full.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);

  // Register dequeue; draining the queue makes room for the blocked strings.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect the enqueue end buffer to become empty.
  buffer_empty_future.wait();
  EXPECT_EQ(buffer_empty_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}
561 
562 // Enqueue end level : 0 -> 1
563 // Dequeue end level : 1 -> 0
564 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  // With a single slot, every dequeue makes the queue both non-full and empty.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end's buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue end, watching for the kQueueSize-th received element.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the enqueue end.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string must travel through the one-slot queue to the dequeue end.
  EXPECT_EQ(all_received.get(), kQueueSize);

  dequeue_end.UnregisterDequeue();
}
593 
594 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
595 
596 // Enqueue end level : 1
597 // Dequeue end level : 1 -> 0
598 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Bring the queue to half of its capacity.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue end and wait until it has received those kHalfOfQueueSize elements.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // Afterwards the dequeue callback must not fire again.
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(dequeue_end.count, kHalfOfQueueSize);
}
629 
630 // Enqueue end level : 1
631 // Dequeue end level : 1 -> 0
632 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Stages kHalfOfQueueSize strings ("0", "1", ...) in the enqueue end's buffer.
  const auto stage_half = [&enqueue_end]() {
    for (int i = 0; i < kHalfOfQueueSize; ++i) {
      enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
    }
  };

  // Bring the queue to half of its capacity.
  stage_half();
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage the other half and register the enqueue end again.
  stage_half();
  enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Register the dequeue end and wait until all kQueueSize elements have been received.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(all_received.get(), kQueueSize);

  // Afterwards the dequeue callback must not fire again.
  constexpr auto kObservationTime = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(kObservationTime);
  EXPECT_EQ(dequeue_end.count, kQueueSize);
}
669 
670 // Enqueue end level : 1
671 // Dequeue end level : 0 -> 1
672 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Register dequeue while the queue is still empty.
  // The future is taken BEFORE registration: once the dequeue end is registered, the dequeue
  // thread calls extract() on this map from its callback, so touching the map from the test
  // thread afterwards is a data race, and a late operator[] for an already-extracted key would
  // insert a brand new promise whose future is never fulfilled.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Push kQueueSize data to the enqueue end buffer and register enqueue.
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect kQueueSize data to move to the dequeue end buffer.
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}
696 
TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  // Allocated on the heap because it is deleted from the dequeue handler at the end of the test.
  auto* queue = new Queue<std::string>(kQueueSize);

  // Producer: unregisters itself on its first invocation and enqueues a copy of the string that
  // is kept alive by the bound shared_ptr.
  std::string valid = "Valid String";
  auto shared = std::make_shared<std::string>(valid);
  queue->RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::shared_ptr<std::string> payload) {
            q->UnregisterEnqueue();
            return std::make_unique<std::string>(*payload);
          },
          common::Unretained(queue),
          shared));

  // Consumer: unregisters itself on its first invocation and checks the dequeued string.
  queue->RegisterDequeue(
      dequeue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::string expected) {
            q->UnregisterDequeue();
            auto answer = *q->TryDequeue();
            ASSERT_EQ(answer, expected);
          },
          common::Unretained(queue),
          valid));

  // Chain a task through the enqueue handler and then the dequeue handler; the last hop deletes
  // the queue and releases the test thread.
  std::promise<void> done;
  auto done_future = done.get_future();

  enqueue_handler_->Post(common::BindOnce(
      [](os::Handler* next_handler, Queue<std::string>* q, std::promise<void>* finished) {
        next_handler->Post(common::BindOnce(
            [](Queue<std::string>* doomed, std::promise<void>* signal) {
              delete doomed;
              signal->set_value();
            },
            common::Unretained(q),
            common::Unretained(finished)));
      },
      common::Unretained(dequeue_handler_),
      common::Unretained(queue),
      common::Unretained(&done)));
  done_future.wait();
}
744 
// Deliberately slow enqueue callback: sleeps 100 ms, bumps the counter behind |to_increase| by one and
// returns a freshly allocated "Hello".
std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  constexpr std::chrono::milliseconds kCallbackDuration{100};
  std::this_thread::sleep_for(kCallbackDuration);
  ++*to_increase;
  return std::make_unique<std::string>("Hello");
}
750 
// Calls UnregisterEnqueue() from the test thread 50 ms after registering a callback that takes 100 ms.
// The expectation below only holds if UnregisterEnqueue() returns after that callback invocation has
// finished and the callback is not invoked a second time.
TEST_F(QueueTest, unregister_enqueue_and_wait) {
  // Automatic storage is enough: the callback only borrows the counter through an unretained pointer.
  // Declared before the queue so that it outlives it.
  int indicator = 100;
  Queue<std::string> queue(10);
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(&indicator)));
  // Give the handler thread time to enter the callback before unregistering.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  // Exactly one increment is expected.
  EXPECT_EQ(indicator, 101);
}
760 
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)761 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
762     int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
763   std::this_thread::sleep_for(std::chrono::milliseconds(100));
764   (*to_increase)++;
765   if (is_registered->exchange(false)) {
766     queue->UnregisterEnqueue();
767   }
768   return std::make_unique<std::string>("Hello");
769 }
770 
// Variant of unregister_enqueue_and_wait in which both the callback and the test thread try to
// unregister. |is_registered| is exchanged atomically, so only one of them calls UnregisterEnqueue().
// Either way the counter is expected to have been incremented exactly once.
TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  // Automatic storage is enough: the callback only borrows the counter through an unretained pointer.
  // Declared before the queue so that it outlives it.
  int indicator = 100;
  Queue<std::string> queue(10);
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          &sleep_and_enqueue_callback_and_unregister,
          common::Unretained(&indicator),
          common::Unretained(&queue),
          common::Unretained(&is_registered)));
  // The callback sleeps 100 ms before touching anything, so the test thread normally wins the exchange.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(indicator, 101);
}
789 
// Deliberately slow dequeue callback: sleeps 100 ms and bumps the counter behind |to_increase| by one.
// It does not take anything out of a queue.
void sleep_and_dequeue_callback(int* to_increase) {
  constexpr std::chrono::milliseconds kCallbackDuration{100};
  std::this_thread::sleep_for(kCallbackDuration);
  ++*to_increase;
}
794 
// Dequeue-side counterpart of unregister_enqueue_and_wait: one item is enqueued, a 100 ms dequeue
// callback is registered, and UnregisterDequeue() is called from the test thread after 50 ms. The
// expectation below only holds if the callback ran to completion exactly once by the time
// UnregisterDequeue() returns.
TEST_F(QueueTest, unregister_dequeue_and_wait) {
  // Automatic storage is enough: the callback only borrows the counter through an unretained pointer.
  // Declared before the queue so that it outlives it.
  int indicator = 100;
  Queue<std::string> queue(10);
  // Enqueue a single "Hello": the callback unregisters itself on its first invocation.
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* queue) {
            queue->UnregisterEnqueue();
            return std::make_unique<std::string>("Hello");
          },
          common::Unretained(&queue)));
  // NOTE(review): the dequeue callback is registered on enqueue_handler_, not dequeue_handler_ --
  // confirm this is intentional.
  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(&indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(indicator, 101);
}
812 
813 // Create all threads for death tests in the function that dies
814 class QueueDeathTest : public ::testing::Test {
815  public:
RegisterEnqueueAndDelete()816   void RegisterEnqueueAndDelete() {
817     Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
818     Handler* enqueue_handler = new Handler(enqueue_thread);
819     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
820     queue->RegisterEnqueue(
821         enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
822     delete queue;
823   }
824 
RegisterDequeueAndDelete()825   void RegisterDequeueAndDelete() {
826     Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
827     Handler* dequeue_handler = new Handler(dequeue_thread);
828     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
829     queue->RegisterDequeue(
830         dequeue_handler,
831         common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
832     delete queue;
833   }
834 };
835 
// Deleting a queue whose enqueue callback is still registered must kill the process with a message
// matching "nqueue".
TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  EXPECT_DEATH({ RegisterEnqueueAndDelete(); }, "nqueue");
}
839 
// Deleting a queue whose dequeue callback is still registered must kill the process with a message
// matching "equeue".
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH({ RegisterDequeueAndDelete(); }, "equeue");
}
843 
844 class MockIQueueEnqueue : public IQueueEnqueue<int> {
845  public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)846   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
847     EXPECT_FALSE(registered_);
848     registered_ = true;
849     handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
850   }
851 
handle_register_enqueue(EnqueueCallback callback)852   void handle_register_enqueue(EnqueueCallback callback) {
853     if (dont_handle_register_enqueue_) {
854       return;
855     }
856     while (registered_) {
857       std::unique_ptr<int> front = callback.Run();
858       queue_.push(*front);
859     }
860   }
861 
UnregisterEnqueue()862   void UnregisterEnqueue() override {
863     EXPECT_TRUE(registered_);
864     registered_ = false;
865   }
866 
867   bool dont_handle_register_enqueue_ = false;
868   bool registered_ = false;
869   std::queue<int> queue_;
870 };
871 
872 class EnqueueBufferTest : public ::testing::Test {
873  protected:
SetUp()874   void SetUp() override {
875     thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
876     handler_ = new Handler(thread_);
877   }
878 
TearDown()879   void TearDown() override {
880     handler_->Clear();
881     delete handler_;
882     delete thread_;
883   }
884 
SynchronizeHandler()885   void SynchronizeHandler() {
886     std::promise<void> promise;
887     auto future = promise.get_future();
888     handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
889     future.wait();
890   }
891 
892   MockIQueueEnqueue enqueue_;
893   EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
894   Thread* thread_;
895   Handler* handler_;
896 };
897 
// Items pushed through the EnqueueBuffer must reach the underlying (mock) queue in order, and the
// buffer is expected to have unregistered itself by the time the handler has caught up.
TEST_F(EnqueueBufferTest, enqueue) {
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  // Let the handler thread catch up before inspecting the mock.
  SynchronizeHandler();
  for (int expected = 0; expected < kItemCount; ++expected) {
    ASSERT_EQ(enqueue_.queue_.front(), expected);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}
910 
// With the mock's drain loop disabled, enqueued items leave the mock registered; Clear() is expected
// to unregister it.
TEST_F(EnqueueBufferTest, clear) {
  // Make the mock's posted drain loop a no-op so it never consumes anything.
  enqueue_.dont_handle_register_enqueue_ = true;
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}
921 
// Uses a real Queue<int>. There are no assertions: the test only requires that destroying the
// EnqueueBuffer and then the Queue right after enqueuing, without first synchronizing with the handler
// thread, completes cleanly.
TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  auto* queue = new Queue<int>(kQueueSize);
  auto* enqueue_buffer = new EnqueueBuffer<int>(queue);
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer->Enqueue(std::make_unique<int>(value), handler_);
  }

  // The buffer goes first, then the queue it was attached to.
  delete enqueue_buffer;
  delete queue;
}
933 
934 }  // namespace
935 }  // namespace os
936 }  // namespace bluetooth
937