• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
#include "fdevent.h"

#include <gtest/gtest.h>

#include <unistd.h>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <android-base/threads.h>

#include "adb_io.h"
#include "fdevent_test.h"

using namespace std::chrono_literals;
36 
37 class FdHandler {
38   public:
FdHandler(int read_fd,int write_fd,bool use_new_callback)39     FdHandler(int read_fd, int write_fd, bool use_new_callback)
40         : read_fd_(read_fd), write_fd_(write_fd) {
41         if (use_new_callback) {
42             read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this);
43             write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this);
44         } else {
45             read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
46             write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
47         }
48         fdevent_add(read_fde_, FDE_READ);
49     }
50 
~FdHandler()51     ~FdHandler() {
52         fdevent_destroy(read_fde_);
53         fdevent_destroy(write_fde_);
54     }
55 
56   private:
FdEventCallback(int fd,unsigned events,void * userdata)57     static void FdEventCallback(int fd, unsigned events, void* userdata) {
58         FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
59         ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
60         if (events & FDE_READ) {
61             ASSERT_EQ(fd, handler->read_fd_);
62             char c;
63             ASSERT_EQ(1, adb_read(fd, &c, 1));
64             handler->queue_.push(c);
65             fdevent_add(handler->write_fde_, FDE_WRITE);
66         }
67         if (events & FDE_WRITE) {
68             ASSERT_EQ(fd, handler->write_fd_);
69             ASSERT_FALSE(handler->queue_.empty());
70             char c = handler->queue_.front();
71             handler->queue_.pop();
72             ASSERT_EQ(1, adb_write(fd, &c, 1));
73             if (handler->queue_.empty()) {
74                 fdevent_del(handler->write_fde_, FDE_WRITE);
75             }
76         }
77     }
78 
FdEventNewCallback(fdevent * fde,unsigned events,void * userdata)79     static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) {
80         int fd = fde->fd.get();
81         FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
82         ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
83         if (events & FDE_READ) {
84             ASSERT_EQ(fd, handler->read_fd_);
85             char c;
86             ASSERT_EQ(1, adb_read(fd, &c, 1));
87             handler->queue_.push(c);
88             fdevent_add(handler->write_fde_, FDE_WRITE);
89         }
90         if (events & FDE_WRITE) {
91             ASSERT_EQ(fd, handler->write_fd_);
92             ASSERT_FALSE(handler->queue_.empty());
93             char c = handler->queue_.front();
94             handler->queue_.pop();
95             ASSERT_EQ(1, adb_write(fd, &c, 1));
96             if (handler->queue_.empty()) {
97                 fdevent_del(handler->write_fde_, FDE_WRITE);
98             }
99         }
100     }
101 
102   private:
103     const int read_fd_;
104     const int write_fd_;
105     fdevent* read_fde_;
106     fdevent* write_fde_;
107     std::queue<char> queue_;
108 };
109 
// Describes the relay chain that the smoke test builds on the looper thread: the fd the first
// handler reads from, the fd the last handler writes to, and how many socket pairs sit in between.
// Members are default-initialized so that a partially filled instance never carries garbage.
struct ThreadArg {
    int first_read_fd = -1;        // -1 means "not set yet".
    int last_write_fd = -1;        // -1 means "not set yet".
    size_t middle_pipe_count = 0;  // Number of intermediate socket pairs.
};
115 
// Starting the fixture's thread and immediately terminating it must work without any other
// activity in between.
TEST_F(FdeventTest, fdevent_terminate) {
    PrepareThread();    // Start the fixture's thread.
    TerminateThread();  // Shut it down again right away.
}
120 
// Pushes a message through a long chain of FdHandlers and checks that it comes out unchanged at
// the far end. Runs once for each callback flavour supported by FdHandler.
TEST_F(FdeventTest, smoke) {
    for (bool use_new_callback : {true, false}) {
        fdevent_reset();

        const size_t PIPE_COUNT = 512;
        const size_t MESSAGE_LOOP_COUNT = 10;
        const std::string MESSAGE = "fdevent_test";

        // The two outermost socket pairs: the test writes into the first one and reads from
        // the second one; the handlers bridge everything in between.
        int head_pair[2];
        int tail_pair[2];
        ASSERT_EQ(0, adb_socketpair(head_pair));
        ASSERT_EQ(0, adb_socketpair(tail_pair));

        ThreadArg chain;
        chain.first_read_fd = head_pair[0];
        chain.last_write_fd = tail_pair[1];
        chain.middle_pipe_count = PIPE_COUNT;

        const int writer = head_pair[1];
        const int reader = tail_pair[0];

        PrepareThread();

        // Handlers are created (and later destroyed) on the looper thread.
        std::vector<std::unique_ptr<FdHandler>> handlers;
        fdevent_run_on_looper([&chain, &handlers, use_new_callback]() {
            std::vector<int> sources;
            std::vector<int> sinks;
            sources.reserve(chain.middle_pipe_count + 1);
            sinks.reserve(chain.middle_pipe_count + 1);

            // sources[i] feeds sinks[i]; consecutive handlers are linked by one socket pair.
            sources.push_back(chain.first_read_fd);
            for (size_t n = 0; n < chain.middle_pipe_count; ++n) {
                int link[2];
                ASSERT_EQ(0, adb_socketpair(link));
                sources.push_back(link[0]);
                sinks.push_back(link[1]);
            }
            sinks.push_back(chain.last_write_fd);

            for (size_t n = 0; n < sources.size(); ++n) {
                handlers.push_back(
                        std::make_unique<FdHandler>(sources[n], sinks[n], use_new_callback));
            }
        });
        WaitForFdeventLoop();

        for (size_t round = 0; round < MESSAGE_LOOP_COUNT; ++round) {
            std::string sent = MESSAGE;
            // Pre-filled with a different character so a short or missing read is detected.
            std::string received(MESSAGE.size(), 'a');
            ASSERT_TRUE(WriteFdExactly(writer, sent.c_str(), sent.size()));
            ASSERT_TRUE(ReadFdExactly(reader, &received[0], received.size()));
            ASSERT_EQ(sent, received);
        }

        // Tear the handlers down on the looper thread before stopping it.
        fdevent_run_on_looper([&handlers]() { handlers.clear(); });
        WaitForFdeventLoop();

        TerminateThread();
        ASSERT_EQ(0, adb_close(writer));
        ASSERT_EQ(0, adb_close(reader));
    }
}
177 
// Queues a large number of callbacks while the looper is busy and verifies that every one of
// them runs on the looper, in the order in which it was queued.
TEST_F(FdeventTest, run_on_looper_thread_queued) {
    constexpr int kCallbackCount = 1000000;
    std::vector<int> run_order;

    PrepareThread();

    // Block the looper thread for a long time while we queue our callbacks.
    fdevent_run_on_looper([]() {
        fdevent_check_looper();
        std::this_thread::sleep_for(std::chrono::seconds(1));
    });

    for (int seq = 0; seq < kCallbackCount; ++seq) {
        fdevent_run_on_looper([seq, &run_order]() {
            fdevent_check_looper();
            run_order.push_back(seq);
        });
    }

    TerminateThread();

    // Every callback ran exactly once, and in submission order.
    ASSERT_EQ(static_cast<size_t>(kCallbackCount), run_order.size());
    int expected_seq = 0;
    for (int recorded_seq : run_order) {
        ASSERT_EQ(expected_seq, recorded_seq);
        ++expected_seq;
    }
}
203 
// A callback running on the looper must itself be able to queue another callback, and that
// nested callback must have run by the time the thread has been terminated.
TEST_F(FdeventTest, run_on_looper_thread_reentrant) {
    bool inner_ran = false;

    PrepareThread();

    // The task that gets queued from inside the looper.
    auto inner_task = [&inner_ran]() {
        fdevent_check_looper();
        inner_ran = true;
    };

    fdevent_run_on_looper([inner_task]() {
        fdevent_check_looper();
        fdevent_run_on_looper(inner_task);
    });

    TerminateThread();

    EXPECT_EQ(inner_ran, true);
}
221 
TEST_F(FdeventTest,timeout)222 TEST_F(FdeventTest, timeout) {
223     fdevent_reset();
224     PrepareThread();
225 
226     enum class TimeoutEvent {
227         read,
228         timeout,
229         done,
230     };
231 
232     struct TimeoutTest {
233         std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events;
234         fdevent* fde;
235     };
236     TimeoutTest test;
237 
238     int fds[2];
239     ASSERT_EQ(0, adb_socketpair(fds));
240     static constexpr auto delta = 100ms;
241     fdevent_run_on_looper([&]() {
242         test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) {
243             auto test = static_cast<TimeoutTest*>(arg);
244             auto now = std::chrono::steady_clock::now();
245             CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT));
246             TimeoutEvent event;
247             if ((events & FDE_READ)) {
248                 char buf[2];
249                 ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf));
250                 if (rc == 0) {
251                     event = TimeoutEvent::done;
252                 } else if (rc == 1) {
253                     event = TimeoutEvent::read;
254                 } else {
255                     abort();
256                 }
257             } else if ((events & FDE_TIMEOUT)) {
258                 event = TimeoutEvent::timeout;
259             } else {
260                 abort();
261             }
262 
263             CHECK_EQ(fde, test->fde);
264             test->events.emplace_back(event, now);
265 
266             if (event == TimeoutEvent::done) {
267                 fdevent_destroy(fde);
268             }
269         }, &test);
270         fdevent_add(test.fde, FDE_READ);
271         fdevent_set_timeout(test.fde, delta);
272     });
273 
274     ASSERT_EQ(1, adb_write(fds[1], "", 1));
275 
276     // Timeout should happen here
277     std::this_thread::sleep_for(delta);
278 
279     // and another.
280     std::this_thread::sleep_for(delta);
281 
282     // No timeout should happen here.
283     std::this_thread::sleep_for(delta / 2);
284     adb_close(fds[1]);
285 
286     TerminateThread();
287 
288     ASSERT_EQ(4ULL, test.events.size());
289     ASSERT_EQ(TimeoutEvent::read, test.events[0].first);
290     ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first);
291     ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first);
292     ASSERT_EQ(TimeoutEvent::done, test.events[3].first);
293 
294     std::vector<int> time_deltas;
295     for (size_t i = 0; i < test.events.size() - 1; ++i) {
296         auto before = test.events[i].second;
297         auto after = test.events[i + 1].second;
298         auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before);
299         time_deltas.push_back(diff.count());
300     }
301 
302     std::vector<int> expected = {
303         delta.count(),
304         delta.count(),
305         delta.count() / 2,
306     };
307 
308     std::vector<int> diff;
309     ASSERT_EQ(time_deltas.size(), expected.size());
310     for (size_t i = 0; i < time_deltas.size(); ++i) {
311         diff.push_back(std::abs(time_deltas[i] - expected[i]));
312     }
313 
314     ASSERT_LT(diff[0], delta.count() * 0.5);
315     ASSERT_LT(diff[1], delta.count() * 0.5);
316     ASSERT_LT(diff[2], delta.count() * 0.5);
317 }
318 
TEST_F(FdeventTest,unregister_with_pending_event)319 TEST_F(FdeventTest, unregister_with_pending_event) {
320     fdevent_reset();
321 
322     int fds1[2];
323     int fds2[2];
324     ASSERT_EQ(0, adb_socketpair(fds1));
325     ASSERT_EQ(0, adb_socketpair(fds2));
326 
327     struct Test {
328         fdevent* fde1;
329         fdevent* fde2;
330         bool should_not_happen;
331     };
332     Test test{};
333 
334     test.fde1 = fdevent_create(
335             fds1[0],
336             [](fdevent* fde, unsigned events, void* arg) {
337                 auto test = static_cast<Test*>(arg);
338                 // Unregister fde2 from inside the fde1 event
339                 fdevent_destroy(test->fde2);
340                 // Unregister fde1 so it doesn't get called again
341                 fdevent_destroy(test->fde1);
342             },
343             &test);
344 
345     test.fde2 = fdevent_create(
346             fds2[0],
347             [](fdevent* fde, unsigned events, void* arg) {
348                 auto test = static_cast<Test*>(arg);
349                 test->should_not_happen = true;
350             },
351             &test);
352 
353     fdevent_add(test.fde1, FDE_READ | FDE_ERROR);
354     fdevent_add(test.fde2, FDE_READ | FDE_ERROR);
355 
356     PrepareThread();
357     WaitForFdeventLoop();
358 
359     std::mutex m;
360     std::condition_variable cv;
361     bool main_thread_latch = false;
362     bool looper_thread_latch = false;
363 
364     fdevent_run_on_looper([&]() {
365         std::unique_lock lk(m);
366         // Notify the main thread that the looper is in this lambda
367         main_thread_latch = true;
368         cv.notify_one();
369         // Pause the looper to ensure both events occur in the same epoll_wait
370         cv.wait(lk, [&] { return looper_thread_latch; });
371     });
372 
373     // Wait for the looper thread to pause to ensure it is not in epoll_wait
374     {
375         std::unique_lock lk(m);
376         cv.wait(lk, [&] { return main_thread_latch; });
377     }
378 
379     // Write to one end of the sockets to trigger events on the other ends
380     adb_write(fds1[1], "a", 1);
381     adb_write(fds2[1], "a", 1);
382 
383     // Unpause the looper thread to let it loop back into epoll_wait, which should return
384     // both fde1 and fde2.
385     {
386         std::lock_guard lk(m);
387         looper_thread_latch = true;
388     }
389     cv.notify_one();
390 
391     WaitForFdeventLoop();
392     TerminateThread();
393 
394     adb_close(fds1[0]);
395     adb_close(fds1[1]);
396     adb_close(fds2[0]);
397     adb_close(fds2[1]);
398 
399     ASSERT_FALSE(test.should_not_happen);
400 }
401