• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/files/file_descriptor_watcher_posix.h"
6 
7 #include <utility>
8 
9 #include "base/functional/bind.h"
10 #include "base/functional/callback_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/raw_ref.h"
13 #include "base/message_loop/message_pump_for_io.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/task/current_thread.h"
16 #include "base/task/sequenced_task_runner.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/threading/thread_restrictions.h"
20 
21 namespace base {
22 
namespace {

// Per-thread FileDescriptorWatcher registration.
//
// Points at the FileDescriptorWatcher registered for the current thread, or is
// nullptr when there is none. The FileDescriptorWatcher constructor hands the
// address of this variable to its |resetter_| member (presumably an
// AutoReset-style helper that installs the instance for its lifetime -- confirm
// in the header). Controller's constructor reads it to obtain the IO thread's
// task runner, and AssertAllowed() DCHECKs that it is set.
constinit thread_local FileDescriptorWatcher* fd_watcher = nullptr;

}  // namespace
29 
// Watches a single file descriptor on the MessagePumpForIO thread on behalf of
// a Controller, and posts Controller::RunCallback() back to the sequence on
// which the Watcher was constructed each time the descriptor becomes readable
// or writable (depending on |mode_|).
//
// A Watcher is constructed by its Controller but, apart from construction,
// every method runs on the MessagePumpForIO thread. It also observes the
// destruction of that thread's message loop so that it can delete itself
// there.
class FileDescriptorWatcher::Controller::Watcher
    : public MessagePumpForIO::FdWatcher,
      public CurrentThread::DestructionObserver {
 public:
  // |controller| is the Controller to notify; |on_destroyed| is signalled at
  // the very end of ~Watcher(); |mode| selects read vs. write notifications;
  // |fd| is the descriptor to watch. Does not start watching -- see
  // StartWatching().
  Watcher(WeakPtr<Controller> controller,
          base::WaitableEvent& on_destroyed,
          MessagePumpForIO::Mode mode,
          int fd);
  Watcher(const Watcher&) = delete;
  Watcher& operator=(const Watcher&) = delete;
  ~Watcher() override;

  // Starts (or re-arms) the watch on |fd_|. Must run on a thread where
  // CurrentIOThread is set; the first call also registers this Watcher as a
  // destruction observer of that thread's loop.
  void StartWatching();

 private:
  friend class FileDescriptorWatcher;

  // MessagePumpForIO::FdWatcher:
  // Each of these posts Controller::RunCallback() to |callback_task_runner_|.
  void OnFileCanReadWithoutBlocking(int fd) override;
  void OnFileCanWriteWithoutBlocking(int fd) override;

  // CurrentThread::DestructionObserver:
  // Deletes this Watcher when the MessagePumpForIO thread's loop goes away.
  void WillDestroyCurrentMessageLoop() override;

  // The MessagePumpForIO's watch handle (stops the watch when destroyed).
  MessagePumpForIO::FdWatchController fd_watch_controller_;

  // Runs tasks on the sequence on which this was instantiated (i.e. the
  // sequence on which the callback must run). Captured by the default member
  // initializer, so it reflects the sequence running the constructor.
  const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
      SequencedTaskRunner::GetCurrentDefault();

  // The Controller that created this Watcher. This WeakPtr is bound to the
  // |controller_| thread and can only be used by this Watcher to post back to
  // |callback_task_runner_|. The one exception visible in this file is
  // WillDestroyCurrentMessageLoop(), which dereferences it directly after
  // checking that it is running on |callback_task_runner_|'s sequence.
  WeakPtr<Controller> controller_;

  // WaitableEvent to signal to ensure that the Watcher is always destroyed
  // before the Controller. ~Controller() blocks on this event when the Watcher
  // lives on a different thread.
  const raw_ref<base::WaitableEvent, AcrossTasksDanglingUntriaged>
      on_destroyed_;

  // Whether this Watcher is notified when |fd_| becomes readable or writable
  // without blocking.
  const MessagePumpForIO::Mode mode_;

  // The watched file descriptor.
  const int fd_;

  // Except for the constructor, every method of this class must run on the same
  // MessagePumpForIO thread. The constructor detaches this checker so that it
  // binds to whichever thread calls the next checked method.
  ThreadChecker thread_checker_;

  // Whether this Watcher was registered as a DestructionObserver on the
  // MessagePumpForIO thread. Set by the first StartWatching() call so that
  // re-arming the watch does not register twice.
  bool registered_as_destruction_observer_ = false;
};
87 
Watcher(WeakPtr<Controller> controller,base::WaitableEvent & on_destroyed,MessagePumpForIO::Mode mode,int fd)88 FileDescriptorWatcher::Controller::Watcher::Watcher(
89     WeakPtr<Controller> controller,
90     base::WaitableEvent& on_destroyed,
91     MessagePumpForIO::Mode mode,
92     int fd)
93     : fd_watch_controller_(FROM_HERE),
94       controller_(controller),
95       on_destroyed_(on_destroyed),
96       mode_(mode),
97       fd_(fd) {
98   DCHECK(callback_task_runner_);
99   thread_checker_.DetachFromThread();
100 }
101 
// Tears the Watcher down on the MessagePumpForIO thread: unregisters it as a
// destruction observer, stops the descriptor watch, and only then signals
// |on_destroyed_|.
FileDescriptorWatcher::Controller::Watcher::~Watcher() {
  DCHECK(thread_checker_.CalledOnValidThread());
  // Called unconditionally; |registered_as_destruction_observer_| is not
  // consulted here.
  CurrentIOThread::Get()->RemoveDestructionObserver(this);

  // Stop watching the descriptor before signalling |on_destroyed_|:
  // ~Controller() may be blocked in Wait() on that event and guarantees that
  // the file descriptor is not accessed once it returns.
  CHECK(fd_watch_controller_.StopWatchingFileDescriptor());
  on_destroyed_->Signal();
}
110 
StartWatching()111 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
112   DCHECK(thread_checker_.CalledOnValidThread());
113   DCHECK(CurrentIOThread::IsSet());
114 
115   const bool watch_success = CurrentIOThread::Get()->WatchFileDescriptor(
116       fd_, false, mode_, &fd_watch_controller_, this);
117   DCHECK(watch_success) << "Failed to watch fd=" << fd_;
118 
119   if (!registered_as_destruction_observer_) {
120     CurrentIOThread::Get()->AddDestructionObserver(this);
121     registered_as_destruction_observer_ = true;
122   }
123 }
124 
OnFileCanReadWithoutBlocking(int fd)125 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
126     int fd) {
127   DCHECK_EQ(fd_, fd);
128   DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
129   DCHECK(thread_checker_.CalledOnValidThread());
130 
131   // Run the callback on the sequence on which the watch was initiated.
132   callback_task_runner_->PostTask(
133       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
134 }
135 
OnFileCanWriteWithoutBlocking(int fd)136 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
137     int fd) {
138   DCHECK_EQ(fd_, fd);
139   DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
140   DCHECK(thread_checker_.CalledOnValidThread());
141 
142   // Run the callback on the sequence on which the watch was initiated.
143   callback_task_runner_->PostTask(
144       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
145 }
146 
147 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()148     WillDestroyCurrentMessageLoop() {
149   DCHECK(thread_checker_.CalledOnValidThread());
150 
151   if (callback_task_runner_->RunsTasksInCurrentSequence()) {
152     // |controller_| can be accessed directly when Watcher runs on the same
153     // thread.
154     Watcher* watcher = controller_->watcher_;
155     controller_->watcher_ = nullptr;
156     delete watcher;
157   } else {
158     // If the Watcher and the Controller live on different threads, delete
159     // |this| synchronously. Pending tasks bound to an unretained Watcher* will
160     // not run since this loop is dead. The associated Controller will not know
161     // whether the Watcher has been destroyed but it never uses it directly and
162     // will ultimately send it to this thread for deletion (and that also  won't
163     // run since the loop being dead).
164     delete this;
165   }
166 }
167 
Controller(MessagePumpForIO::Mode mode,int fd,const RepeatingClosure & callback)168 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
169                                               int fd,
170                                               const RepeatingClosure& callback)
171     : callback_(callback),
172       io_thread_task_runner_(fd_watcher->io_thread_task_runner()) {
173   DCHECK(!callback_.is_null());
174   DCHECK(io_thread_task_runner_);
175   watcher_ =
176       new Watcher(weak_factory_.GetWeakPtr(), on_watcher_destroyed_, mode, fd);
177   StartWatching();
178 }
179 
~Controller()180 FileDescriptorWatcher::Controller::~Controller() {
181   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
182 
183   if (io_thread_task_runner_->BelongsToCurrentThread()) {
184     // If the MessagePumpForIO and the Controller live on the same thread.
185     if (watcher_)
186       delete watcher_;
187   } else {
188     // Synchronously wait until |watcher_| is deleted on the MessagePumpForIO
189     // thread. This ensures that the file descriptor is never accessed after
190     // this destructor returns.
191     //
192     // We considered associating "generations" to file descriptors to avoid the
193     // synchronous wait. For example, if the IO thread gets a "cancel" for fd=6,
194     // generation=1 after getting a "start watching" for fd=6, generation=2, it
195     // can ignore the "Cancel". However, "generations" didn't solve this race:
196     //
197     // T1 (client) Start watching fd = 6 with WatchReadable()
198     //             Stop watching fd = 6
199     //             Close fd = 6
200     //             Open a new file, fd = 6 gets reused.
201     // T2 (io)     Watcher::StartWatching()
202     //               Incorrectly starts watching fd = 6 which now refers to a
203     //               different file than when WatchReadable() was called.
204     auto delete_task = BindOnce(
205         [](Watcher* watcher) {
206           // Since |watcher| is a raw pointer, it isn't deleted if this callback
207           // is deleted before it gets to run.
208           delete watcher;
209         },
210         UnsafeDanglingUntriaged(watcher_));
211     io_thread_task_runner_->PostTask(FROM_HERE, std::move(delete_task));
212     ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow;
213     on_watcher_destroyed_.Wait();
214   }
215 
216   // Since WeakPtrs are invalidated by the destructor, any pending RunCallback()
217   // won't be invoked after this returns.
218 }
219 
StartWatching()220 void FileDescriptorWatcher::Controller::StartWatching() {
221   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222   if (io_thread_task_runner_->BelongsToCurrentThread()) {
223     // If the MessagePumpForIO and the Controller live on the same thread.
224     watcher_->StartWatching();
225   } else {
226     // It is safe to use Unretained() below because |watcher_| can only be
227     // deleted by a delete task posted to |io_thread_task_runner_| by this
228     // Controller's destructor. Since this delete task hasn't been posted yet,
229     // it can't run before the task posted below.
230     io_thread_task_runner_->PostTask(
231         FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_)));
232   }
233 }
234 
RunCallback()235 void FileDescriptorWatcher::Controller::RunCallback() {
236   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
237 
238   WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
239 
240   // Run a copy of the callback in case this Controller is deleted by the
241   // callback. This would cause the callback itself to be deleted while it is
242   // being run.
243   RepeatingClosure callback_copy = callback_;
244   callback_copy.Run();
245 
246   // If |this| wasn't deleted, re-enable the watch.
247   if (weak_this)
248     StartWatching();
249 }
250 
// Registers this instance for the current thread and records the task runner
// of the MessagePumpForIO thread on which descriptors will be watched.
//
// |resetter_| is given the address of the thread-local |fd_watcher|, |this| as
// the new value, and nullptr as a third argument (presumably the expected
// previous value, i.e. at most one FileDescriptorWatcher per thread -- confirm
// against the declaration of |resetter_|).
FileDescriptorWatcher::FileDescriptorWatcher(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)
    : resetter_(&fd_watcher, this, nullptr),
      io_thread_task_runner_(std::move(io_thread_task_runner)) {}

// Nothing beyond member destruction; |resetter_| is destroyed with the rest.
FileDescriptorWatcher::~FileDescriptorWatcher() = default;
257 
258 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const RepeatingClosure & callback)259 FileDescriptorWatcher::WatchReadable(int fd, const RepeatingClosure& callback) {
260   return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
261 }
262 
263 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const RepeatingClosure & callback)264 FileDescriptorWatcher::WatchWritable(int fd, const RepeatingClosure& callback) {
265   return WrapUnique(
266       new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
267 }
268 
#if DCHECK_IS_ON()
// Debug-only check that a FileDescriptorWatcher has been registered for the
// current thread, i.e. that the thread-local |fd_watcher| is non-null. Only
// compiled when DCHECKs are enabled.
void FileDescriptorWatcher::AssertAllowed() {
  DCHECK(fd_watcher);
}
#endif
274 
275 }  // namespace base
276