1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_libevent.h"
6
7 #include <unistd.h>
8
9 #include <memory>
10 #include <utility>
11
12 #include "base/containers/span.h"
13 #include "base/files/file_util.h"
14 #include "base/functional/bind.h"
15 #include "base/functional/callback_helpers.h"
16 #include "base/logging.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/message_loop/message_pump_buildflags.h"
20 #include "base/message_loop/message_pump_type.h"
21 #include "base/posix/eintr_wrapper.h"
22 #include "base/run_loop.h"
23 #include "base/synchronization/waitable_event.h"
24 #include "base/synchronization/waitable_event_watcher.h"
25 #include "base/task/sequenced_task_runner.h"
26 #include "base/task/single_thread_task_executor.h"
27 #include "base/task/single_thread_task_runner.h"
28 #include "base/test/gtest_util.h"
29 #include "base/test/task_environment.h"
30 #include "base/threading/thread.h"
31 #include "build/build_config.h"
32 #include "testing/gtest/include/gtest/gtest.h"
33 #include "third_party/libevent/event.h"
34
35 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
36 #include "base/message_loop/message_pump_epoll.h"
37 #endif
38
39 namespace base {
40
41 enum PumpType {
42 kLibevent,
43 kEpoll,
44 };
45
46 class MessagePumpLibeventTest : public testing::Test,
47 public testing::WithParamInterface<PumpType> {
48 protected:
MessagePumpLibeventTest()49 MessagePumpLibeventTest()
50 : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
51 test::SingleThreadTaskEnvironment::MainThreadType::UI)),
52 io_thread_("MessagePumpLibeventTestIOThread") {}
53 ~MessagePumpLibeventTest() override = default;
54
SetUp()55 void SetUp() override {
56 Thread::Options options(MessagePumpType::IO, 0);
57 ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
58 int ret = pipe(pipefds_);
59 ASSERT_EQ(0, ret);
60 }
61
TearDown()62 void TearDown() override {
63 // Some tests watch |pipefds_| from the |io_thread_|. The |io_thread_| must
64 // thus be joined to ensure those watches are complete before closing the
65 // pipe.
66 io_thread_.Stop();
67
68 if (IGNORE_EINTR(close(pipefds_[0])) < 0)
69 PLOG(ERROR) << "close";
70 if (IGNORE_EINTR(close(pipefds_[1])) < 0)
71 PLOG(ERROR) << "close";
72 }
73
CreateMessagePump()74 std::unique_ptr<MessagePumpLibevent> CreateMessagePump() {
75 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
76 if (GetParam() == kEpoll) {
77 return std::make_unique<MessagePumpLibevent>(
78 MessagePumpLibevent::kUseEpoll);
79 }
80 #endif
81 return std::make_unique<MessagePumpLibevent>();
82 }
83
io_runner() const84 scoped_refptr<SingleThreadTaskRunner> io_runner() const {
85 return io_thread_.task_runner();
86 }
87
SimulateIOEvent(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)88 void SimulateIOEvent(MessagePumpLibevent* pump,
89 MessagePumpLibevent::FdWatchController* controller) {
90 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
91 if (GetParam() == kEpoll) {
92 pump->epoll_pump_->HandleEvent(0, /*can_read=*/true, /*can_write=*/true,
93 controller);
94 return;
95 }
96 #endif
97 pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
98 }
99
100 int pipefds_[2];
101 static constexpr char null_byte_ = 0;
102 std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
103
104 private:
105 Thread io_thread_;
106 };
107
108 namespace {
109
110 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
111 // nothing useful.
112 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
113 public:
114 ~StupidWatcher() override = default;
115
116 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)117 void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)118 void OnFileCanWriteWithoutBlocking(int fd) override {}
119 };
120
TEST_P(MessagePumpLibeventTest,QuitOutsideOfRun)121 TEST_P(MessagePumpLibeventTest, QuitOutsideOfRun) {
122 std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
123 ASSERT_DCHECK_DEATH(pump->Quit());
124 }
125
126 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
127 public:
128 BaseWatcher() = default;
129 ~BaseWatcher() override = default;
130
131 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)132 void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
133
OnFileCanWriteWithoutBlocking(int)134 void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
135 };
136
137 class DeleteWatcher : public BaseWatcher {
138 public:
DeleteWatcher(std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)139 explicit DeleteWatcher(
140 std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)
141 : controller_(std::move(controller)) {}
142
~DeleteWatcher()143 ~DeleteWatcher() override { DCHECK(!controller_); }
144
controller()145 MessagePumpLibevent::FdWatchController* controller() {
146 return controller_.get();
147 }
148
OnFileCanWriteWithoutBlocking(int)149 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
150 DCHECK(controller_);
151 controller_.reset();
152 }
153
154 private:
155 std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
156 };
157
TEST_P(MessagePumpLibeventTest,DeleteWatcher)158 TEST_P(MessagePumpLibeventTest, DeleteWatcher) {
159 DeleteWatcher delegate(
160 std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE));
161 std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
162 pump->WatchFileDescriptor(pipefds_[1], false,
163 MessagePumpLibevent::WATCH_READ_WRITE,
164 delegate.controller(), &delegate);
165 SimulateIOEvent(pump.get(), delegate.controller());
166 }
167
168 class StopWatcher : public BaseWatcher {
169 public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)170 explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
171 : controller_(controller) {}
172
173 ~StopWatcher() override = default;
174
OnFileCanWriteWithoutBlocking(int)175 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
176 controller_->StopWatchingFileDescriptor();
177 }
178
179 private:
180 raw_ptr<MessagePumpLibevent::FdWatchController> controller_ = nullptr;
181 };
182
TEST_P(MessagePumpLibeventTest,StopWatcher)183 TEST_P(MessagePumpLibeventTest, StopWatcher) {
184 std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
185 MessagePumpLibevent::FdWatchController controller(FROM_HERE);
186 StopWatcher delegate(&controller);
187 pump->WatchFileDescriptor(pipefds_[1], false,
188 MessagePumpLibevent::WATCH_READ_WRITE, &controller,
189 &delegate);
190 SimulateIOEvent(pump.get(), &controller);
191 }
192
QuitMessageLoopAndStart(OnceClosure quit_closure)193 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
194 std::move(quit_closure).Run();
195
196 RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
197 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
198 runloop.QuitClosure());
199 runloop.Run();
200 }
201
202 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
203 public:
204 NestedPumpWatcher() = default;
205 ~NestedPumpWatcher() override = default;
206
OnFileCanReadWithoutBlocking(int)207 void OnFileCanReadWithoutBlocking(int /* fd */) override {
208 RunLoop runloop;
209 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
210 FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
211 runloop.Run();
212 }
213
OnFileCanWriteWithoutBlocking(int)214 void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
215 };
216
TEST_P(MessagePumpLibeventTest,NestedPumpWatcher)217 TEST_P(MessagePumpLibeventTest, NestedPumpWatcher) {
218 NestedPumpWatcher delegate;
219 std::unique_ptr<MessagePumpLibevent> pump = CreateMessagePump();
220 MessagePumpLibevent::FdWatchController controller(FROM_HERE);
221 pump->WatchFileDescriptor(pipefds_[1], false, MessagePumpLibevent::WATCH_READ,
222 &controller, &delegate);
223 SimulateIOEvent(pump.get(), &controller);
224 }
225
FatalClosure()226 void FatalClosure() {
227 FAIL() << "Reached fatal closure.";
228 }
229
230 class QuitWatcher : public BaseWatcher {
231 public:
QuitWatcher(base::OnceClosure quit_closure)232 QuitWatcher(base::OnceClosure quit_closure)
233 : quit_closure_(std::move(quit_closure)) {}
234
OnFileCanReadWithoutBlocking(int)235 void OnFileCanReadWithoutBlocking(int /* fd */) override {
236 // Post a fatal closure to the MessageLoop before we quit it.
237 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
238 FROM_HERE, BindOnce(&FatalClosure));
239
240 if (quit_closure_)
241 std::move(quit_closure_).Run();
242 }
243
244 private:
245 base::OnceClosure quit_closure_;
246 };
247
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)248 void WriteFDWrapper(const int fd,
249 const char* buf,
250 int size,
251 WaitableEvent* event) {
252 ASSERT_TRUE(WriteFileDescriptor(fd, StringPiece(buf, size)));
253 }
254
255 // Tests that MessagePumpLibevent quits immediately when it is quit from
256 // libevent's event_base_loop().
TEST_P(MessagePumpLibeventTest,QuitWatcher)257 TEST_P(MessagePumpLibeventTest, QuitWatcher) {
258 // Delete the old TaskEnvironment so that we can manage our own one here.
259 task_environment_.reset();
260
261 std::unique_ptr<MessagePumpLibevent> executor_pump = CreateMessagePump();
262 MessagePumpLibevent* pump = executor_pump.get();
263 SingleThreadTaskExecutor executor(std::move(executor_pump));
264 RunLoop run_loop;
265 QuitWatcher delegate(run_loop.QuitClosure());
266 MessagePumpLibevent::FdWatchController controller(FROM_HERE);
267 WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
268 WaitableEvent::InitialState::NOT_SIGNALED);
269 std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
270
271 // Tell the pump to watch the pipe.
272 pump->WatchFileDescriptor(pipefds_[0], false, MessagePumpLibevent::WATCH_READ,
273 &controller, &delegate);
274
275 // Make the IO thread wait for |event| before writing to pipefds[1].
276 WaitableEventWatcher::EventCallback write_fd_task =
277 BindOnce(&WriteFDWrapper, pipefds_[1], &null_byte_, 1);
278 io_runner()->PostTask(
279 FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
280 Unretained(watcher.get()), &event,
281 std::move(write_fd_task), io_runner()));
282
283 // Queue |event| to signal on |sequence_manager|.
284 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
285 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
286
287 // Now run the MessageLoop.
288 run_loop.Run();
289
290 // StartWatching can move |watcher| to IO thread. Release on IO thread.
291 io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
292 Owned(watcher.release())));
293 }
294
295 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
296 #define TEST_PARAM_VALUES kLibevent, kEpoll
297 #else
298 #define TEST_PARAM_VALUES kLibevent
299 #endif
300
301 INSTANTIATE_TEST_SUITE_P(,
302 MessagePumpLibeventTest,
303 ::testing::Values(TEST_PARAM_VALUES));
304
305 } // namespace
306
307 } // namespace base
308