1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <bluetooth/log.h>
20 #include <sys/eventfd.h>
21
22 #include <chrono>
23 #include <future>
24 #include <thread>
25
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "gtest/gtest.h"
29 #include "os/log.h"
30
31 namespace bluetooth {
32 namespace os {
33 namespace {
34
// Value the reactable callbacks publish through g_promise once they have run.
constexpr int kReadReadyValue = 100;

using common::Bind;

// Promise shared between a test body and the reactable callbacks it triggers.
// ReactorTest::SetUp() allocates it and ReactorTest::TearDown() frees it; some
// tests delete and re-create it mid-test to await a second notification.
std::promise<int>* g_promise;
40
41 class ReactorTest : public ::testing::Test {
42 protected:
SetUp()43 void SetUp() override {
44 g_promise = new std::promise<int>;
45 reactor_ = new Reactor;
46 }
47
TearDown()48 void TearDown() override {
49 delete g_promise;
50 g_promise = nullptr;
51 delete reactor_;
52 reactor_ = nullptr;
53 }
54
55 Reactor* reactor_;
56 };
57
58 class SampleReactable {
59 public:
SampleReactable()60 SampleReactable() : fd_(eventfd(0, EFD_NONBLOCK)) {
61 EXPECT_NE(fd_, -1);
62 }
63
~SampleReactable()64 ~SampleReactable() {
65 close(fd_);
66 }
67
OnReadReady()68 void OnReadReady() {}
69
OnWriteReady()70 void OnWriteReady() {}
71
72 int fd_;
73 };
74
75 class FakeReactable {
76 public:
77 enum EventFdValue {
78 kSetPromise = 1,
79 kRegisterSampleReactable,
80 kUnregisterSampleReactable,
81 kSampleOutputValue,
82 };
FakeReactable()83 FakeReactable() : fd_(eventfd(0, 0)), reactor_(nullptr) {
84 EXPECT_NE(fd_, -1);
85 }
86
FakeReactable(Reactor * reactor)87 FakeReactable(Reactor* reactor) : fd_(eventfd(0, 0)), reactor_(reactor) {
88 EXPECT_NE(fd_, -1);
89 }
90
~FakeReactable()91 ~FakeReactable() {
92 close(fd_);
93 }
94
OnReadReady()95 void OnReadReady() {
96 log::info("");
97 uint64_t value = 0;
98 auto read_result = eventfd_read(fd_, &value);
99 log::info("value = {}", (int)value);
100 EXPECT_EQ(read_result, 0);
101 if (value == kSetPromise && g_promise != nullptr) {
102 g_promise->set_value(kReadReadyValue);
103 }
104 if (value == kRegisterSampleReactable) {
105 reactable_ = reactor_->Register(
106 sample_reactable_.fd_,
107 Bind(&FakeReactable::OnReadReady, common::Unretained(this)),
108 Bind(&FakeReactable::OnWriteReadyNoOp, common::Unretained(this)));
109 g_promise->set_value(kReadReadyValue);
110 }
111 if (value == kUnregisterSampleReactable) {
112 reactor_->Unregister(reactable_);
113 g_promise->set_value(kReadReadyValue);
114 }
115 }
116
OnWriteReady()117 void OnWriteReady() {
118 auto write_result = eventfd_write(fd_, output_data_);
119 output_data_ = 0;
120 EXPECT_EQ(write_result, 0);
121 }
122
OnWriteReadyNoOp()123 void OnWriteReadyNoOp() {}
124
UnregisterInCallback()125 void UnregisterInCallback() {
126 uint64_t value = 0;
127 auto read_result = eventfd_read(fd_, &value);
128 EXPECT_EQ(read_result, 0);
129 g_promise->set_value(kReadReadyValue);
130 reactor_->Unregister(reactable_);
131 }
132
133 SampleReactable sample_reactable_;
134 Reactor::Reactable* reactable_ = nullptr;
135 int fd_;
136
137 private:
138 Reactor* reactor_;
139 uint64_t output_data_ = kSampleOutputValue;
140 };
141
142 class FakeRunningReactable {
143 public:
FakeRunningReactable()144 FakeRunningReactable() : fd_(eventfd(0, 0)) {
145 EXPECT_NE(fd_, -1);
146 }
147
~FakeRunningReactable()148 ~FakeRunningReactable() {
149 close(fd_);
150 }
151
OnReadReady()152 void OnReadReady() {
153 uint64_t value = 0;
154 auto read_result = eventfd_read(fd_, &value);
155 ASSERT_EQ(read_result, 0);
156 started.set_value();
157 can_finish.get_future().wait();
158 finished.set_value();
159 }
160
161 Reactor::Reactable* reactable_ = nullptr;
162 int fd_;
163
164 std::promise<void> started;
165 std::promise<void> can_finish;
166 std::promise<void> finished;
167 };
168
// Launches Run() on a worker thread, requests a stop, and joins the worker.
TEST_F(ReactorTest, start_and_stop) {
  std::thread run_thread(&Reactor::Run, reactor_);
  reactor_->Stop();
  run_thread.join();
}
174
// Issues Stop() and Run() from two separate threads, the Stop() thread being
// spawned first, and expects both to terminate.
TEST_F(ReactorTest, stop_and_start) {
  std::thread stop_thread(&Reactor::Stop, reactor_);
  std::thread run_thread(&Reactor::Run, reactor_);
  stop_thread.join();
  run_thread.join();
}
181
// Calls Stop() five times against a single Run() and expects the run thread to
// exit.
TEST_F(ReactorTest, stop_multi_times) {
  std::thread run_thread(&Reactor::Run, reactor_);
  for (int remaining = 5; remaining > 0; --remaining) {
    reactor_->Stop();
  }
  run_thread.join();
}
189
// Registers and immediately unregisters a reactable without ever running the
// reactor.
TEST_F(ReactorTest, cold_register_only) {
  FakeReactable fake;
  auto on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, on_read, common::Closure());

  reactor_->Unregister(registration);
}
197
// Registers before the reactor starts, then checks that a write to the fd
// reaches the read callback once the reactor is running.
TEST_F(ReactorTest, cold_register) {
  FakeReactable fake;
  auto on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, on_read, common::Closure());
  std::thread run_thread(&Reactor::Run, reactor_);
  std::future<int> read_ready = g_promise->get_future();

  const auto write_status = eventfd_write(fake.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(write_status, 0);
  EXPECT_EQ(read_ready.get(), kReadReadyValue);
  reactor_->Stop();
  run_thread.join();
  reactor_->Unregister(registration);
}
212
// Registers from the test thread after Run() has been launched on another
// thread, then checks that a write to the fd reaches the read callback.
TEST_F(ReactorTest, hot_register_from_different_thread) {
  std::thread run_thread(&Reactor::Run, reactor_);
  std::future<int> read_ready = g_promise->get_future();

  FakeReactable fake;
  auto on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, on_read, common::Closure());
  const auto write_status = eventfd_write(fake.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(write_status, 0);
  EXPECT_EQ(read_ready.get(), kReadReadyValue);
  reactor_->Stop();
  run_thread.join();

  reactor_->Unregister(registration);
}
228
// Unregisters from the test thread while the read callback is still blocked
// inside OnReadReady(), then lets the callback run to completion.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) {
  FakeRunningReactable running;
  auto on_read = Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&running));
  auto* registration = reactor_->Register(running.fd_, on_read, common::Closure());
  std::thread run_thread(&Reactor::Run, reactor_);
  const auto write_status = eventfd_write(running.fd_, 1);
  ASSERT_EQ(write_status, 0);

  // The callback is now executing and parked on can_finish.
  running.started.get_future().wait();
  reactor_->Unregister(registration);
  running.can_finish.set_value();
  running.finished.get_future().wait();

  reactor_->Stop();
  run_thread.join();
}
246
// Same setup as the plain "while task is executing" case, but additionally
// expects WaitForUnregisteredReactable() to report failure while the callback
// is still blocked.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_fails) {
  FakeRunningReactable running;
  auto on_read = common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&running));
  auto* registration = reactor_->Register(running.fd_, on_read, common::Closure());
  std::thread run_thread(&Reactor::Run, reactor_);
  const auto write_status = eventfd_write(running.fd_, 1);
  ASSERT_EQ(write_status, 0);

  running.started.get_future().wait();
  reactor_->Unregister(registration);
  // The callback has not been released yet, so the wait is expected to fail.
  const bool unregistered = reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1));
  ASSERT_FALSE(unregistered);
  running.can_finish.set_value();
  running.finished.get_future().wait();

  reactor_->Stop();
  run_thread.join();
}
265
// Same setup as the "wait fails" case, but the wait is issued only after the
// callback has signalled `finished`, and is expected to succeed.
TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_succeeds) {
  FakeRunningReactable running;
  auto on_read = common::Bind(&FakeRunningReactable::OnReadReady, common::Unretained(&running));
  auto* registration = reactor_->Register(running.fd_, on_read, common::Closure());
  std::thread run_thread(&Reactor::Run, reactor_);
  const auto write_status = eventfd_write(running.fd_, 1);
  ASSERT_EQ(write_status, 0);

  running.started.get_future().wait();
  reactor_->Unregister(registration);
  running.can_finish.set_value();
  running.finished.get_future().wait();
  const bool unregistered = reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1));
  ASSERT_TRUE(unregistered);

  reactor_->Stop();
  run_thread.join();
}
284
// Unregisters from the test thread while the reactor is running and checks
// that a later write to the fd no longer reaches the read callback.
TEST_F(ReactorTest, hot_unregister_from_different_thread) {
  FakeReactable fake_reactable;
  auto* reactable = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  auto reactor_thread = std::thread(&Reactor::Run, reactor_);
  reactor_->Unregister(reactable);
  auto future = g_promise->get_future();

  auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
  EXPECT_EQ(write_result, 0);

  // The callback must not fire. Check the wait result explicitly rather than
  // relying on set_value() below throwing when the promise is already satisfied.
  const auto status = future.wait_for(std::chrono::milliseconds(10));
  EXPECT_EQ(status, std::future_status::timeout);
  if (status == std::future_status::timeout) {
    // Fulfil the promise ourselves so the future can be consumed.
    g_promise->set_value(2);
    EXPECT_EQ(future.get(), 2);
  }
  reactor_->Stop();
  reactor_thread.join();
}
301
// Drives FakeReactable to register its nested sample fd from inside a reactor
// callback, then to unregister it again before the reactor is stopped.
TEST_F(ReactorTest, hot_register_from_same_thread) {
  auto reactor_thread = std::thread(&Reactor::Run, reactor_);
  auto future = g_promise->get_future();

  FakeReactable fake_reactable(reactor_);
  auto* reactable = reactor_->Register(
          fake_reactable.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)),
          common::Closure());
  auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
  EXPECT_EQ(write_result, 0);
  EXPECT_EQ(future.get(), kReadReadyValue);

  // A promise can only be fulfilled once; swap in a new one for the second step.
  delete g_promise;
  g_promise = new std::promise<int>;
  future = g_promise->get_future();
  write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
  EXPECT_EQ(write_result, 0);
  // Wait for the unregister request to be handled. Without this, Stop() races
  // with the request and the sample registration may never be removed.
  EXPECT_EQ(future.get(), kReadReadyValue);
  reactor_->Stop();
  reactor_thread.join();

  reactor_->Unregister(reactable);
}
322
// Registers the nested sample fd from inside a reactor callback, then
// unregisters it the same way, awaiting a notification after each step.
TEST_F(ReactorTest, hot_unregister_from_same_thread) {
  std::thread run_thread(&Reactor::Run, reactor_);
  std::future<int> step_done = g_promise->get_future();

  FakeReactable fake(reactor_);
  auto on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, on_read, common::Closure());

  // Step 1: ask the callback to register the sample reactable.
  auto write_status = eventfd_write(fake.fd_, FakeReactable::kRegisterSampleReactable);
  EXPECT_EQ(write_status, 0);
  EXPECT_EQ(step_done.get(), kReadReadyValue);
  log::info("");

  // Step 2: a promise is single-use, so replace it before the next request.
  delete g_promise;
  g_promise = new std::promise<int>;
  step_done = g_promise->get_future();
  write_status = eventfd_write(fake.fd_, FakeReactable::kUnregisterSampleReactable);
  EXPECT_EQ(write_status, 0);
  EXPECT_EQ(step_done.get(), kReadReadyValue);
  log::info("");

  reactor_->Stop();
  run_thread.join();

  reactor_->Unregister(registration);
}
346
// A reactable unregisters itself from inside its own read callback while a
// second, idle reactable stays registered until the reactor has stopped.
TEST_F(ReactorTest, hot_unregister_from_callback) {
  auto reactor_thread = std::thread(&Reactor::Run, reactor_);
  auto future = g_promise->get_future();

  FakeReactable fake_reactable1(reactor_);
  auto* reactable1 = reactor_->Register(
          fake_reactable1.fd_,
          Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable1)),
          common::Closure());

  FakeReactable fake_reactable2(reactor_);
  auto* reactable2 = reactor_->Register(
          fake_reactable2.fd_,
          Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&fake_reactable2)),
          common::Closure());
  fake_reactable2.reactable_ = reactable2;
  auto write_result = eventfd_write(fake_reactable2.fd_, 1);
  EXPECT_EQ(write_result, 0);
  // Make sure the callback has actually started before stopping. Otherwise
  // Stop() may win the race and the self-unregistration is never exercised.
  // join() below then waits for the callback to return.
  EXPECT_EQ(future.get(), kReadReadyValue);
  reactor_->Stop();
  reactor_thread.join();

  reactor_->Unregister(reactable1);
}
367
// Unregisters one reactable from the test thread right after being notified by
// another reactable's callback, which then goes on to unregister itself.
TEST_F(ReactorTest, hot_unregister_during_unregister_from_callback) {
  std::thread run_thread(&Reactor::Run, reactor_);
  std::future<int> callback_entered = g_promise->get_future();

  FakeReactable idle(reactor_);
  auto idle_on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&idle));
  auto* idle_registration = reactor_->Register(idle.fd_, idle_on_read, common::Closure());

  FakeReactable self_removing(reactor_);
  auto removing_on_read =
          Bind(&FakeReactable::UnregisterInCallback, common::Unretained(&self_removing));
  auto* removing_registration =
          reactor_->Register(self_removing.fd_, removing_on_read, common::Closure());
  self_removing.reactable_ = removing_registration;

  const auto write_status = eventfd_write(self_removing.fd_, 1);
  EXPECT_EQ(write_status, 0);
  // UnregisterInCallback() fulfils the promise before it calls Unregister().
  EXPECT_EQ(callback_entered.get(), kReadReadyValue);
  reactor_->Unregister(idle_registration);

  reactor_->Stop();
  run_thread.join();
}
390
// Runs six consecutive Run()/Stop()/join cycles on the same reactor (one
// initial cycle plus five repeats).
TEST_F(ReactorTest, start_and_stop_multi_times) {
  constexpr int kCycles = 1 + 5;
  for (int cycle = 0; cycle < kCycles; ++cycle) {
    std::thread run_thread(&Reactor::Run, reactor_);
    reactor_->Stop();
    run_thread.join();
  }
}
401
// Registers only a write callback and checks that the value it writes to the
// eventfd can be read back by the test thread.
TEST_F(ReactorTest, on_write_ready) {
  FakeReactable fake;
  auto on_write = Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, common::Closure(), on_write);
  std::thread run_thread(&Reactor::Run, reactor_);

  // The fd is created without EFD_NONBLOCK, so this read waits for a write.
  uint64_t received = 0;
  const auto read_status = eventfd_read(fake.fd_, &received);
  EXPECT_EQ(read_status, 0);
  EXPECT_EQ(received, FakeReactable::kSampleOutputValue);

  reactor_->Stop();
  run_thread.join();

  reactor_->Unregister(registration);
}
417
// Checks that switching a registration to REACT_ON_WRITE_ONLY stops the read
// callback from being delivered.
TEST_F(ReactorTest, modify_registration) {
  FakeReactable fake;
  auto on_read = Bind(&FakeReactable::OnReadReady, common::Unretained(&fake));
  auto on_write = Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake));
  auto* registration = reactor_->Register(fake.fd_, on_read, on_write);

  std::thread run_thread(&Reactor::Run, reactor_);

  const std::chrono::milliseconds kWait(10);
  std::future<int> read_ready = g_promise->get_future();

  // With both callbacks enabled, the read callback must fire promptly.
  auto write_status = eventfd_write(fake.fd_, FakeReactable::kSetPromise);
  ASSERT_EQ(write_status, 0);
  ASSERT_EQ(read_ready.wait_for(kWait), std::future_status::ready);
  ASSERT_EQ(read_ready.get(), kReadReadyValue);

  /* Disable on_read callback */
  reactor_->ModifyRegistration(registration, Reactor::REACT_ON_WRITE_ONLY);

  // A promise is single-use, so replace it before the second attempt.
  delete g_promise;
  g_promise = new std::promise<int>;
  read_ready = g_promise->get_future();

  // The same write must now go unnoticed by the read callback.
  write_status = eventfd_write(fake.fd_, FakeReactable::kSetPromise);
  ASSERT_EQ(write_status, 0);
  ASSERT_NE(read_ready.wait_for(kWait), std::future_status::ready);

  reactor_->Stop();
  run_thread.join();

  reactor_->Unregister(registration);
}
451
452 } // namespace
453 } // namespace os
454 } // namespace bluetooth
455