1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_epoll.h"
6
7 #include <fcntl.h>
8 #include <sys/socket.h>
9 #include <unistd.h>
10
11 #include <memory>
12 #include <string_view>
13 #include <utility>
14
15 #include "base/containers/span.h"
16 #include "base/files/file_util.h"
17 #include "base/files/scoped_file.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/logging.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/raw_ref.h"
24 #include "base/message_loop/message_pump_type.h"
25 #include "base/posix/eintr_wrapper.h"
26 #include "base/run_loop.h"
27 #include "base/synchronization/waitable_event.h"
28 #include "base/synchronization/waitable_event_watcher.h"
29 #include "base/task/current_thread.h"
30 #include "base/task/sequenced_task_runner.h"
31 #include "base/task/single_thread_task_executor.h"
32 #include "base/task/single_thread_task_runner.h"
33 #include "base/test/gtest_util.h"
34 #include "base/test/scoped_feature_list.h"
35 #include "base/test/task_environment.h"
36 #include "base/threading/thread.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38
39 namespace base {
40
41 class MessagePumpEpollTest : public testing::Test {
42 public:
MessagePumpEpollTest()43 MessagePumpEpollTest()
44 : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
45 test::SingleThreadTaskEnvironment::MainThreadType::UI)),
46 io_thread_("MessagePumpEpollTestIOThread") {}
47 ~MessagePumpEpollTest() override = default;
48
receiver() const49 int receiver() const { return receiver_.get(); }
sender() const50 int sender() const { return sender_.get(); }
51
io_runner() const52 scoped_refptr<SingleThreadTaskRunner> io_runner() const {
53 return io_thread_.task_runner();
54 }
55
ClearNotifications()56 void ClearNotifications() {
57 int unused;
58 while (read(receiver_.get(), &unused, sizeof(unused)) == sizeof(unused)) {
59 }
60 }
61
Notify()62 void Notify() {
63 const int data = 42;
64 PCHECK(write(sender_.get(), &data, sizeof(data)) == sizeof(data));
65 }
66
67 protected:
SetUp()68 void SetUp() override {
69 Thread::Options options(MessagePumpType::IO, 0);
70 ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
71 int fds[2];
72 int rv = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
73 CHECK_EQ(rv, 0);
74 PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
75 receiver_ = base::ScopedFD(fds[0]);
76 sender_ = base::ScopedFD(fds[1]);
77 }
78
TearDown()79 void TearDown() override {
80 // Some tests watch `receiver_` from the `io_thread_`. The `io_thread_` must
81 // thus be joined to ensure those watches are complete before closing the
82 // sockets.
83 io_thread_.Stop();
84 }
85
SimulateIOEvent(MessagePumpEpoll * pump,MessagePumpEpoll::FdWatchController * controller)86 void SimulateIOEvent(MessagePumpEpoll* pump,
87 MessagePumpEpoll::FdWatchController* controller) {
88 pump->HandleEvent(0, /*can_read=*/true, /*can_write=*/true, controller);
89 }
90
91 static constexpr char null_byte_ = 0;
92 std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
93
94 private:
95 Thread io_thread_;
96 base::ScopedFD receiver_;
97 base::ScopedFD sender_;
98 };
99
100 namespace {
101
TEST_F(MessagePumpEpollTest,QuitOutsideOfRun)102 TEST_F(MessagePumpEpollTest, QuitOutsideOfRun) {
103 auto pump = std::make_unique<MessagePumpEpoll>();
104 ASSERT_DCHECK_DEATH(pump->Quit());
105 }
106
107 class BaseWatcher : public MessagePumpEpoll::FdWatcher {
108 public:
109 BaseWatcher() = default;
110 ~BaseWatcher() override = default;
111
112 // base:MessagePumpEpoll::FdWatcher interface
OnFileCanReadWithoutBlocking(int)113 void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
OnFileCanWriteWithoutBlocking(int)114 void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
115 };
116
117 class DeleteWatcher : public BaseWatcher {
118 public:
DeleteWatcher(std::unique_ptr<MessagePumpEpoll::FdWatchController> controller)119 explicit DeleteWatcher(
120 std::unique_ptr<MessagePumpEpoll::FdWatchController> controller)
121 : controller_(std::move(controller)) {}
122
~DeleteWatcher()123 ~DeleteWatcher() override { DCHECK(!controller_); }
124
controller()125 MessagePumpEpoll::FdWatchController* controller() {
126 return controller_.get();
127 }
128
OnFileCanWriteWithoutBlocking(int)129 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
130 DCHECK(controller_);
131 controller_.reset();
132 }
133
134 private:
135 std::unique_ptr<MessagePumpEpoll::FdWatchController> controller_;
136 };
137
TEST_F(MessagePumpEpollTest,DeleteWatcher)138 TEST_F(MessagePumpEpollTest, DeleteWatcher) {
139 DeleteWatcher delegate(
140 std::make_unique<MessagePumpEpoll::FdWatchController>(FROM_HERE));
141 auto pump = std::make_unique<MessagePumpEpoll>();
142 pump->WatchFileDescriptor(receiver(), false,
143 MessagePumpEpoll::WATCH_READ_WRITE,
144 delegate.controller(), &delegate);
145 SimulateIOEvent(pump.get(), delegate.controller());
146 }
147
148 class StopWatcher : public BaseWatcher {
149 public:
StopWatcher(MessagePumpEpoll::FdWatchController * controller)150 explicit StopWatcher(MessagePumpEpoll::FdWatchController* controller)
151 : controller_(controller) {}
152
153 ~StopWatcher() override = default;
154
OnFileCanWriteWithoutBlocking(int)155 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
156 controller_->StopWatchingFileDescriptor();
157 }
158
159 private:
160 raw_ptr<MessagePumpEpoll::FdWatchController> controller_ = nullptr;
161 };
162
TEST_F(MessagePumpEpollTest,StopWatcher)163 TEST_F(MessagePumpEpollTest, StopWatcher) {
164 auto pump = std::make_unique<MessagePumpEpoll>();
165 MessagePumpEpoll::FdWatchController controller(FROM_HERE);
166 StopWatcher delegate(&controller);
167 pump->WatchFileDescriptor(receiver(), false,
168 MessagePumpEpoll::WATCH_READ_WRITE, &controller,
169 &delegate);
170 SimulateIOEvent(pump.get(), &controller);
171 }
172
QuitMessageLoopAndStart(OnceClosure quit_closure)173 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
174 std::move(quit_closure).Run();
175
176 RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
177 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
178 runloop.QuitClosure());
179 runloop.Run();
180 }
181
182 class NestedPumpWatcher : public MessagePumpEpoll::FdWatcher {
183 public:
184 NestedPumpWatcher() = default;
185 ~NestedPumpWatcher() override = default;
186
OnFileCanReadWithoutBlocking(int)187 void OnFileCanReadWithoutBlocking(int /* fd */) override {
188 RunLoop runloop;
189 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
190 FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
191 runloop.Run();
192 }
193
OnFileCanWriteWithoutBlocking(int)194 void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
195 };
196
TEST_F(MessagePumpEpollTest,NestedPumpWatcher)197 TEST_F(MessagePumpEpollTest, NestedPumpWatcher) {
198 NestedPumpWatcher delegate;
199 auto pump = std::make_unique<MessagePumpEpoll>();
200 MessagePumpEpoll::FdWatchController controller(FROM_HERE);
201 pump->WatchFileDescriptor(receiver(), false, MessagePumpEpoll::WATCH_READ,
202 &controller, &delegate);
203 SimulateIOEvent(pump.get(), &controller);
204 }
205
FatalClosure()206 void FatalClosure() {
207 FAIL() << "Reached fatal closure.";
208 }
209
210 class QuitWatcher : public BaseWatcher {
211 public:
QuitWatcher(base::OnceClosure quit_closure)212 QuitWatcher(base::OnceClosure quit_closure)
213 : quit_closure_(std::move(quit_closure)) {}
214
OnFileCanReadWithoutBlocking(int)215 void OnFileCanReadWithoutBlocking(int /* fd */) override {
216 // Post a fatal closure to the MessageLoop before we quit it.
217 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
218 FROM_HERE, BindOnce(&FatalClosure));
219
220 if (quit_closure_) {
221 std::move(quit_closure_).Run();
222 }
223 }
224
225 private:
226 base::OnceClosure quit_closure_;
227 };
228
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)229 void WriteFDWrapper(const int fd,
230 const char* buf,
231 int size,
232 WaitableEvent* event) {
233 ASSERT_TRUE(WriteFileDescriptor(fd, std::string_view(buf, size)));
234 }
235
236 // Tests that MessagePumpEpoll quits immediately when it is quit from
237 // within a wakeup.
TEST_F(MessagePumpEpollTest,QuitWatcher)238 TEST_F(MessagePumpEpollTest, QuitWatcher) {
239 // Delete the old TaskEnvironment so that we can manage our own one here.
240 task_environment_.reset();
241
242 auto executor_pump = std::make_unique<MessagePumpEpoll>();
243 MessagePumpEpoll* pump = executor_pump.get();
244 SingleThreadTaskExecutor executor(std::move(executor_pump));
245 RunLoop run_loop;
246 QuitWatcher delegate(run_loop.QuitClosure());
247 MessagePumpEpoll::FdWatchController controller(FROM_HERE);
248 WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
249 WaitableEvent::InitialState::NOT_SIGNALED);
250 std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
251
252 // Tell the pump to watch the pipe.
253 pump->WatchFileDescriptor(receiver(), false, MessagePumpEpoll::WATCH_READ,
254 &controller, &delegate);
255
256 // Make the IO thread wait for |event| before writing to sender().
257 WaitableEventWatcher::EventCallback write_fd_task =
258 BindOnce(&WriteFDWrapper, sender(), &null_byte_, 1);
259 io_runner()->PostTask(
260 FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
261 Unretained(watcher.get()), &event,
262 std::move(write_fd_task), io_runner()));
263
264 // Queue |event| to signal on |sequence_manager|.
265 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
266 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
267
268 // Now run the MessageLoop.
269 run_loop.Run();
270
271 // StartWatching can move |watcher| to IO thread. Release on IO thread.
272 io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
273 Owned(watcher.release())));
274 }
275
276 class InnerNestedWatcher : public MessagePumpEpoll::FdWatcher {
277 public:
InnerNestedWatcher(MessagePumpEpollTest & test,MessagePumpEpoll::FdWatchController & outer_controller,base::OnceClosure callback)278 InnerNestedWatcher(MessagePumpEpollTest& test,
279 MessagePumpEpoll::FdWatchController& outer_controller,
280 base::OnceClosure callback)
281 : test_(test),
282 outer_controller_(outer_controller),
283 callback_(std::move(callback)) {
284 base::CurrentIOThread::Get().WatchFileDescriptor(
285 test_->receiver(), false, MessagePumpEpoll::WATCH_READ, &controller_,
286 this);
287 }
288 ~InnerNestedWatcher() override = default;
289
OnFileCanReadWithoutBlocking(int)290 void OnFileCanReadWithoutBlocking(int) override {
291 // Cancelling the outer watch from within this inner event handler must be
292 // safe.
293 outer_controller_->StopWatchingFileDescriptor();
294 std::move(callback_).Run();
295 }
296
OnFileCanWriteWithoutBlocking(int)297 void OnFileCanWriteWithoutBlocking(int) override {}
298
299 private:
300 const raw_ref<MessagePumpEpollTest> test_;
301 const raw_ref<MessagePumpEpoll::FdWatchController> outer_controller_;
302 base::OnceClosure callback_;
303 MessagePumpEpoll::FdWatchController controller_{FROM_HERE};
304 };
305
306 class OuterNestedWatcher : public MessagePumpEpoll::FdWatcher {
307 public:
OuterNestedWatcher(MessagePumpEpollTest & test,base::OnceClosure callback)308 OuterNestedWatcher(MessagePumpEpollTest& test, base::OnceClosure callback)
309 : test_(test), callback_(std::move(callback)) {
310 base::RunLoop loop;
311 test_->io_runner()->PostTask(
312 FROM_HERE, base::BindOnce(&OuterNestedWatcher::InitOnIOThread,
313 base::Unretained(this), loop.QuitClosure()));
314 loop.Run();
315 }
316
317 ~OuterNestedWatcher() override = default;
318
OnFileCanReadWithoutBlocking(int)319 void OnFileCanReadWithoutBlocking(int) override {
320 // Ensure that another notification will wake any active FdWatcher.
321 test_->ClearNotifications();
322
323 base::RunLoop loop;
324 std::unique_ptr<InnerNestedWatcher> inner_watcher =
325 std::make_unique<InnerNestedWatcher>(test_.get(), *controller_,
326 loop.QuitClosure());
327 test_->Notify();
328 loop.Run();
329
330 // Ensure that `InnerNestedWatcher` is destroyed before
331 // `OuterNestedWatcher`.
332 inner_watcher.reset();
333 controller_.reset();
334 std::move(callback_).Run();
335 }
336
OnFileCanWriteWithoutBlocking(int)337 void OnFileCanWriteWithoutBlocking(int) override {}
338
339 private:
InitOnIOThread(base::OnceClosure ready_callback)340 void InitOnIOThread(base::OnceClosure ready_callback) {
341 controller_ =
342 std::make_unique<MessagePumpEpoll::FdWatchController>(FROM_HERE);
343 base::CurrentIOThread::Get().WatchFileDescriptor(
344 test_->receiver(), false, MessagePumpEpoll::WATCH_READ,
345 controller_.get(), this);
346 std::move(ready_callback).Run();
347 }
348
349 const raw_ref<MessagePumpEpollTest> test_;
350 base::OnceClosure callback_;
351 std::unique_ptr<MessagePumpEpoll::FdWatchController> controller_;
352 };
353
TEST_F(MessagePumpEpollTest,NestedNotification)354 TEST_F(MessagePumpEpollTest, NestedNotification) {
355 // Regression test for https://crbug.com/1469529. Verifies that it's safe for
356 // a nested RunLoop to stop watching a file descriptor while the outer RunLoop
357 // is handling an event for the same descriptor.
358 base::RunLoop loop;
359 OuterNestedWatcher watcher(*this, loop.QuitClosure());
360 Notify();
361 loop.Run();
362 }
363
364 class RepeatWatcher : public BaseWatcher {
365 public:
RepeatWatcher(MessagePumpEpoll * pump,int fd,MessagePumpForIO::Mode mode,bool persistent,int repeat)366 RepeatWatcher(MessagePumpEpoll* pump,
367 int fd,
368 MessagePumpForIO::Mode mode,
369 bool persistent,
370 int repeat)
371 : pump_(pump),
372 fd_(fd),
373 mode_(mode),
374 persistent_(persistent),
375 repeat_(repeat),
376 fd_watch_controller_(FROM_HERE) {}
377
~RepeatWatcher()378 ~RepeatWatcher() override { EXPECT_EQ(repeat_, 0); }
379
StartWatching()380 void StartWatching() {
381 const bool watch_success = pump_->WatchFileDescriptor(
382 fd_, persistent_, mode_, &fd_watch_controller_, this);
383 ASSERT_TRUE(watch_success);
384 }
385
OnFileCanReadWithoutBlocking(int fd)386 void OnFileCanReadWithoutBlocking(int fd) override {
387 EXPECT_EQ(fd_, fd);
388 EXPECT_GT(repeat_, 0);
389
390 --repeat_;
391
392 if (persistent_) {
393 if (repeat_ == 0) {
394 // Need to stop watching the fd explicitly if it's configured as
395 // persistent.
396 fd_watch_controller_.StopWatchingFileDescriptor();
397 }
398 } else {
399 if (repeat_ > 0) {
400 // Need to restart watching the fd explicitly if it's not configured as
401 // persistent.
402 StartWatching();
403 }
404 }
405 }
406
407 private:
408 raw_ptr<MessagePumpEpoll> pump_;
409 int fd_;
410 MessagePumpForIO::Mode mode_;
411 bool persistent_;
412 int repeat_;
413 MessagePumpEpoll::FdWatchController fd_watch_controller_;
414 };
415
RepeatEventTest(bool persistent,int repeat,std::unique_ptr<MessagePumpEpoll> executor_pump,int sender,int receiver)416 void RepeatEventTest(bool persistent,
417 int repeat,
418 std::unique_ptr<MessagePumpEpoll> executor_pump,
419 int sender,
420 int receiver) {
421 MessagePumpEpoll* pump = executor_pump.get();
422 SingleThreadTaskExecutor executor(std::move(executor_pump));
423 RunLoop run_loop;
424 RepeatWatcher delegate(pump, receiver, MessagePumpEpoll::WATCH_READ,
425 persistent, repeat);
426
427 delegate.StartWatching();
428
429 const char null = 0;
430 ASSERT_TRUE(WriteFileDescriptor(sender, std::string_view(&null, 1)));
431
432 // The RunLoop must go to the idle state after the callback is called the
433 // number of times specified by `repeat`.
434 run_loop.RunUntilIdle();
435 }
436
437 // Tests that MessagePumpEpoll calls FdWatcher's callback repeatedly when
438 // it's configured as persistent.
TEST_F(MessagePumpEpollTest,RepeatPersistentEvent)439 TEST_F(MessagePumpEpollTest, RepeatPersistentEvent) {
440 // Delete the old TaskEnvironment so that we can manage our own one here.
441 task_environment_.reset();
442
443 RepeatEventTest(/* persistent= */ true, /* repeat= */ 3,
444 std::make_unique<MessagePumpEpoll>(), sender(), receiver());
445 }
446
447 // Tests that MessagePumpEpoll calls FdWatcher's callback repeatedly when it's
448 // not configured as persistent but reconfigured in the callback.
TEST_F(MessagePumpEpollTest,RepeatOneShotEvent)449 TEST_F(MessagePumpEpollTest, RepeatOneShotEvent) {
450 // Delete the old TaskEnvironment so that we can manage our own one here.
451 task_environment_.reset();
452
453 RepeatEventTest(/* persistent= */ false, /* repeat= */ 3,
454 std::make_unique<MessagePumpEpoll>(), sender(), receiver());
455 }
456
457 } // namespace
458 } // namespace base
459