• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <unistd.h>
8 
9 #include <memory>
10 #include <utility>
11 
12 #include "base/containers/span.h"
13 #include "base/files/file_util.h"
14 #include "base/functional/bind.h"
15 #include "base/functional/callback_helpers.h"
16 #include "base/logging.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/message_loop/message_pump_buildflags.h"
20 #include "base/message_loop/message_pump_type.h"
21 #include "base/posix/eintr_wrapper.h"
22 #include "base/run_loop.h"
23 #include "base/synchronization/waitable_event.h"
24 #include "base/synchronization/waitable_event_watcher.h"
25 #include "base/task/sequenced_task_runner.h"
26 #include "base/task/single_thread_task_executor.h"
27 #include "base/task/single_thread_task_runner.h"
28 #include "base/test/gtest_util.h"
29 #include "base/test/task_environment.h"
30 #include "base/threading/thread.h"
31 #include "build/build_config.h"
32 #include "testing/gtest/include/gtest/gtest.h"
33 #include "third_party/libevent/event.h"
34 
35 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
36 #include "base/message_loop/message_pump_epoll.h"
37 #endif
38 
39 namespace base {
40 
41 enum PumpType {
42   kLibevent,
43   kEpoll,
44 };
45 
46 class MessagePumpLibeventTest : public testing::Test,
47                                 public testing::WithParamInterface<PumpType> {
48  protected:
MessagePumpLibeventTest()49   MessagePumpLibeventTest()
50       : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
51             test::SingleThreadTaskEnvironment::MainThreadType::UI)),
52         io_thread_("MessagePumpLibeventTestIOThread") {}
53   ~MessagePumpLibeventTest() override = default;
54 
SetUp()55   void SetUp() override {
56     Thread::Options options(MessagePumpType::IO, 0);
57     ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
58     int ret = pipe(pipefds_);
59     ASSERT_EQ(0, ret);
60   }
61 
TearDown()62   void TearDown() override {
63     // Some tests watch |pipefds_| from the |io_thread_|. The |io_thread_| must
64     // thus be joined to ensure those watches are complete before closing the
65     // pipe.
66     io_thread_.Stop();
67 
68     if (IGNORE_EINTR(close(pipefds_[0])) < 0)
69       PLOG(ERROR) << "close";
70     if (IGNORE_EINTR(close(pipefds_[1])) < 0)
71       PLOG(ERROR) << "close";
72   }
73 
CreateMessagePump()74   std::unique_ptr<MessagePumpLibevent> CreateMessagePump() {
75 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
76     if (GetParam() == kEpoll) {
77       return std::make_unique<MessagePumpLibevent>(
78           MessagePumpLibevent::kUseEpoll);
79     }
80 #endif
81     return std::make_unique<MessagePumpLibevent>();
82   }
83 
io_runner() const84   scoped_refptr<SingleThreadTaskRunner> io_runner() const {
85     return io_thread_.task_runner();
86   }
87 
SimulateIOEvent(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)88   void SimulateIOEvent(MessagePumpLibevent* pump,
89                        MessagePumpLibevent::FdWatchController* controller) {
90 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
91     if (GetParam() == kEpoll) {
92       pump->epoll_pump_->HandleEvent(0, /*can_read=*/true, /*can_write=*/true,
93                                      controller);
94       return;
95     }
96 #endif
97     pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
98   }
99 
100   int pipefds_[2];
101   static constexpr char null_byte_ = 0;
102   std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
103 
104  private:
105   Thread io_thread_;
106 };
107 
108 namespace {
109 
110 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
111 // nothing useful.
112 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
113  public:
114   ~StupidWatcher() override = default;
115 
116   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)117   void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)118   void OnFileCanWriteWithoutBlocking(int fd) override {}
119 };
120 
TEST_P(MessagePumpLibeventTest,QuitOutsideOfRun)121 TEST_P(MessagePumpLibeventTest, QuitOutsideOfRun) {
122   std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
123   ASSERT_DCHECK_DEATH(pump->Quit());
124 }
125 
126 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
127  public:
128   BaseWatcher() = default;
129   ~BaseWatcher() override = default;
130 
131   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)132   void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
133 
OnFileCanWriteWithoutBlocking(int)134   void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
135 };
136 
137 class DeleteWatcher : public BaseWatcher {
138  public:
DeleteWatcher(std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)139   explicit DeleteWatcher(
140       std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)
141       : controller_(std::move(controller)) {}
142 
~DeleteWatcher()143   ~DeleteWatcher() override { DCHECK(!controller_); }
144 
controller()145   MessagePumpLibevent::FdWatchController* controller() {
146     return controller_.get();
147   }
148 
OnFileCanWriteWithoutBlocking(int)149   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
150     DCHECK(controller_);
151     controller_.reset();
152   }
153 
154  private:
155   std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
156 };
157 
TEST_P(MessagePumpLibeventTest,DeleteWatcher)158 TEST_P(MessagePumpLibeventTest, DeleteWatcher) {
159   DeleteWatcher delegate(
160       std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE));
161   std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
162   pump->WatchFileDescriptor(pipefds_[1], false,
163                             MessagePumpLibevent::WATCH_READ_WRITE,
164                             delegate.controller(), &delegate);
165   SimulateIOEvent(pump.get(), delegate.controller());
166 }
167 
168 class StopWatcher : public BaseWatcher {
169  public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)170   explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
171       : controller_(controller) {}
172 
173   ~StopWatcher() override = default;
174 
OnFileCanWriteWithoutBlocking(int)175   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
176     controller_->StopWatchingFileDescriptor();
177   }
178 
179  private:
180   raw_ptr<MessagePumpLibevent::FdWatchController> controller_ = nullptr;
181 };
182 
TEST_P(MessagePumpLibeventTest,StopWatcher)183 TEST_P(MessagePumpLibeventTest, StopWatcher) {
184   std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
185   MessagePumpLibevent::FdWatchController controller(FROM_HERE);
186   StopWatcher delegate(&controller);
187   pump->WatchFileDescriptor(pipefds_[1], false,
188                             MessagePumpLibevent::WATCH_READ_WRITE, &controller,
189                             &delegate);
190   SimulateIOEvent(pump.get(), &controller);
191 }
192 
QuitMessageLoopAndStart(OnceClosure quit_closure)193 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
194   std::move(quit_closure).Run();
195 
196   RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
197   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
198                                                         runloop.QuitClosure());
199   runloop.Run();
200 }
201 
202 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
203  public:
204   NestedPumpWatcher() = default;
205   ~NestedPumpWatcher() override = default;
206 
OnFileCanReadWithoutBlocking(int)207   void OnFileCanReadWithoutBlocking(int /* fd */) override {
208     RunLoop runloop;
209     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
210         FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
211     runloop.Run();
212   }
213 
OnFileCanWriteWithoutBlocking(int)214   void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
215 };
216 
TEST_P(MessagePumpLibeventTest,NestedPumpWatcher)217 TEST_P(MessagePumpLibeventTest, NestedPumpWatcher) {
218   NestedPumpWatcher delegate;
219   std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
220   MessagePumpLibevent::FdWatchController controller(FROM_HERE);
221   pump->WatchFileDescriptor(pipefds_[1], false, MessagePumpLibevent::WATCH_READ,
222                             &controller, &delegate);
223   SimulateIOEvent(pump.get(), &controller);
224 }
225 
FatalClosure()226 void FatalClosure() {
227   FAIL() << "Reached fatal closure.";
228 }
229 
230 class QuitWatcher : public BaseWatcher {
231  public:
QuitWatcher(base::OnceClosure quit_closure)232   QuitWatcher(base::OnceClosure quit_closure)
233       : quit_closure_(std::move(quit_closure)) {}
234 
OnFileCanReadWithoutBlocking(int)235   void OnFileCanReadWithoutBlocking(int /* fd */) override {
236     // Post a fatal closure to the MessageLoop before we quit it.
237     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
238         FROM_HERE, BindOnce(&FatalClosure));
239 
240     if (quit_closure_)
241       std::move(quit_closure_).Run();
242   }
243 
244  private:
245   base::OnceClosure quit_closure_;
246 };
247 
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)248 void WriteFDWrapper(const int fd,
249                     const char* buf,
250                     int size,
251                     WaitableEvent* event) {
252   ASSERT_TRUE(WriteFileDescriptor(fd, StringPiece(buf, size)));
253 }
254 
255 // Tests that MessagePumpLibevent quits immediately when it is quit from
256 // libevent's event_base_loop().
TEST_P(MessagePumpLibeventTest,QuitWatcher)257 TEST_P(MessagePumpLibeventTest, QuitWatcher) {
258   // Delete the old TaskEnvironment so that we can manage our own one here.
259   task_environment_.reset();
260 
261   std::unique_ptr<MessagePumpLibevent> executor_pump = CreateMessagePump();
262   MessagePumpLibevent* pump = executor_pump.get();
263   SingleThreadTaskExecutor executor(std::move(executor_pump));
264   RunLoop run_loop;
265   QuitWatcher delegate(run_loop.QuitClosure());
266   MessagePumpLibevent::FdWatchController controller(FROM_HERE);
267   WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
268                       WaitableEvent::InitialState::NOT_SIGNALED);
269   std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
270 
271   // Tell the pump to watch the pipe.
272   pump->WatchFileDescriptor(pipefds_[0], false, MessagePumpLibevent::WATCH_READ,
273                             &controller, &delegate);
274 
275   // Make the IO thread wait for |event| before writing to pipefds[1].
276   WaitableEventWatcher::EventCallback write_fd_task =
277       BindOnce(&WriteFDWrapper, pipefds_[1], &null_byte_, 1);
278   io_runner()->PostTask(
279       FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
280                           Unretained(watcher.get()), &event,
281                           std::move(write_fd_task), io_runner()));
282 
283   // Queue |event| to signal on |sequence_manager|.
284   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
285       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
286 
287   // Now run the MessageLoop.
288   run_loop.Run();
289 
290   // StartWatching can move |watcher| to IO thread. Release on IO thread.
291   io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
292                                             Owned(watcher.release())));
293 }
294 
295 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
296 #define TEST_PARAM_VALUES kLibevent, kEpoll
297 #else
298 #define TEST_PARAM_VALUES kLibevent
299 #endif
300 
301 INSTANTIATE_TEST_SUITE_P(,
302                          MessagePumpLibeventTest,
303                          ::testing::Values(TEST_PARAM_VALUES));
304 
305 }  // namespace
306 
307 }  // namespace base
308