• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <fcntl.h>
6 #include <sys/socket.h>
7 #include <unistd.h>
8 
9 #include <array>
10 #include <memory>
11 #include <optional>
12 #include <queue>
13 #include <string>
14 #include <string_view>
15 #include <utility>
16 
17 #include "base/containers/span.h"
18 #include "base/files/scoped_file.h"
19 #include "base/message_loop/io_watcher.h"
20 #include "base/posix/eintr_wrapper.h"
21 #include "base/run_loop.h"
22 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h"
24 #include "base/synchronization/waitable_event.h"
25 #include "base/test/bind.h"
26 #include "base/test/task_environment.h"
27 #include "base/threading/thread.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29 
30 #if BUILDFLAG(IS_ANDROID)
31 #include "base/android/java_handler_thread.h"
32 #endif
33 
34 namespace base {
35 namespace {
36 
37 // TODO(crbug.com/379190028): Introduce new types here as file descriptor
38 // support is added.
39 enum class FdIOCapableMessagePumpType {
40   kDefaultIO,
41 #if BUILDFLAG(IS_ANDROID)
42   kAndroid,
43 #endif
44 };
45 
CreateSocketPair()46 std::pair<ScopedFD, ScopedFD> CreateSocketPair() {
47   int fds[2];
48   CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
49   PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
50   PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
51   return {ScopedFD(fds[0]), ScopedFD(fds[1])};
52 }
53 
WriteToSocket(int fd,std::string_view msg)54 void WriteToSocket(int fd, std::string_view msg) {
55   const ssize_t result = HANDLE_EINTR(write(fd, msg.data(), msg.size()));
56   CHECK_EQ(result, static_cast<ssize_t>(msg.size()));
57 }
58 
FillSocket(int fd)59 void FillSocket(int fd) {
60   const std::array<char, 1024> kJunk = {};
61   ssize_t result;
62   do {
63     result = HANDLE_EINTR(write(fd, kJunk.data(), kJunk.size()));
64   } while (result > 0);
65 }
66 
ReadFromSocket(int fd)67 std::string ReadFromSocket(int fd) {
68   char buffer[256];
69   const ssize_t result = HANDLE_EINTR(read(fd, buffer, std::size(buffer)));
70   if (result <= 0) {
71     return {};
72   }
73 
74   const auto contents = span(buffer).first(static_cast<size_t>(result));
75   return std::string(contents.begin(), contents.end());
76 }
77 
78 template <typename Fn>
RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner,Fn fn)79 void RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner, Fn fn) {
80   RunLoop loop;
81   task_runner->PostTask(FROM_HERE,
82                         BindLambdaForTesting([&fn, quit = loop.QuitClosure()] {
83                           fn();
84                           quit.Run();
85                         }));
86   loop.Run();
87 }
88 
89 class TestFdWatcher;
90 
91 class IOWatcherFdTest
92     : public testing::Test,
93       public testing::WithParamInterface<FdIOCapableMessagePumpType> {
94  public:
SetUp()95   void SetUp() override {
96     switch (GetParam()) {
97       case FdIOCapableMessagePumpType::kDefaultIO:
98         thread_.emplace("IO thread");
99         thread_->StartWithOptions(Thread::Options(MessagePumpType::IO, 0));
100         io_task_runner_ = thread_->task_runner();
101         break;
102 
103 #if BUILDFLAG(IS_ANDROID)
104       case FdIOCapableMessagePumpType::kAndroid:
105         java_thread_.emplace("Java thread");
106         java_thread_->Start();
107         io_task_runner_ = java_thread_->task_runner();
108         break;
109 #endif
110     }
111   }
112 
TearDown()113   void TearDown() override {
114     thread_.reset();
115 #if BUILDFLAG(IS_ANDROID)
116     if (java_thread_) {
117       java_thread_->Stop();
118       java_thread_.reset();
119     }
120 #endif
121   }
122 
123   std::unique_ptr<TestFdWatcher> CreateWatcher();
124 
125   // This is useful for ensuring that read and write can be observed at the
126   // same time on a socket's peer, since the operations which signal both read
127   // and write availability will happen on the same thread that dispatches
128   // signals.
MakePeerReadableAndWritableFromIOThread(int fd)129   void MakePeerReadableAndWritableFromIOThread(int fd) {
130     RunOnTaskRunner(io_task_runner_, [fd] {
131       WriteToSocket(fd, "x");
132       while (!ReadFromSocket(fd).empty()) {
133       }
134     });
135   }
136 
137  private:
138   test::TaskEnvironment task_environment_;
139   std::optional<Thread> thread_;
140 #if BUILDFLAG(IS_ANDROID)
141   std::optional<android::JavaHandlerThread> java_thread_;
142 #endif
143   scoped_refptr<SequencedTaskRunner> io_task_runner_;
144 };
145 
146 class TestFdWatcher : public IOWatcher::FdWatcher {
147  public:
TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)148   explicit TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)
149       : io_task_runner_(std::move(io_task_runner)) {}
150 
~TestFdWatcher()151   ~TestFdWatcher() override { Stop(); }
152 
num_events()153   int num_events() {
154     AutoLock lock(lock_);
155     return num_events_;
156   }
157 
reset_num_events()158   void reset_num_events() {
159     AutoLock lock(lock_);
160     num_events_ = 0;
161   }
162 
set_cancel_on_read()163   void set_cancel_on_read() { cancel_on_read_ = true; }
set_cancel_on_write()164   void set_cancel_on_write() { cancel_on_write_ = true; }
165 
Watch(const ScopedFD & fd,IOWatcher::FdWatchDuration duration,IOWatcher::FdWatchMode mode)166   void Watch(const ScopedFD& fd,
167              IOWatcher::FdWatchDuration duration,
168              IOWatcher::FdWatchMode mode) {
169     RunOnTaskRunner(io_task_runner_, [this, fd = fd.get(), duration, mode] {
170       watch_ = IOWatcher::Get()->WatchFileDescriptor(fd, duration, mode, *this);
171     });
172   }
173 
Stop()174   void Stop() {
175     RunOnTaskRunner(io_task_runner_, [this] { watch_.reset(); });
176   }
177 
WaitForNextMessage()178   std::string WaitForNextMessage() {
179     AutoLock lock(lock_);
180     while (messages_.empty()) {
181       messages_available_.Wait();
182     }
183     std::string next_message = messages_.front();
184     messages_.pop();
185     return next_message;
186   }
187 
WaitForDisconnect()188   void WaitForDisconnect() { disconnect_event_.Wait(); }
189 
WaitForWritable()190   void WaitForWritable() { writable_event_.Wait(); }
191 
WaitForReadableOrWritable()192   void WaitForReadableOrWritable() { readable_or_writable_event_.Wait(); }
193 
194   // IOWatcher::FdWatcher:
OnFdReadable(int fd)195   void OnFdReadable(int fd) override {
196     bool did_read_something = false;
197     {
198       AutoLock lock(lock_);
199       ++num_events_;
200       readable_or_writable_event_.Signal();
201 
202       for (;;) {
203         std::string message = ReadFromSocket(fd);
204         if (message.empty()) {
205           break;
206         }
207 
208         did_read_something = true;
209         messages_.push(std::move(message));
210         messages_available_.Signal();
211       }
212     }
213 
214     if (!did_read_something) {
215       disconnect_event_.Signal();
216     }
217 
218     if (cancel_on_read_) {
219       watch_.reset();
220     }
221   }
222 
OnFdWritable(int fd)223   void OnFdWritable(int fd) override {
224     {
225       AutoLock lock(lock_);
226       ++num_events_;
227       writable_event_.Signal();
228       readable_or_writable_event_.Signal();
229     }
230 
231     if (cancel_on_write_) {
232       watch_.reset();
233     }
234   }
235 
236  private:
237   const scoped_refptr<SequencedTaskRunner> io_task_runner_;
238 
239   // The active watch, started by Watch(). Only one at a time and must be
240   // created and destroyed on `io_task_runner_`.
241   std::unique_ptr<IOWatcher::FdWatch> watch_;
242 
243   // Signaled when `watch_` observes writability.
244   WaitableEvent writable_event_{WaitableEvent::ResetPolicy::AUTOMATIC};
245 
246   // Signaled when `watch_` observes either readability or writability.
247   WaitableEvent readable_or_writable_event_{
248       WaitableEvent::ResetPolicy::AUTOMATIC};
249 
250   // Signaled when `watch_` observes disconnection - i.e., readability when
251   // nothing is available to read.
252   WaitableEvent disconnect_event_;
253 
254   // If set by a test, observing readability will immediately destroy `watch_`.
255   bool cancel_on_read_ = false;
256 
257   // If set by a test, observing writability will immediately destroy `watch_`.
258   bool cancel_on_write_ = false;
259 
260   Lock lock_;
261 
262   // Message queue accumulated as readability is signaled.
263   ConditionVariable messages_available_{&lock_};
264   std::queue<std::string> messages_ GUARDED_BY(lock_);
265 
266   // Counts the number of observed events of any kind.
267   int num_events_ GUARDED_BY(lock_) = 0;
268 };
269 
CreateWatcher()270 std::unique_ptr<TestFdWatcher> IOWatcherFdTest::CreateWatcher() {
271   return std::make_unique<TestFdWatcher>(io_task_runner_);
272 }
273 
TEST_P(IOWatcherFdTest, ReadOnce) {
  // A one-shot read watch must report exactly one readable event, even if the
  // descriptor becomes readable again afterwards.
  auto [writer, reader] = CreateSocketPair();
  const auto one_shot = IOWatcher::FdWatchDuration::kOneShot;
  const auto read_mode = IOWatcher::FdWatchMode::kRead;

  auto first_watcher = CreateWatcher();
  first_watcher->Watch(reader, one_shot, read_mode);
  WriteToSocket(writer.get(), "ping");
  EXPECT_EQ("ping", first_watcher->WaitForNextMessage());

  // A second one-shot watch picks up the next message; the first watcher must
  // still have seen only its single event.
  auto second_watcher = CreateWatcher();
  second_watcher->Watch(reader, one_shot, read_mode);
  WriteToSocket(writer.get(), "pong");
  EXPECT_EQ("pong", second_watcher->WaitForNextMessage());
  EXPECT_EQ(1, first_watcher->num_events());
}
290 
TEST_P(IOWatcherFdTest, ReadPersistent) {
  // A persistent read watch keeps delivering events: two messages produce two
  // events, and closing the peer is then observed as a disconnect.
  auto [writer, reader] = CreateSocketPair();
  auto persistent_watcher = CreateWatcher();
  persistent_watcher->Watch(reader, IOWatcher::FdWatchDuration::kPersistent,
                            IOWatcher::FdWatchMode::kRead);

  WriteToSocket(writer.get(), "ping");
  EXPECT_EQ("ping", persistent_watcher->WaitForNextMessage());

  WriteToSocket(writer.get(), "pong");
  EXPECT_EQ("pong", persistent_watcher->WaitForNextMessage());

  EXPECT_EQ(2, persistent_watcher->num_events());

  writer.reset();
  persistent_watcher->WaitForDisconnect();
}
305 
TEST_P(IOWatcherFdTest, StopWatch) {
  // Once a watch has been stopped, further activity on the descriptor must
  // not be dispatched to the watcher.
  auto [writer, reader] = CreateSocketPair();
  auto stoppable_watcher = CreateWatcher();
  stoppable_watcher->Watch(reader, IOWatcher::FdWatchDuration::kPersistent,
                           IOWatcher::FdWatchMode::kRead);

  // Confirm the watch is live before stopping it.
  WriteToSocket(writer.get(), "ping");
  EXPECT_EQ("ping", stoppable_watcher->WaitForNextMessage());
  WriteToSocket(writer.get(), "pong");
  EXPECT_EQ("pong", stoppable_watcher->WaitForNextMessage());

  stoppable_watcher->Stop();
  stoppable_watcher->reset_num_events();

  // Data written after Stop() must not be counted.
  WriteToSocket(writer.get(), "abc");
  WriteToSocket(writer.get(), "123");
  EXPECT_EQ(0, stoppable_watcher->num_events());
}
323 
TEST_P(IOWatcherFdTest, Write) {
  // Basic one-shot write watching: fill the socket, watch it for
  // writability, let the IO thread drain the peer, and wait for the
  // notification.
  auto [peer, watched] = CreateSocketPair();
  FillSocket(watched.get());

  auto write_watcher = CreateWatcher();
  write_watcher->Watch(watched, IOWatcher::FdWatchDuration::kOneShot,
                       IOWatcher::FdWatchMode::kWrite);

  MakePeerReadableAndWritableFromIOThread(peer.get());
  write_watcher->WaitForWritable();

  // The socket was reported writable, so a small write must now succeed.
  WriteToSocket(watched.get(), "x");
}
335 
TEST_P(IOWatcherFdTest, ReadWriteUnifiedOneShot) {
  // A single one-shot watch covering both read and write must fire at most
  // once, even when the descriptor becomes readable and writable together.
  auto [peer, watched] = CreateSocketPair();
  FillSocket(watched.get());

  auto unified_watcher = CreateWatcher();
  unified_watcher->Watch(watched, IOWatcher::FdWatchDuration::kOneShot,
                         IOWatcher::FdWatchMode::kReadWrite);

  MakePeerReadableAndWritableFromIOThread(peer.get());
  unified_watcher->WaitForReadableOrWritable();
  EXPECT_EQ(1, unified_watcher->num_events());
}
348 
TEST_P(IOWatcherFdTest, ReadWriteSeparateOneShot) {
  // Two independent one-shot watches, one for read and one for write, can
  // observe the same descriptor at the same time.
  auto [peer, watched] = CreateSocketPair();
  FillSocket(watched.get());
  const auto one_shot = IOWatcher::FdWatchDuration::kOneShot;

  auto reader = CreateWatcher();
  auto writer = CreateWatcher();
  reader->Watch(watched, one_shot, IOWatcher::FdWatchMode::kRead);
  writer->Watch(watched, one_shot, IOWatcher::FdWatchMode::kWrite);

  MakePeerReadableAndWritableFromIOThread(peer.get());

  EXPECT_EQ("x", reader->WaitForNextMessage());
  writer->WaitForWritable();
}
364 
TEST_P(IOWatcherFdTest, CancelDuringRead) {
  // A persistent read-write watch that is destroyed from inside its own
  // readable callback must be handled safely.
  auto [peer, watched] = CreateSocketPair();
  FillSocket(watched.get());

  auto cancelling_watcher = CreateWatcher();
  cancelling_watcher->set_cancel_on_read();
  cancelling_watcher->Watch(watched, IOWatcher::FdWatchDuration::kPersistent,
                            IOWatcher::FdWatchMode::kReadWrite);

  MakePeerReadableAndWritableFromIOThread(peer.get());

  EXPECT_EQ("x", cancelling_watcher->WaitForNextMessage());
  EXPECT_LE(cancelling_watcher->num_events(), 2);
}
378 
TEST_P(IOWatcherFdTest, CancelDuringWrite) {
  // A persistent read-write watch that is destroyed from inside its own
  // writable callback must be handled safely.
  auto [peer, watched] = CreateSocketPair();
  FillSocket(watched.get());

  auto cancelling_watcher = CreateWatcher();
  cancelling_watcher->set_cancel_on_write();
  cancelling_watcher->Watch(watched, IOWatcher::FdWatchDuration::kPersistent,
                            IOWatcher::FdWatchMode::kReadWrite);

  MakePeerReadableAndWritableFromIOThread(peer.get());

  EXPECT_LE(cancelling_watcher->num_events(), 2);
}
391 
392 INSTANTIATE_TEST_SUITE_P(,
393                          IOWatcherFdTest,
394                          testing::Values(
395 #if BUILDFLAG(IS_ANDROID)
396                              FdIOCapableMessagePumpType::kAndroid,
397 #endif
398                              FdIOCapableMessagePumpType::kDefaultIO));
399 
400 }  // namespace
401 }  // namespace base
402