• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/files/file_descriptor_watcher_posix.h"
6 
7 #include "base/bind.h"
8 #include "base/lazy_instance.h"
9 #include "base/logging.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/message_loop/message_loop_current.h"
12 #include "base/message_loop/message_pump_for_io.h"
13 #include "base/sequenced_task_runner.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/threading/sequenced_task_runner_handle.h"
16 #include "base/threading/thread_checker.h"
17 #include "base/threading/thread_local.h"
18 
19 namespace base {
20 
21 namespace {
22 
23 // MessageLoopForIO used to watch file descriptors for which callbacks are
24 // registered from a given thread.
25 LazyInstance<ThreadLocalPointer<MessageLoopForIO>>::Leaky
26     tls_message_loop_for_io = LAZY_INSTANCE_INITIALIZER;
27 
28 }  // namespace
29 
~Controller()30 FileDescriptorWatcher::Controller::~Controller() {
31   DCHECK(sequence_checker_.CalledOnValidSequence());
32 
33   // Delete |watcher_| on the MessageLoopForIO.
34   //
35   // If the MessageLoopForIO is deleted before Watcher::StartWatching() runs,
36   // |watcher_| is leaked. If the MessageLoopForIO is deleted after
37   // Watcher::StartWatching() runs but before the DeleteSoon task runs,
38   // |watcher_| is deleted from Watcher::WillDestroyCurrentMessageLoop().
39   message_loop_for_io_task_runner_->DeleteSoon(FROM_HERE, watcher_.release());
40 
41   // Since WeakPtrs are invalidated by the destructor, RunCallback() won't be
42   // invoked after this returns.
43 }
44 
45 class FileDescriptorWatcher::Controller::Watcher
46     : public MessagePumpForIO::FdWatcher,
47       public MessageLoopCurrent::DestructionObserver {
48  public:
49   Watcher(WeakPtr<Controller> controller, MessagePumpForIO::Mode mode, int fd);
50   ~Watcher() override;
51 
52   void StartWatching();
53 
54  private:
55   friend class FileDescriptorWatcher;
56 
57   // MessagePumpForIO::FdWatcher:
58   void OnFileCanReadWithoutBlocking(int fd) override;
59   void OnFileCanWriteWithoutBlocking(int fd) override;
60 
61   // MessageLoopCurrent::DestructionObserver:
62   void WillDestroyCurrentMessageLoop() override;
63 
64   // The MessageLoopForIO's watch handle (stops the watch when destroyed).
65   MessagePumpForIO::FdWatchController fd_watch_controller_;
66 
67   // Runs tasks on the sequence on which this was instantiated (i.e. the
68   // sequence on which the callback must run).
69   const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
70       SequencedTaskRunnerHandle::Get();
71 
72   // The Controller that created this Watcher.
73   WeakPtr<Controller> controller_;
74 
75   // Whether this Watcher is notified when |fd_| becomes readable or writable
76   // without blocking.
77   const MessagePumpForIO::Mode mode_;
78 
79   // The watched file descriptor.
80   const int fd_;
81 
82   // Except for the constructor, every method of this class must run on the same
83   // MessageLoopForIO thread.
84   ThreadChecker thread_checker_;
85 
86   // Whether this Watcher was registered as a DestructionObserver on the
87   // MessageLoopForIO thread.
88   bool registered_as_destruction_observer_ = false;
89 
90   DISALLOW_COPY_AND_ASSIGN(Watcher);
91 };
92 
Watcher(WeakPtr<Controller> controller,MessagePumpForIO::Mode mode,int fd)93 FileDescriptorWatcher::Controller::Watcher::Watcher(
94     WeakPtr<Controller> controller,
95     MessagePumpForIO::Mode mode,
96     int fd)
97     : fd_watch_controller_(FROM_HERE),
98       controller_(controller),
99       mode_(mode),
100       fd_(fd) {
101   DCHECK(callback_task_runner_);
102   thread_checker_.DetachFromThread();
103 }
104 
~Watcher()105 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
106   DCHECK(thread_checker_.CalledOnValidThread());
107   MessageLoopCurrentForIO::Get()->RemoveDestructionObserver(this);
108 }
109 
StartWatching()110 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
111   DCHECK(thread_checker_.CalledOnValidThread());
112 
113   if (!MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
114           fd_, false, mode_, &fd_watch_controller_, this)) {
115     // TODO(wez): Ideally we would [D]CHECK here, or propagate the failure back
116     // to the caller, but there is no guarantee that they haven't already
117     // closed |fd_| on another thread, so the best we can do is Debug-log.
118     DLOG(ERROR) << "Failed to watch fd=" << fd_;
119   }
120 
121   if (!registered_as_destruction_observer_) {
122     MessageLoopCurrentForIO::Get()->AddDestructionObserver(this);
123     registered_as_destruction_observer_ = true;
124   }
125 }
126 
OnFileCanReadWithoutBlocking(int fd)127 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
128     int fd) {
129   DCHECK_EQ(fd_, fd);
130   DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
131   DCHECK(thread_checker_.CalledOnValidThread());
132 
133   // Run the callback on the sequence on which the watch was initiated.
134   callback_task_runner_->PostTask(
135       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
136 }
137 
OnFileCanWriteWithoutBlocking(int fd)138 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
139     int fd) {
140   DCHECK_EQ(fd_, fd);
141   DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
142   DCHECK(thread_checker_.CalledOnValidThread());
143 
144   // Run the callback on the sequence on which the watch was initiated.
145   callback_task_runner_->PostTask(
146       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
147 }
148 
149 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()150     WillDestroyCurrentMessageLoop() {
151   DCHECK(thread_checker_.CalledOnValidThread());
152 
153   // A Watcher is owned by a Controller. When the Controller is deleted, it
154   // transfers ownership of the Watcher to a delete task posted to the
155   // MessageLoopForIO. If the MessageLoopForIO is deleted before the delete task
156   // runs, the following line takes care of deleting the Watcher.
157   delete this;
158 }
159 
Controller(MessagePumpForIO::Mode mode,int fd,const Closure & callback)160 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
161                                               int fd,
162                                               const Closure& callback)
163     : callback_(callback),
164       message_loop_for_io_task_runner_(
165           tls_message_loop_for_io.Get().Get()->task_runner()),
166       weak_factory_(this) {
167   DCHECK(!callback_.is_null());
168   DCHECK(message_loop_for_io_task_runner_);
169   watcher_ = std::make_unique<Watcher>(weak_factory_.GetWeakPtr(), mode, fd);
170   StartWatching();
171 }
172 
StartWatching()173 void FileDescriptorWatcher::Controller::StartWatching() {
174   DCHECK(sequence_checker_.CalledOnValidSequence());
175   // It is safe to use Unretained() below because |watcher_| can only be deleted
176   // by a delete task posted to |message_loop_for_io_task_runner_| by this
177   // Controller's destructor. Since this delete task hasn't been posted yet, it
178   // can't run before the task posted below.
179   message_loop_for_io_task_runner_->PostTask(
180       FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_.get())));
181 }
182 
RunCallback()183 void FileDescriptorWatcher::Controller::RunCallback() {
184   DCHECK(sequence_checker_.CalledOnValidSequence());
185 
186   WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
187 
188   callback_.Run();
189 
190   // If |this| wasn't deleted, re-enable the watch.
191   if (weak_this)
192     StartWatching();
193 }
194 
FileDescriptorWatcher(MessageLoopForIO * message_loop_for_io)195 FileDescriptorWatcher::FileDescriptorWatcher(
196     MessageLoopForIO* message_loop_for_io) {
197   DCHECK(message_loop_for_io);
198   DCHECK(!tls_message_loop_for_io.Get().Get());
199   tls_message_loop_for_io.Get().Set(message_loop_for_io);
200 }
201 
~FileDescriptorWatcher()202 FileDescriptorWatcher::~FileDescriptorWatcher() {
203   tls_message_loop_for_io.Get().Set(nullptr);
204 }
205 
206 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const Closure & callback)207 FileDescriptorWatcher::WatchReadable(int fd, const Closure& callback) {
208   return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
209 }
210 
211 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const Closure & callback)212 FileDescriptorWatcher::WatchWritable(int fd, const Closure& callback) {
213   return WrapUnique(
214       new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
215 }
216 
217 }  // namespace base
218