// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "base/files/file_descriptor_watcher_posix.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/functional/callback_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/raw_ref.h"
13 #include "base/message_loop/message_pump_for_io.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/task/current_thread.h"
16 #include "base/task/sequenced_task_runner.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/threading/thread_restrictions.h"
20
21 namespace base {
22
23 namespace {
24
25 // Per-thread FileDescriptorWatcher registration.
26 constinit thread_local FileDescriptorWatcher* fd_watcher = nullptr;
27
28 } // namespace
29
30 class FileDescriptorWatcher::Controller::Watcher
31 : public MessagePumpForIO::FdWatcher,
32 public CurrentThread::DestructionObserver {
33 public:
34 Watcher(WeakPtr<Controller> controller,
35 base::WaitableEvent& on_destroyed,
36 MessagePumpForIO::Mode mode,
37 int fd);
38 Watcher(const Watcher&) = delete;
39 Watcher& operator=(const Watcher&) = delete;
40 ~Watcher() override;
41
42 void StartWatching();
43
44 private:
45 friend class FileDescriptorWatcher;
46
47 // MessagePumpForIO::FdWatcher:
48 void OnFileCanReadWithoutBlocking(int fd) override;
49 void OnFileCanWriteWithoutBlocking(int fd) override;
50
51 // CurrentThread::DestructionObserver:
52 void WillDestroyCurrentMessageLoop() override;
53
54 // The MessagePumpForIO's watch handle (stops the watch when destroyed).
55 MessagePumpForIO::FdWatchController fd_watch_controller_;
56
57 // Runs tasks on the sequence on which this was instantiated (i.e. the
58 // sequence on which the callback must run).
59 const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
60 SequencedTaskRunner::GetCurrentDefault();
61
62 // The Controller that created this Watcher. This WeakPtr is bound to the
63 // |controller_| thread and can only be used by this Watcher to post back to
64 // |callback_task_runner_|.
65 WeakPtr<Controller> controller_;
66
67 // WaitableEvent to signal to ensure that the Watcher is always destroyed
68 // before the Controller.
69 const raw_ref<base::WaitableEvent, AcrossTasksDanglingUntriaged>
70 on_destroyed_;
71
72 // Whether this Watcher is notified when |fd_| becomes readable or writable
73 // without blocking.
74 const MessagePumpForIO::Mode mode_;
75
76 // The watched file descriptor.
77 const int fd_;
78
79 // Except for the constructor, every method of this class must run on the same
80 // MessagePumpForIO thread.
81 ThreadChecker thread_checker_;
82
83 // Whether this Watcher was registered as a DestructionObserver on the
84 // MessagePumpForIO thread.
85 bool registered_as_destruction_observer_ = false;
86 };
87
Watcher(WeakPtr<Controller> controller,base::WaitableEvent & on_destroyed,MessagePumpForIO::Mode mode,int fd)88 FileDescriptorWatcher::Controller::Watcher::Watcher(
89 WeakPtr<Controller> controller,
90 base::WaitableEvent& on_destroyed,
91 MessagePumpForIO::Mode mode,
92 int fd)
93 : fd_watch_controller_(FROM_HERE),
94 controller_(controller),
95 on_destroyed_(on_destroyed),
96 mode_(mode),
97 fd_(fd) {
98 DCHECK(callback_task_runner_);
99 thread_checker_.DetachFromThread();
100 }
101
~Watcher()102 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
103 DCHECK(thread_checker_.CalledOnValidThread());
104 CurrentIOThread::Get()->RemoveDestructionObserver(this);
105
106 // Stop watching the descriptor before signalling |on_destroyed_|.
107 CHECK(fd_watch_controller_.StopWatchingFileDescriptor());
108 on_destroyed_->Signal();
109 }
110
StartWatching()111 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
112 DCHECK(thread_checker_.CalledOnValidThread());
113 DCHECK(CurrentIOThread::IsSet());
114
115 const bool watch_success = CurrentIOThread::Get()->WatchFileDescriptor(
116 fd_, false, mode_, &fd_watch_controller_, this);
117 DCHECK(watch_success) << "Failed to watch fd=" << fd_;
118
119 if (!registered_as_destruction_observer_) {
120 CurrentIOThread::Get()->AddDestructionObserver(this);
121 registered_as_destruction_observer_ = true;
122 }
123 }
124
OnFileCanReadWithoutBlocking(int fd)125 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
126 int fd) {
127 DCHECK_EQ(fd_, fd);
128 DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
129 DCHECK(thread_checker_.CalledOnValidThread());
130
131 // Run the callback on the sequence on which the watch was initiated.
132 callback_task_runner_->PostTask(
133 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
134 }
135
OnFileCanWriteWithoutBlocking(int fd)136 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
137 int fd) {
138 DCHECK_EQ(fd_, fd);
139 DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
140 DCHECK(thread_checker_.CalledOnValidThread());
141
142 // Run the callback on the sequence on which the watch was initiated.
143 callback_task_runner_->PostTask(
144 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
145 }
146
147 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()148 WillDestroyCurrentMessageLoop() {
149 DCHECK(thread_checker_.CalledOnValidThread());
150
151 if (callback_task_runner_->RunsTasksInCurrentSequence()) {
152 // |controller_| can be accessed directly when Watcher runs on the same
153 // thread.
154 Watcher* watcher = controller_->watcher_;
155 controller_->watcher_ = nullptr;
156 delete watcher;
157 } else {
158 // If the Watcher and the Controller live on different threads, delete
159 // |this| synchronously. Pending tasks bound to an unretained Watcher* will
160 // not run since this loop is dead. The associated Controller will not know
161 // whether the Watcher has been destroyed but it never uses it directly and
162 // will ultimately send it to this thread for deletion (and that also won't
163 // run since the loop being dead).
164 delete this;
165 }
166 }
167
Controller(MessagePumpForIO::Mode mode,int fd,const RepeatingClosure & callback)168 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
169 int fd,
170 const RepeatingClosure& callback)
171 : callback_(callback),
172 io_thread_task_runner_(fd_watcher->io_thread_task_runner()) {
173 DCHECK(!callback_.is_null());
174 DCHECK(io_thread_task_runner_);
175 watcher_ =
176 new Watcher(weak_factory_.GetWeakPtr(), on_watcher_destroyed_, mode, fd);
177 StartWatching();
178 }
179
~Controller()180 FileDescriptorWatcher::Controller::~Controller() {
181 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
182
183 if (io_thread_task_runner_->BelongsToCurrentThread()) {
184 // If the MessagePumpForIO and the Controller live on the same thread.
185 if (watcher_)
186 delete watcher_;
187 } else {
188 // Synchronously wait until |watcher_| is deleted on the MessagePumpForIO
189 // thread. This ensures that the file descriptor is never accessed after
190 // this destructor returns.
191 //
192 // We considered associating "generations" to file descriptors to avoid the
193 // synchronous wait. For example, if the IO thread gets a "cancel" for fd=6,
194 // generation=1 after getting a "start watching" for fd=6, generation=2, it
195 // can ignore the "Cancel". However, "generations" didn't solve this race:
196 //
197 // T1 (client) Start watching fd = 6 with WatchReadable()
198 // Stop watching fd = 6
199 // Close fd = 6
200 // Open a new file, fd = 6 gets reused.
201 // T2 (io) Watcher::StartWatching()
202 // Incorrectly starts watching fd = 6 which now refers to a
203 // different file than when WatchReadable() was called.
204 auto delete_task = BindOnce(
205 [](Watcher* watcher) {
206 // Since |watcher| is a raw pointer, it isn't deleted if this callback
207 // is deleted before it gets to run.
208 delete watcher;
209 },
210 UnsafeDanglingUntriaged(watcher_));
211 io_thread_task_runner_->PostTask(FROM_HERE, std::move(delete_task));
212 ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow;
213 on_watcher_destroyed_.Wait();
214 }
215
216 // Since WeakPtrs are invalidated by the destructor, any pending RunCallback()
217 // won't be invoked after this returns.
218 }
219
StartWatching()220 void FileDescriptorWatcher::Controller::StartWatching() {
221 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222 if (io_thread_task_runner_->BelongsToCurrentThread()) {
223 // If the MessagePumpForIO and the Controller live on the same thread.
224 watcher_->StartWatching();
225 } else {
226 // It is safe to use Unretained() below because |watcher_| can only be
227 // deleted by a delete task posted to |io_thread_task_runner_| by this
228 // Controller's destructor. Since this delete task hasn't been posted yet,
229 // it can't run before the task posted below.
230 io_thread_task_runner_->PostTask(
231 FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_)));
232 }
233 }
234
RunCallback()235 void FileDescriptorWatcher::Controller::RunCallback() {
236 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
237
238 WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
239
240 // Run a copy of the callback in case this Controller is deleted by the
241 // callback. This would cause the callback itself to be deleted while it is
242 // being run.
243 RepeatingClosure callback_copy = callback_;
244 callback_copy.Run();
245
246 // If |this| wasn't deleted, re-enable the watch.
247 if (weak_this)
248 StartWatching();
249 }
250
FileDescriptorWatcher(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)251 FileDescriptorWatcher::FileDescriptorWatcher(
252 scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)
253 : resetter_(&fd_watcher, this, nullptr),
254 io_thread_task_runner_(std::move(io_thread_task_runner)) {}
255
256 FileDescriptorWatcher::~FileDescriptorWatcher() = default;
257
258 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const RepeatingClosure & callback)259 FileDescriptorWatcher::WatchReadable(int fd, const RepeatingClosure& callback) {
260 return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
261 }
262
263 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const RepeatingClosure & callback)264 FileDescriptorWatcher::WatchWritable(int fd, const RepeatingClosure& callback) {
265 return WrapUnique(
266 new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
267 }
268
269 #if DCHECK_IS_ON()
AssertAllowed()270 void FileDescriptorWatcher::AssertAllowed() {
271 DCHECK(fd_watcher);
272 }
273 #endif
274
275 } // namespace base
276