1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <fcntl.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8
9 #include <array>
10 #include <memory>
11 #include <optional>
12 #include <queue>
13 #include <string>
14 #include <string_view>
15 #include <utility>
16
17 #include "base/containers/span.h"
18 #include "base/files/scoped_file.h"
19 #include "base/message_loop/io_watcher.h"
20 #include "base/posix/eintr_wrapper.h"
21 #include "base/run_loop.h"
22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h"
24 #include "base/synchronization/waitable_event.h"
25 #include "base/test/bind.h"
26 #include "base/test/task_environment.h"
27 #include "base/threading/thread.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29
30 #if BUILDFLAG(IS_ANDROID)
31 #include "base/android/java_handler_thread.h"
32 #endif
33
34 namespace base {
35 namespace {
36
37 // TODO(crbug.com/379190028): Introduce new types here as file descriptor
38 // support is added.
39 enum class FdIOCapableMessagePumpType {
40 kDefaultIO,
41 #if BUILDFLAG(IS_ANDROID)
42 kAndroid,
43 #endif
44 };
45
CreateSocketPair()46 std::pair<ScopedFD, ScopedFD> CreateSocketPair() {
47 int fds[2];
48 CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
49 PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
50 PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
51 return {ScopedFD(fds[0]), ScopedFD(fds[1])};
52 }
53
WriteToSocket(int fd,std::string_view msg)54 void WriteToSocket(int fd, std::string_view msg) {
55 const ssize_t result = HANDLE_EINTR(write(fd, msg.data(), msg.size()));
56 CHECK_EQ(result, static_cast<ssize_t>(msg.size()));
57 }
58
FillSocket(int fd)59 void FillSocket(int fd) {
60 const std::array<char, 1024> kJunk = {};
61 ssize_t result;
62 do {
63 result = HANDLE_EINTR(write(fd, kJunk.data(), kJunk.size()));
64 } while (result > 0);
65 }
66
ReadFromSocket(int fd)67 std::string ReadFromSocket(int fd) {
68 char buffer[256];
69 const ssize_t result = HANDLE_EINTR(read(fd, buffer, std::size(buffer)));
70 if (result <= 0) {
71 return {};
72 }
73
74 const auto contents = span(buffer).first(static_cast<size_t>(result));
75 return std::string(contents.begin(), contents.end());
76 }
77
78 template <typename Fn>
RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner,Fn fn)79 void RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner, Fn fn) {
80 RunLoop loop;
81 task_runner->PostTask(FROM_HERE,
82 BindLambdaForTesting([&fn, quit = loop.QuitClosure()] {
83 fn();
84 quit.Run();
85 }));
86 loop.Run();
87 }
88
89 class TestFdWatcher;
90
91 class IOWatcherFdTest
92 : public testing::Test,
93 public testing::WithParamInterface<FdIOCapableMessagePumpType> {
94 public:
SetUp()95 void SetUp() override {
96 switch (GetParam()) {
97 case FdIOCapableMessagePumpType::kDefaultIO:
98 thread_.emplace("IO thread");
99 thread_->StartWithOptions(Thread::Options(MessagePumpType::IO, 0));
100 io_task_runner_ = thread_->task_runner();
101 break;
102
103 #if BUILDFLAG(IS_ANDROID)
104 case FdIOCapableMessagePumpType::kAndroid:
105 java_thread_.emplace("Java thread");
106 java_thread_->Start();
107 io_task_runner_ = java_thread_->task_runner();
108 break;
109 #endif
110 }
111 }
112
TearDown()113 void TearDown() override {
114 thread_.reset();
115 #if BUILDFLAG(IS_ANDROID)
116 if (java_thread_) {
117 java_thread_->Stop();
118 java_thread_.reset();
119 }
120 #endif
121 }
122
123 std::unique_ptr<TestFdWatcher> CreateWatcher();
124
125 // This is useful for ensuring that read and write can be observed at the
126 // same time on a socket's peer, since the operations which signal both read
127 // and write availability will happen on the same thread that dispatches
128 // signals.
MakePeerReadableAndWritableFromIOThread(int fd)129 void MakePeerReadableAndWritableFromIOThread(int fd) {
130 RunOnTaskRunner(io_task_runner_, [fd] {
131 WriteToSocket(fd, "x");
132 while (!ReadFromSocket(fd).empty()) {
133 }
134 });
135 }
136
137 private:
138 test::TaskEnvironment task_environment_;
139 std::optional<Thread> thread_;
140 #if BUILDFLAG(IS_ANDROID)
141 std::optional<android::JavaHandlerThread> java_thread_;
142 #endif
143 scoped_refptr<SequencedTaskRunner> io_task_runner_;
144 };
145
146 class TestFdWatcher : public IOWatcher::FdWatcher {
147 public:
TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)148 explicit TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)
149 : io_task_runner_(std::move(io_task_runner)) {}
150
~TestFdWatcher()151 ~TestFdWatcher() override { Stop(); }
152
num_events()153 int num_events() {
154 AutoLock lock(lock_);
155 return num_events_;
156 }
157
reset_num_events()158 void reset_num_events() {
159 AutoLock lock(lock_);
160 num_events_ = 0;
161 }
162
set_cancel_on_read()163 void set_cancel_on_read() { cancel_on_read_ = true; }
set_cancel_on_write()164 void set_cancel_on_write() { cancel_on_write_ = true; }
165
Watch(const ScopedFD & fd,IOWatcher::FdWatchDuration duration,IOWatcher::FdWatchMode mode)166 void Watch(const ScopedFD& fd,
167 IOWatcher::FdWatchDuration duration,
168 IOWatcher::FdWatchMode mode) {
169 RunOnTaskRunner(io_task_runner_, [this, fd = fd.get(), duration, mode] {
170 watch_ = IOWatcher::Get()->WatchFileDescriptor(fd, duration, mode, *this);
171 });
172 }
173
Stop()174 void Stop() {
175 RunOnTaskRunner(io_task_runner_, [this] { watch_.reset(); });
176 }
177
WaitForNextMessage()178 std::string WaitForNextMessage() {
179 AutoLock lock(lock_);
180 while (messages_.empty()) {
181 messages_available_.Wait();
182 }
183 std::string next_message = messages_.front();
184 messages_.pop();
185 return next_message;
186 }
187
WaitForDisconnect()188 void WaitForDisconnect() { disconnect_event_.Wait(); }
189
WaitForWritable()190 void WaitForWritable() { writable_event_.Wait(); }
191
WaitForReadableOrWritable()192 void WaitForReadableOrWritable() { readable_or_writable_event_.Wait(); }
193
194 // IOWatcher::FdWatcher:
OnFdReadable(int fd)195 void OnFdReadable(int fd) override {
196 bool did_read_something = false;
197 {
198 AutoLock lock(lock_);
199 ++num_events_;
200 readable_or_writable_event_.Signal();
201
202 for (;;) {
203 std::string message = ReadFromSocket(fd);
204 if (message.empty()) {
205 break;
206 }
207
208 did_read_something = true;
209 messages_.push(std::move(message));
210 messages_available_.Signal();
211 }
212 }
213
214 if (!did_read_something) {
215 disconnect_event_.Signal();
216 }
217
218 if (cancel_on_read_) {
219 watch_.reset();
220 }
221 }
222
OnFdWritable(int fd)223 void OnFdWritable(int fd) override {
224 {
225 AutoLock lock(lock_);
226 ++num_events_;
227 writable_event_.Signal();
228 readable_or_writable_event_.Signal();
229 }
230
231 if (cancel_on_write_) {
232 watch_.reset();
233 }
234 }
235
236 private:
237 const scoped_refptr<SequencedTaskRunner> io_task_runner_;
238
239 // The active watch, started by Watch(). Only one at a time and must be
240 // created and destroyed on `io_task_runner_`.
241 std::unique_ptr<IOWatcher::FdWatch> watch_;
242
243 // Signaled when `watch_` observes writability.
244 WaitableEvent writable_event_{WaitableEvent::ResetPolicy::AUTOMATIC};
245
246 // Signaled when `watch_` observes either readability or writability.
247 WaitableEvent readable_or_writable_event_{
248 WaitableEvent::ResetPolicy::AUTOMATIC};
249
250 // Signaled when `watch_` observes disconnection - i.e., readability when
251 // nothing is available to read.
252 WaitableEvent disconnect_event_;
253
254 // If set by a test, observing readability will immediately destroy `watch_`.
255 bool cancel_on_read_ = false;
256
257 // If set by a test, observing writability will immediately destroy `watch_`.
258 bool cancel_on_write_ = false;
259
260 Lock lock_;
261
262 // Message queue accumulated as readability is signaled.
263 ConditionVariable messages_available_{&lock_};
264 std::queue<std::string> messages_ GUARDED_BY(lock_);
265
266 // Counts the number of observed events of any kind.
267 int num_events_ GUARDED_BY(lock_) = 0;
268 };
269
CreateWatcher()270 std::unique_ptr<TestFdWatcher> IOWatcherFdTest::CreateWatcher() {
271 return std::make_unique<TestFdWatcher>(io_task_runner_);
272 }
273
TEST_P(IOWatcherFdTest,ReadOnce)274 TEST_P(IOWatcherFdTest, ReadOnce) {
275 // Test that a one-shot read watch sees a single readable event and no more.
276 auto [a, b] = CreateSocketPair();
277 auto watcher1 = CreateWatcher();
278 watcher1->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
279 IOWatcher::FdWatchMode::kRead);
280 WriteToSocket(a.get(), "ping");
281 EXPECT_EQ("ping", watcher1->WaitForNextMessage());
282
283 auto watcher2 = CreateWatcher();
284 watcher2->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
285 IOWatcher::FdWatchMode::kRead);
286 WriteToSocket(a.get(), "pong");
287 EXPECT_EQ("pong", watcher2->WaitForNextMessage());
288 EXPECT_EQ(1, watcher1->num_events());
289 }
290
TEST_P(IOWatcherFdTest,ReadPersistent)291 TEST_P(IOWatcherFdTest, ReadPersistent) {
292 // Tests that a persistent read watch can see multiple events.
293 auto [a, b] = CreateSocketPair();
294 auto watcher = CreateWatcher();
295 watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
296 IOWatcher::FdWatchMode::kRead);
297 WriteToSocket(a.get(), "ping");
298 EXPECT_EQ("ping", watcher->WaitForNextMessage());
299 WriteToSocket(a.get(), "pong");
300 EXPECT_EQ("pong", watcher->WaitForNextMessage());
301 EXPECT_EQ(2, watcher->num_events());
302 a.reset();
303 watcher->WaitForDisconnect();
304 }
305
TEST_P(IOWatcherFdTest,StopWatch)306 TEST_P(IOWatcherFdTest, StopWatch) {
307 // Tests that a stopped watch doesn't continue dispatching events.
308 auto [a, b] = CreateSocketPair();
309 auto watcher = CreateWatcher();
310 watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
311 IOWatcher::FdWatchMode::kRead);
312 WriteToSocket(a.get(), "ping");
313 EXPECT_EQ("ping", watcher->WaitForNextMessage());
314 WriteToSocket(a.get(), "pong");
315 EXPECT_EQ("pong", watcher->WaitForNextMessage());
316 watcher->Stop();
317 watcher->reset_num_events();
318
319 WriteToSocket(a.get(), "abc");
320 WriteToSocket(a.get(), "123");
321 EXPECT_EQ(0, watcher->num_events());
322 }
323
TEST_P(IOWatcherFdTest,Write)324 TEST_P(IOWatcherFdTest, Write) {
325 // Tests basic one-shot write watching.
326 auto [a, b] = CreateSocketPair();
327 FillSocket(b.get());
328 auto watcher = CreateWatcher();
329 watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
330 IOWatcher::FdWatchMode::kWrite);
331 MakePeerReadableAndWritableFromIOThread(a.get());
332 watcher->WaitForWritable();
333 WriteToSocket(b.get(), "x");
334 }
335
TEST_P(IOWatcherFdTest,ReadWriteUnifiedOneShot)336 TEST_P(IOWatcherFdTest, ReadWriteUnifiedOneShot) {
337 // Tests that a one-shot read-write watch will observe at most one event
338 // even if the watched object becomes both readable and writable.
339 auto [a, b] = CreateSocketPair();
340 FillSocket(b.get());
341 auto watcher = CreateWatcher();
342 watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
343 IOWatcher::FdWatchMode::kReadWrite);
344 MakePeerReadableAndWritableFromIOThread(a.get());
345 watcher->WaitForReadableOrWritable();
346 EXPECT_EQ(1, watcher->num_events());
347 }
348
TEST_P(IOWatcherFdTest,ReadWriteSeparateOneShot)349 TEST_P(IOWatcherFdTest, ReadWriteSeparateOneShot) {
350 // Tests that separate one-shot read and write watches can observe the same
351 // descriptor concurrently.
352 auto [a, b] = CreateSocketPair();
353 FillSocket(b.get());
354 auto read_watcher = CreateWatcher();
355 auto write_watcher = CreateWatcher();
356 read_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
357 IOWatcher::FdWatchMode::kRead);
358 write_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
359 IOWatcher::FdWatchMode::kWrite);
360 MakePeerReadableAndWritableFromIOThread(a.get());
361 EXPECT_EQ("x", read_watcher->WaitForNextMessage());
362 write_watcher->WaitForWritable();
363 }
364
TEST_P(IOWatcherFdTest,CancelDuringRead)365 TEST_P(IOWatcherFdTest, CancelDuringRead) {
366 // Tests that the watcher behaves safely when watching both read and write
367 // with a persistent watch which is cancelled while handling a read.
368 auto [a, b] = CreateSocketPair();
369 FillSocket(b.get());
370 auto watcher = CreateWatcher();
371 watcher->set_cancel_on_read();
372 watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
373 IOWatcher::FdWatchMode::kReadWrite);
374 MakePeerReadableAndWritableFromIOThread(a.get());
375 EXPECT_EQ("x", watcher->WaitForNextMessage());
376 EXPECT_LE(watcher->num_events(), 2);
377 }
378
TEST_P(IOWatcherFdTest,CancelDuringWrite)379 TEST_P(IOWatcherFdTest, CancelDuringWrite) {
380 // Tests that the watcher behaves safely when watching both read and write
381 // with a persistent watch which is cancelled while handling a write.
382 auto [a, b] = CreateSocketPair();
383 FillSocket(b.get());
384 auto watcher = CreateWatcher();
385 watcher->set_cancel_on_write();
386 watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
387 IOWatcher::FdWatchMode::kReadWrite);
388 MakePeerReadableAndWritableFromIOThread(a.get());
389 EXPECT_LE(watcher->num_events(), 2);
390 }
391
392 INSTANTIATE_TEST_SUITE_P(,
393 IOWatcherFdTest,
394 testing::Values(
395 #if BUILDFLAG(IS_ANDROID)
396 FdIOCapableMessagePumpType::kAndroid,
397 #endif
398 FdIOCapableMessagePumpType::kDefaultIO));
399
400 } // namespace
401 } // namespace base
402