• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_epoll.h"
6 
7 #include <fcntl.h>
8 #include <sys/socket.h>
9 #include <unistd.h>
10 
11 #include <memory>
12 #include <string_view>
13 #include <utility>
14 
15 #include "base/containers/span.h"
16 #include "base/files/file_util.h"
17 #include "base/files/scoped_file.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/logging.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/raw_ref.h"
24 #include "base/message_loop/message_pump_type.h"
25 #include "base/posix/eintr_wrapper.h"
26 #include "base/run_loop.h"
27 #include "base/synchronization/waitable_event.h"
28 #include "base/synchronization/waitable_event_watcher.h"
29 #include "base/task/current_thread.h"
30 #include "base/task/sequenced_task_runner.h"
31 #include "base/task/single_thread_task_executor.h"
32 #include "base/task/single_thread_task_runner.h"
33 #include "base/test/gtest_util.h"
34 #include "base/test/scoped_feature_list.h"
35 #include "base/test/task_environment.h"
36 #include "base/threading/thread.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38 
39 namespace base {
40 
41 class MessagePumpEpollTest : public testing::Test {
42  public:
MessagePumpEpollTest()43   MessagePumpEpollTest()
44       : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
45             test::SingleThreadTaskEnvironment::MainThreadType::UI)),
46         io_thread_("MessagePumpEpollTestIOThread") {}
47   ~MessagePumpEpollTest() override = default;
48 
receiver() const49   int receiver() const { return receiver_.get(); }
sender() const50   int sender() const { return sender_.get(); }
51 
io_runner() const52   scoped_refptr<SingleThreadTaskRunner> io_runner() const {
53     return io_thread_.task_runner();
54   }
55 
ClearNotifications()56   void ClearNotifications() {
57     int unused;
58     while (read(receiver_.get(), &unused, sizeof(unused)) == sizeof(unused)) {
59     }
60   }
61 
Notify()62   void Notify() {
63     const int data = 42;
64     PCHECK(write(sender_.get(), &data, sizeof(data)) == sizeof(data));
65   }
66 
67  protected:
SetUp()68   void SetUp() override {
69     Thread::Options options(MessagePumpType::IO, 0);
70     ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
71     int fds[2];
72     int rv = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
73     CHECK_EQ(rv, 0);
74     PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
75     receiver_ = base::ScopedFD(fds[0]);
76     sender_ = base::ScopedFD(fds[1]);
77   }
78 
TearDown()79   void TearDown() override {
80     // Some tests watch `receiver_` from the `io_thread_`. The `io_thread_` must
81     // thus be joined to ensure those watches are complete before closing the
82     // sockets.
83     io_thread_.Stop();
84   }
85 
SimulateIOEvent(MessagePumpEpoll * pump,MessagePumpEpoll::FdWatchController * controller)86   void SimulateIOEvent(MessagePumpEpoll* pump,
87                        MessagePumpEpoll::FdWatchController* controller) {
88     pump->HandleEvent(0, /*can_read=*/true, /*can_write=*/true, controller);
89   }
90 
91   static constexpr char null_byte_ = 0;
92   std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
93 
94  private:
95   Thread io_thread_;
96   base::ScopedFD receiver_;
97   base::ScopedFD sender_;
98 };
99 
100 namespace {
101 
// Calling Quit() on a pump that is not inside Run() is expected to hit a
// DCHECK.
TEST_F(MessagePumpEpollTest, QuitOutsideOfRun) {
  const std::unique_ptr<MessagePumpEpoll> idle_pump =
      std::make_unique<MessagePumpEpoll>();
  ASSERT_DCHECK_DEATH(idle_pump->Quit());
}
106 
107 class BaseWatcher : public MessagePumpEpoll::FdWatcher {
108  public:
109   BaseWatcher() = default;
110   ~BaseWatcher() override = default;
111 
112   // base:MessagePumpEpoll::FdWatcher interface
OnFileCanReadWithoutBlocking(int)113   void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
OnFileCanWriteWithoutBlocking(int)114   void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
115 };
116 
117 class DeleteWatcher : public BaseWatcher {
118  public:
DeleteWatcher(std::unique_ptr<MessagePumpEpoll::FdWatchController> controller)119   explicit DeleteWatcher(
120       std::unique_ptr<MessagePumpEpoll::FdWatchController> controller)
121       : controller_(std::move(controller)) {}
122 
~DeleteWatcher()123   ~DeleteWatcher() override { DCHECK(!controller_); }
124 
controller()125   MessagePumpEpoll::FdWatchController* controller() {
126     return controller_.get();
127   }
128 
OnFileCanWriteWithoutBlocking(int)129   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
130     DCHECK(controller_);
131     controller_.reset();
132   }
133 
134  private:
135   std::unique_ptr<MessagePumpEpoll::FdWatchController> controller_;
136 };
137 
// Verifies that a watcher may destroy its own FdWatchController from inside
// OnFileCanWriteWithoutBlocking() while the pump is dispatching an event for
// that controller.
//
// NOTE(review): the simulated event is both readable and writable, and the
// inherited read handler is NOTREACHED(); the test presumably relies on the
// pump not delivering the read notification once the controller has been
// destroyed in the write handler - confirm against MessagePumpEpoll.
TEST_F(MessagePumpEpollTest, DeleteWatcher) {
  DeleteWatcher delegate(
      std::make_unique<MessagePumpEpoll::FdWatchController>(FROM_HERE));
  auto pump = std::make_unique<MessagePumpEpoll>();
  // Fail right here if registration does not succeed, rather than simulating
  // an event for a watch that was never installed.
  ASSERT_TRUE(pump->WatchFileDescriptor(receiver(), /*persistent=*/false,
                                        MessagePumpEpoll::WATCH_READ_WRITE,
                                        delegate.controller(), &delegate));
  SimulateIOEvent(pump.get(), delegate.controller());
}
147 
148 class StopWatcher : public BaseWatcher {
149  public:
StopWatcher(MessagePumpEpoll::FdWatchController * controller)150   explicit StopWatcher(MessagePumpEpoll::FdWatchController* controller)
151       : controller_(controller) {}
152 
153   ~StopWatcher() override = default;
154 
OnFileCanWriteWithoutBlocking(int)155   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
156     controller_->StopWatchingFileDescriptor();
157   }
158 
159  private:
160   raw_ptr<MessagePumpEpoll::FdWatchController> controller_ = nullptr;
161 };
162 
// Verifies that a watcher may call StopWatchingFileDescriptor() on its
// controller from inside OnFileCanWriteWithoutBlocking() while the pump is
// dispatching an event for that controller.
TEST_F(MessagePumpEpollTest, StopWatcher) {
  auto pump = std::make_unique<MessagePumpEpoll>();
  MessagePumpEpoll::FdWatchController controller(FROM_HERE);
  StopWatcher delegate(&controller);
  // Fail right here if registration does not succeed, rather than simulating
  // an event for a watch that was never installed.
  ASSERT_TRUE(pump->WatchFileDescriptor(receiver(), /*persistent=*/false,
                                        MessagePumpEpoll::WATCH_READ_WRITE,
                                        &controller, &delegate));
  SimulateIOEvent(pump.get(), &controller);
}
172 
QuitMessageLoopAndStart(OnceClosure quit_closure)173 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
174   std::move(quit_closure).Run();
175 
176   RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
177   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
178                                                         runloop.QuitClosure());
179   runloop.Run();
180 }
181 
182 class NestedPumpWatcher : public MessagePumpEpoll::FdWatcher {
183  public:
184   NestedPumpWatcher() = default;
185   ~NestedPumpWatcher() override = default;
186 
OnFileCanReadWithoutBlocking(int)187   void OnFileCanReadWithoutBlocking(int /* fd */) override {
188     RunLoop runloop;
189     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
190         FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
191     runloop.Run();
192   }
193 
OnFileCanWriteWithoutBlocking(int)194   void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
195 };
196 
// Verifies that a watcher may run nested RunLoops from inside
// OnFileCanReadWithoutBlocking() while the pump is dispatching an event.
TEST_F(MessagePumpEpollTest, NestedPumpWatcher) {
  NestedPumpWatcher delegate;
  auto pump = std::make_unique<MessagePumpEpoll>();
  MessagePumpEpoll::FdWatchController controller(FROM_HERE);
  // Fail right here if registration does not succeed, rather than simulating
  // an event for a watch that was never installed.
  ASSERT_TRUE(pump->WatchFileDescriptor(receiver(), /*persistent=*/false,
                                        MessagePumpEpoll::WATCH_READ,
                                        &controller, &delegate));
  SimulateIOEvent(pump.get(), &controller);
}
205 
FatalClosure()206 void FatalClosure() {
207   FAIL() << "Reached fatal closure.";
208 }
209 
210 class QuitWatcher : public BaseWatcher {
211  public:
QuitWatcher(base::OnceClosure quit_closure)212   QuitWatcher(base::OnceClosure quit_closure)
213       : quit_closure_(std::move(quit_closure)) {}
214 
OnFileCanReadWithoutBlocking(int)215   void OnFileCanReadWithoutBlocking(int /* fd */) override {
216     // Post a fatal closure to the MessageLoop before we quit it.
217     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
218         FROM_HERE, BindOnce(&FatalClosure));
219 
220     if (quit_closure_) {
221       std::move(quit_closure_).Run();
222     }
223   }
224 
225  private:
226   base::OnceClosure quit_closure_;
227 };
228 
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)229 void WriteFDWrapper(const int fd,
230                     const char* buf,
231                     int size,
232                     WaitableEvent* event) {
233   ASSERT_TRUE(WriteFileDescriptor(fd, std::string_view(buf, size)));
234 }
235 
236 // Tests that MessagePumpEpoll quits immediately when it is quit from
237 // within a wakeup.
TEST_F(MessagePumpEpollTest,QuitWatcher)238 TEST_F(MessagePumpEpollTest, QuitWatcher) {
239   // Delete the old TaskEnvironment so that we can manage our own one here.
240   task_environment_.reset();
241 
242   auto executor_pump = std::make_unique<MessagePumpEpoll>();
243   MessagePumpEpoll* pump = executor_pump.get();
244   SingleThreadTaskExecutor executor(std::move(executor_pump));
245   RunLoop run_loop;
246   QuitWatcher delegate(run_loop.QuitClosure());
247   MessagePumpEpoll::FdWatchController controller(FROM_HERE);
248   WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
249                       WaitableEvent::InitialState::NOT_SIGNALED);
250   std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
251 
252   // Tell the pump to watch the pipe.
253   pump->WatchFileDescriptor(receiver(), false, MessagePumpEpoll::WATCH_READ,
254                             &controller, &delegate);
255 
256   // Make the IO thread wait for |event| before writing to sender().
257   WaitableEventWatcher::EventCallback write_fd_task =
258       BindOnce(&WriteFDWrapper, sender(), &null_byte_, 1);
259   io_runner()->PostTask(
260       FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
261                           Unretained(watcher.get()), &event,
262                           std::move(write_fd_task), io_runner()));
263 
264   // Queue |event| to signal on |sequence_manager|.
265   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
266       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
267 
268   // Now run the MessageLoop.
269   run_loop.Run();
270 
271   // StartWatching can move |watcher| to IO thread. Release on IO thread.
272   io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
273                                             Owned(watcher.release())));
274 }
275 
276 class InnerNestedWatcher : public MessagePumpEpoll::FdWatcher {
277  public:
InnerNestedWatcher(MessagePumpEpollTest & test,MessagePumpEpoll::FdWatchController & outer_controller,base::OnceClosure callback)278   InnerNestedWatcher(MessagePumpEpollTest& test,
279                      MessagePumpEpoll::FdWatchController& outer_controller,
280                      base::OnceClosure callback)
281       : test_(test),
282         outer_controller_(outer_controller),
283         callback_(std::move(callback)) {
284     base::CurrentIOThread::Get().WatchFileDescriptor(
285         test_->receiver(), false, MessagePumpEpoll::WATCH_READ, &controller_,
286         this);
287   }
288   ~InnerNestedWatcher() override = default;
289 
OnFileCanReadWithoutBlocking(int)290   void OnFileCanReadWithoutBlocking(int) override {
291     // Cancelling the outer watch from within this inner event handler must be
292     // safe.
293     outer_controller_->StopWatchingFileDescriptor();
294     std::move(callback_).Run();
295   }
296 
OnFileCanWriteWithoutBlocking(int)297   void OnFileCanWriteWithoutBlocking(int) override {}
298 
299  private:
300   const raw_ref<MessagePumpEpollTest> test_;
301   const raw_ref<MessagePumpEpoll::FdWatchController> outer_controller_;
302   base::OnceClosure callback_;
303   MessagePumpEpoll::FdWatchController controller_{FROM_HERE};
304 };
305 
306 class OuterNestedWatcher : public MessagePumpEpoll::FdWatcher {
307  public:
OuterNestedWatcher(MessagePumpEpollTest & test,base::OnceClosure callback)308   OuterNestedWatcher(MessagePumpEpollTest& test, base::OnceClosure callback)
309       : test_(test), callback_(std::move(callback)) {
310     base::RunLoop loop;
311     test_->io_runner()->PostTask(
312         FROM_HERE, base::BindOnce(&OuterNestedWatcher::InitOnIOThread,
313                                   base::Unretained(this), loop.QuitClosure()));
314     loop.Run();
315   }
316 
317   ~OuterNestedWatcher() override = default;
318 
OnFileCanReadWithoutBlocking(int)319   void OnFileCanReadWithoutBlocking(int) override {
320     // Ensure that another notification will wake any active FdWatcher.
321     test_->ClearNotifications();
322 
323     base::RunLoop loop;
324     std::unique_ptr<InnerNestedWatcher> inner_watcher =
325         std::make_unique<InnerNestedWatcher>(test_.get(), *controller_,
326                                              loop.QuitClosure());
327     test_->Notify();
328     loop.Run();
329 
330     // Ensure that `InnerNestedWatcher` is destroyed before
331     // `OuterNestedWatcher`.
332     inner_watcher.reset();
333     controller_.reset();
334     std::move(callback_).Run();
335   }
336 
OnFileCanWriteWithoutBlocking(int)337   void OnFileCanWriteWithoutBlocking(int) override {}
338 
339  private:
InitOnIOThread(base::OnceClosure ready_callback)340   void InitOnIOThread(base::OnceClosure ready_callback) {
341     controller_ =
342         std::make_unique<MessagePumpEpoll::FdWatchController>(FROM_HERE);
343     base::CurrentIOThread::Get().WatchFileDescriptor(
344         test_->receiver(), false, MessagePumpEpoll::WATCH_READ,
345         controller_.get(), this);
346     std::move(ready_callback).Run();
347   }
348 
349   const raw_ref<MessagePumpEpollTest> test_;
350   base::OnceClosure callback_;
351   std::unique_ptr<MessagePumpEpoll::FdWatchController> controller_;
352 };
353 
// Regression test for https://crbug.com/1469529. Verifies that it's safe for
// a nested RunLoop to stop watching a file descriptor while the outer RunLoop
// is handling an event for the same descriptor.
TEST_F(MessagePumpEpollTest, NestedNotification) {
  base::RunLoop done_loop;
  base::OnceClosure on_done = done_loop.QuitClosure();
  OuterNestedWatcher watcher(*this, std::move(on_done));

  // Wake the outer watcher, then wait for it to report completion.
  Notify();
  done_loop.Run();
}
363 
364 class RepeatWatcher : public BaseWatcher {
365  public:
RepeatWatcher(MessagePumpEpoll * pump,int fd,MessagePumpForIO::Mode mode,bool persistent,int repeat)366   RepeatWatcher(MessagePumpEpoll* pump,
367                 int fd,
368                 MessagePumpForIO::Mode mode,
369                 bool persistent,
370                 int repeat)
371       : pump_(pump),
372         fd_(fd),
373         mode_(mode),
374         persistent_(persistent),
375         repeat_(repeat),
376         fd_watch_controller_(FROM_HERE) {}
377 
~RepeatWatcher()378   ~RepeatWatcher() override { EXPECT_EQ(repeat_, 0); }
379 
StartWatching()380   void StartWatching() {
381     const bool watch_success = pump_->WatchFileDescriptor(
382         fd_, persistent_, mode_, &fd_watch_controller_, this);
383     ASSERT_TRUE(watch_success);
384   }
385 
OnFileCanReadWithoutBlocking(int fd)386   void OnFileCanReadWithoutBlocking(int fd) override {
387     EXPECT_EQ(fd_, fd);
388     EXPECT_GT(repeat_, 0);
389 
390     --repeat_;
391 
392     if (persistent_) {
393       if (repeat_ == 0) {
394         // Need to stop watching the fd explicitly if it's configured as
395         // persistent.
396         fd_watch_controller_.StopWatchingFileDescriptor();
397       }
398     } else {
399       if (repeat_ > 0) {
400         // Need to restart watching the fd explicitly if it's not configured as
401         // persistent.
402         StartWatching();
403       }
404     }
405   }
406 
407  private:
408   raw_ptr<MessagePumpEpoll> pump_;
409   int fd_;
410   MessagePumpForIO::Mode mode_;
411   bool persistent_;
412   int repeat_;
413   MessagePumpEpoll::FdWatchController fd_watch_controller_;
414 };
415 
RepeatEventTest(bool persistent,int repeat,std::unique_ptr<MessagePumpEpoll> executor_pump,int sender,int receiver)416 void RepeatEventTest(bool persistent,
417                      int repeat,
418                      std::unique_ptr<MessagePumpEpoll> executor_pump,
419                      int sender,
420                      int receiver) {
421   MessagePumpEpoll* pump = executor_pump.get();
422   SingleThreadTaskExecutor executor(std::move(executor_pump));
423   RunLoop run_loop;
424   RepeatWatcher delegate(pump, receiver, MessagePumpEpoll::WATCH_READ,
425                          persistent, repeat);
426 
427   delegate.StartWatching();
428 
429   const char null = 0;
430   ASSERT_TRUE(WriteFileDescriptor(sender, std::string_view(&null, 1)));
431 
432   // The RunLoop must go to the idle state after the callback is called the
433   // number of times specified by `repeat`.
434   run_loop.RunUntilIdle();
435 }
436 
437 // Tests that MessagePumpEpoll calls FdWatcher's callback repeatedly when
438 // it's configured as persistent.
TEST_F(MessagePumpEpollTest,RepeatPersistentEvent)439 TEST_F(MessagePumpEpollTest, RepeatPersistentEvent) {
440   // Delete the old TaskEnvironment so that we can manage our own one here.
441   task_environment_.reset();
442 
443   RepeatEventTest(/* persistent= */ true, /* repeat= */ 3,
444                   std::make_unique<MessagePumpEpoll>(), sender(), receiver());
445 }
446 
447 // Tests that MessagePumpEpoll calls FdWatcher's callback repeatedly when it's
448 // not configured as persistent but reconfigured in the callback.
TEST_F(MessagePumpEpollTest,RepeatOneShotEvent)449 TEST_F(MessagePumpEpollTest, RepeatOneShotEvent) {
450   // Delete the old TaskEnvironment so that we can manage our own one here.
451   task_environment_.reset();
452 
453   RepeatEventTest(/* persistent= */ false, /* repeat= */ 3,
454                   std::make_unique<MessagePumpEpoll>(), sender(), receiver());
455 }
456 
457 }  // namespace
458 }  // namespace base
459