1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/files/file_descriptor_watcher_posix.h"
6
7 #include "base/bind.h"
8 #include "base/lazy_instance.h"
9 #include "base/logging.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/message_loop/message_loop_current.h"
12 #include "base/message_loop/message_pump_for_io.h"
13 #include "base/sequenced_task_runner.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/threading/sequenced_task_runner_handle.h"
16 #include "base/threading/thread_checker.h"
17 #include "base/threading/thread_local.h"
18
19 namespace base {
20
21 namespace {
22
23 // MessageLoopForIO used to watch file descriptors for which callbacks are
24 // registered from a given thread.
25 LazyInstance<ThreadLocalPointer<MessageLoopForIO>>::Leaky
26 tls_message_loop_for_io = LAZY_INSTANCE_INITIALIZER;
27
28 } // namespace
29
~Controller()30 FileDescriptorWatcher::Controller::~Controller() {
31 DCHECK(sequence_checker_.CalledOnValidSequence());
32
33 // Delete |watcher_| on the MessageLoopForIO.
34 //
35 // If the MessageLoopForIO is deleted before Watcher::StartWatching() runs,
36 // |watcher_| is leaked. If the MessageLoopForIO is deleted after
37 // Watcher::StartWatching() runs but before the DeleteSoon task runs,
38 // |watcher_| is deleted from Watcher::WillDestroyCurrentMessageLoop().
39 message_loop_for_io_task_runner_->DeleteSoon(FROM_HERE, watcher_.release());
40
41 // Since WeakPtrs are invalidated by the destructor, RunCallback() won't be
42 // invoked after this returns.
43 }
44
45 class FileDescriptorWatcher::Controller::Watcher
46 : public MessagePumpForIO::FdWatcher,
47 public MessageLoopCurrent::DestructionObserver {
48 public:
49 Watcher(WeakPtr<Controller> controller, MessagePumpForIO::Mode mode, int fd);
50 ~Watcher() override;
51
52 void StartWatching();
53
54 private:
55 friend class FileDescriptorWatcher;
56
57 // MessagePumpForIO::FdWatcher:
58 void OnFileCanReadWithoutBlocking(int fd) override;
59 void OnFileCanWriteWithoutBlocking(int fd) override;
60
61 // MessageLoopCurrent::DestructionObserver:
62 void WillDestroyCurrentMessageLoop() override;
63
64 // The MessageLoopForIO's watch handle (stops the watch when destroyed).
65 MessagePumpForIO::FdWatchController fd_watch_controller_;
66
67 // Runs tasks on the sequence on which this was instantiated (i.e. the
68 // sequence on which the callback must run).
69 const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
70 SequencedTaskRunnerHandle::Get();
71
72 // The Controller that created this Watcher.
73 WeakPtr<Controller> controller_;
74
75 // Whether this Watcher is notified when |fd_| becomes readable or writable
76 // without blocking.
77 const MessagePumpForIO::Mode mode_;
78
79 // The watched file descriptor.
80 const int fd_;
81
82 // Except for the constructor, every method of this class must run on the same
83 // MessageLoopForIO thread.
84 ThreadChecker thread_checker_;
85
86 // Whether this Watcher was registered as a DestructionObserver on the
87 // MessageLoopForIO thread.
88 bool registered_as_destruction_observer_ = false;
89
90 DISALLOW_COPY_AND_ASSIGN(Watcher);
91 };
92
Watcher(WeakPtr<Controller> controller,MessagePumpForIO::Mode mode,int fd)93 FileDescriptorWatcher::Controller::Watcher::Watcher(
94 WeakPtr<Controller> controller,
95 MessagePumpForIO::Mode mode,
96 int fd)
97 : fd_watch_controller_(FROM_HERE),
98 controller_(controller),
99 mode_(mode),
100 fd_(fd) {
101 DCHECK(callback_task_runner_);
102 thread_checker_.DetachFromThread();
103 }
104
~Watcher()105 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
106 DCHECK(thread_checker_.CalledOnValidThread());
107 MessageLoopCurrentForIO::Get()->RemoveDestructionObserver(this);
108 }
109
StartWatching()110 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
111 DCHECK(thread_checker_.CalledOnValidThread());
112
113 if (!MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
114 fd_, false, mode_, &fd_watch_controller_, this)) {
115 // TODO(wez): Ideally we would [D]CHECK here, or propagate the failure back
116 // to the caller, but there is no guarantee that they haven't already
117 // closed |fd_| on another thread, so the best we can do is Debug-log.
118 DLOG(ERROR) << "Failed to watch fd=" << fd_;
119 }
120
121 if (!registered_as_destruction_observer_) {
122 MessageLoopCurrentForIO::Get()->AddDestructionObserver(this);
123 registered_as_destruction_observer_ = true;
124 }
125 }
126
OnFileCanReadWithoutBlocking(int fd)127 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
128 int fd) {
129 DCHECK_EQ(fd_, fd);
130 DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
131 DCHECK(thread_checker_.CalledOnValidThread());
132
133 // Run the callback on the sequence on which the watch was initiated.
134 callback_task_runner_->PostTask(
135 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
136 }
137
OnFileCanWriteWithoutBlocking(int fd)138 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
139 int fd) {
140 DCHECK_EQ(fd_, fd);
141 DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
142 DCHECK(thread_checker_.CalledOnValidThread());
143
144 // Run the callback on the sequence on which the watch was initiated.
145 callback_task_runner_->PostTask(
146 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
147 }
148
149 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()150 WillDestroyCurrentMessageLoop() {
151 DCHECK(thread_checker_.CalledOnValidThread());
152
153 // A Watcher is owned by a Controller. When the Controller is deleted, it
154 // transfers ownership of the Watcher to a delete task posted to the
155 // MessageLoopForIO. If the MessageLoopForIO is deleted before the delete task
156 // runs, the following line takes care of deleting the Watcher.
157 delete this;
158 }
159
Controller(MessagePumpForIO::Mode mode,int fd,const Closure & callback)160 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
161 int fd,
162 const Closure& callback)
163 : callback_(callback),
164 message_loop_for_io_task_runner_(
165 tls_message_loop_for_io.Get().Get()->task_runner()),
166 weak_factory_(this) {
167 DCHECK(!callback_.is_null());
168 DCHECK(message_loop_for_io_task_runner_);
169 watcher_ = std::make_unique<Watcher>(weak_factory_.GetWeakPtr(), mode, fd);
170 StartWatching();
171 }
172
StartWatching()173 void FileDescriptorWatcher::Controller::StartWatching() {
174 DCHECK(sequence_checker_.CalledOnValidSequence());
175 // It is safe to use Unretained() below because |watcher_| can only be deleted
176 // by a delete task posted to |message_loop_for_io_task_runner_| by this
177 // Controller's destructor. Since this delete task hasn't been posted yet, it
178 // can't run before the task posted below.
179 message_loop_for_io_task_runner_->PostTask(
180 FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_.get())));
181 }
182
RunCallback()183 void FileDescriptorWatcher::Controller::RunCallback() {
184 DCHECK(sequence_checker_.CalledOnValidSequence());
185
186 WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
187
188 callback_.Run();
189
190 // If |this| wasn't deleted, re-enable the watch.
191 if (weak_this)
192 StartWatching();
193 }
194
FileDescriptorWatcher(MessageLoopForIO * message_loop_for_io)195 FileDescriptorWatcher::FileDescriptorWatcher(
196 MessageLoopForIO* message_loop_for_io) {
197 DCHECK(message_loop_for_io);
198 DCHECK(!tls_message_loop_for_io.Get().Get());
199 tls_message_loop_for_io.Get().Set(message_loop_for_io);
200 }
201
~FileDescriptorWatcher()202 FileDescriptorWatcher::~FileDescriptorWatcher() {
203 tls_message_loop_for_io.Get().Set(nullptr);
204 }
205
206 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const Closure & callback)207 FileDescriptorWatcher::WatchReadable(int fd, const Closure& callback) {
208 return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
209 }
210
211 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const Closure & callback)212 FileDescriptorWatcher::WatchWritable(int fd, const Closure& callback) {
213 return WrapUnique(
214 new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
215 }
216
217 } // namespace base
218