• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_fuchsia.h"
6 
7 #include <lib/async-loop/cpp/loop.h>
8 #include <lib/async-loop/default.h>
9 #include <lib/fdio/io.h>
10 #include <lib/fdio/unsafe.h>
11 #include <lib/zx/time.h>
12 #include <zircon/errors.h>
13 
14 #include "base/auto_reset.h"
15 #include "base/fuchsia/fuchsia_logging.h"
16 #include "base/logging.h"
17 #include "base/trace_event/base_tracing.h"
18 
19 namespace base {
20 
ZxHandleWatchController(const Location & from_here)21 MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
22     const Location& from_here)
23     : async_wait_t({}), created_from_location_(from_here) {}
24 
~ZxHandleWatchController()25 MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
26   if (!StopWatchingZxHandle())
27     NOTREACHED();
28 }
29 
WaitBegin()30 bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
31   DCHECK(!handler);
32   async_wait_t::handler = &HandleSignal;
33 
34   zx_status_t status =
35       async_begin_wait(weak_pump_->async_loop_->dispatcher(), this);
36   if (status != ZX_OK) {
37     ZX_DLOG(ERROR, status) << "async_begin_wait():"
38                            << created_from_location_.ToString();
39     async_wait_t::handler = nullptr;
40     return false;
41   }
42 
43   return true;
44 }
45 
StopWatchingZxHandle()46 bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
47   if (was_stopped_) {
48     DCHECK(!*was_stopped_);
49     *was_stopped_ = true;
50 
51     // |was_stopped_| points at a value stored on the stack, which will go out
52     // of scope. MessagePumpFuchsia::Run() will reset it only if the value is
53     // false. So we need to reset this pointer here as well, to make sure it's
54     // not used again.
55     was_stopped_ = nullptr;
56   }
57 
58   // If the pump is gone then there is nothing to cancel.
59   if (!weak_pump_)
60     return true;
61 
62   if (!is_active())
63     return true;
64 
65   async_wait_t::handler = nullptr;
66 
67   zx_status_t result =
68       async_cancel_wait(weak_pump_->async_loop_->dispatcher(), this);
69   ZX_DLOG_IF(ERROR, result != ZX_OK, result)
70       << "async_cancel_wait(): " << created_from_location_.ToString();
71   return result == ZX_OK;
72 }
73 
74 // static
HandleSignal(async_dispatcher_t * async,async_wait_t * wait,zx_status_t status,const zx_packet_signal_t * signal)75 void MessagePumpFuchsia::ZxHandleWatchController::HandleSignal(
76     async_dispatcher_t* async,
77     async_wait_t* wait,
78     zx_status_t status,
79     const zx_packet_signal_t* signal) {
80   TRACE_EVENT0("toplevel", "ZxHandleSignal");
81 
82   ZxHandleWatchController* controller =
83       static_cast<ZxHandleWatchController*>(wait);
84   DCHECK_EQ(controller->handler, &HandleSignal);
85 
86   if (status != ZX_OK) {
87     ZX_DLOG(WARNING, status) << "async wait failed: "
88                              << controller->created_from_location_.ToString();
89     return;
90   }
91 
92   controller->handler = nullptr;
93 
94   // In the case of a persistent Watch, the Watch may be stopped and
95   // potentially deleted by the caller within the callback, in which case
96   // |controller| should not be accessed again, and we mustn't continue the
97   // watch. We check for this with a bool on the stack, which the Watch
98   // receives a pointer to.
99   bool was_stopped = false;
100   controller->was_stopped_ = &was_stopped;
101 
102   controller->watcher_->OnZxHandleSignalled(wait->object, signal->observed);
103 
104   if (was_stopped)
105     return;
106 
107   controller->was_stopped_ = nullptr;
108 
109   if (controller->persistent_)
110     controller->WaitBegin();
111 }
112 
OnZxHandleSignalled(zx_handle_t handle,zx_signals_t signals)113 void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
114     zx_handle_t handle,
115     zx_signals_t signals) {
116   uint32_t events;
117   fdio_unsafe_wait_end(io_, signals, &events);
118 
119   // |events| can include other spurious things, in particular, that an fd
120   // is writable, when we only asked to know when it was readable. In that
121   // case, we don't want to call both the CanWrite and CanRead callback,
122   // when the caller asked for only, for example, readable callbacks. So,
123   // mask with the events that we actually wanted to know about.
124   //
125   // Note that errors are always included.
126   const uint32_t desired_events = desired_events_ | FDIO_EVT_ERROR;
127   const uint32_t filtered_events = events & desired_events;
128   DCHECK_NE(filtered_events, 0u) << events << " & " << desired_events;
129 
130   // Each |watcher_| callback we invoke may stop or delete |this|. The pump has
131   // set |was_stopped_| to point to a safe location on the calling stack, so we
132   // can use that to detect being stopped mid-callback and avoid doing further
133   // work that would touch |this|.
134   bool* was_stopped = was_stopped_;
135   if (filtered_events & FDIO_EVT_WRITABLE)
136     watcher_->OnFileCanWriteWithoutBlocking(fd_);
137   if (!*was_stopped && (filtered_events & FDIO_EVT_READABLE))
138     watcher_->OnFileCanReadWithoutBlocking(fd_);
139 
140   // Don't add additional work here without checking |*was_stopped_| again.
141 }
142 
FdWatchController(const Location & from_here)143 MessagePumpFuchsia::FdWatchController::FdWatchController(
144     const Location& from_here)
145     : FdWatchControllerInterface(from_here),
146       ZxHandleWatchController(from_here) {}
147 
~FdWatchController()148 MessagePumpFuchsia::FdWatchController::~FdWatchController() {
149   if (!StopWatchingFileDescriptor())
150     NOTREACHED();
151 }
152 
WaitBegin()153 bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
154   // Refresh the |handle_| and |desired_signals_| from the fdio for the fd.
155   // Some types of fdio map read/write events to different signals depending on
156   // their current state, so we must do this every time we begin to wait.
157   fdio_unsafe_wait_begin(io_, desired_events_, &object, &trigger);
158   if (async_wait_t::object == ZX_HANDLE_INVALID) {
159     DLOG(ERROR) << "fdio_wait_begin failed: "
160                 << ZxHandleWatchController::created_from_location_.ToString();
161     return false;
162   }
163 
164   return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
165 }
166 
StopWatchingFileDescriptor()167 bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
168   bool success = StopWatchingZxHandle();
169   if (io_) {
170     fdio_unsafe_release(io_);
171     io_ = nullptr;
172   }
173   return success;
174 }
175 
MessagePumpFuchsia()176 MessagePumpFuchsia::MessagePumpFuchsia()
177     : async_loop_(new async::Loop(&kAsyncLoopConfigAttachToCurrentThread)),
178       weak_factory_(this) {}
179 MessagePumpFuchsia::~MessagePumpFuchsia() = default;
180 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)181 bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
182                                              bool persistent,
183                                              int mode,
184                                              FdWatchController* controller,
185                                              FdWatcher* delegate) {
186   DCHECK_GE(fd, 0);
187   DCHECK(controller);
188   DCHECK(delegate);
189 
190   if (!controller->StopWatchingFileDescriptor())
191     NOTREACHED();
192 
193   controller->fd_ = fd;
194   controller->watcher_ = delegate;
195 
196   DCHECK(!controller->io_);
197   controller->io_ = fdio_unsafe_fd_to_io(fd);
198   if (!controller->io_) {
199     DLOG(ERROR) << "Failed to get IO for FD";
200     return false;
201   }
202 
203   switch (mode) {
204     case WATCH_READ:
205       controller->desired_events_ = FDIO_EVT_READABLE;
206       break;
207     case WATCH_WRITE:
208       controller->desired_events_ = FDIO_EVT_WRITABLE;
209       break;
210     case WATCH_READ_WRITE:
211       controller->desired_events_ = FDIO_EVT_READABLE | FDIO_EVT_WRITABLE;
212       break;
213     default:
214       NOTREACHED() << "unexpected mode: " << mode;
215       return false;
216   }
217 
218   // Pass dummy |handle| and |signals| values to WatchZxHandle(). The real
219   // values will be populated by FdWatchController::WaitBegin(), before actually
220   // starting the wait operation.
221   return WatchZxHandle(ZX_HANDLE_INVALID, persistent, 1, controller,
222                        controller);
223 }
224 
WatchZxHandle(zx_handle_t handle,bool persistent,zx_signals_t signals,ZxHandleWatchController * controller,ZxHandleWatcher * delegate)225 bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
226                                        bool persistent,
227                                        zx_signals_t signals,
228                                        ZxHandleWatchController* controller,
229                                        ZxHandleWatcher* delegate) {
230   DCHECK_NE(0u, signals);
231   DCHECK(controller);
232   DCHECK(delegate);
233 
234   // If the watch controller is active then WatchZxHandle() can be called only
235   // for the same handle.
236   DCHECK(handle == ZX_HANDLE_INVALID || !controller->is_active() ||
237          handle == controller->async_wait_t::object);
238 
239   if (!controller->StopWatchingZxHandle())
240     NOTREACHED();
241 
242   controller->async_wait_t::object = handle;
243   controller->persistent_ = persistent;
244   controller->async_wait_t::trigger = signals;
245   controller->watcher_ = delegate;
246 
247   controller->weak_pump_ = weak_factory_.GetWeakPtr();
248 
249   return controller->WaitBegin();
250 }
251 
HandleIoEventsUntil(zx_time_t deadline)252 bool MessagePumpFuchsia::HandleIoEventsUntil(zx_time_t deadline) {
253   zx_status_t status = async_loop_->Run(zx::time(deadline), /*once=*/true);
254   switch (status) {
255     // Return true if some tasks or events were dispatched or if the dispatcher
256     // was stopped by ScheduleWork().
257     case ZX_OK:
258       return true;
259 
260     case ZX_ERR_CANCELED:
261       async_loop_->ResetQuit();
262       return true;
263 
264     case ZX_ERR_TIMED_OUT:
265       return false;
266 
267     default:
268       ZX_DLOG(DCHECK, status) << "unexpected wait status";
269       return false;
270   }
271 }
272 
Run(Delegate * delegate)273 void MessagePumpFuchsia::Run(Delegate* delegate) {
274   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
275 
276   for (;;) {
277     const Delegate::NextWorkInfo next_work_info = delegate->DoWork();
278     if (!keep_running_)
279       break;
280 
281     const bool did_handle_io_event = HandleIoEventsUntil(/*deadline=*/0);
282     if (!keep_running_)
283       break;
284 
285     bool attempt_more_work =
286         next_work_info.is_immediate() || did_handle_io_event;
287     if (attempt_more_work)
288       continue;
289 
290     attempt_more_work = delegate->DoIdleWork();
291     if (!keep_running_)
292       break;
293 
294     if (attempt_more_work)
295       continue;
296 
297     zx_time_t deadline = next_work_info.delayed_run_time.is_max()
298                              ? ZX_TIME_INFINITE
299                              : next_work_info.delayed_run_time.ToZxTime();
300 
301     HandleIoEventsUntil(deadline);
302   }
303 }
304 
Quit()305 void MessagePumpFuchsia::Quit() {
306   keep_running_ = false;
307 }
308 
ScheduleWork()309 void MessagePumpFuchsia::ScheduleWork() {
310   // Stop async_loop to let MessagePumpFuchsia::Run() handle message loop tasks.
311   async_loop_->Quit();
312 }
313 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)314 void MessagePumpFuchsia::ScheduleDelayedWork(
315     const Delegate::NextWorkInfo& next_work_info) {
316   // Since this is always called from the same thread as Run(), there is nothing
317   // to do as the loop is already running. It will wait in Run() with the
318   // correct timeout when it's out of immediate tasks.
319   // TODO(https://crbug.com/885371): Consider removing ScheduleDelayedWork()
320   // when all pumps function this way (bit.ly/merge-message-pump-do-work).
321 }
322 
323 }  // namespace base
324