• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_fuchsia.h"
6 
7 #include <lib/async-loop/cpp/loop.h>
8 #include <lib/async-loop/default.h>
9 #include <lib/fdio/io.h>
10 #include <lib/fdio/unsafe.h>
11 #include <lib/zx/time.h>
12 #include <zircon/errors.h>
13 
14 #include "base/auto_reset.h"
15 #include "base/check.h"
16 #include "base/fuchsia/fuchsia_logging.h"
17 #include "base/logging.h"
18 #include "base/trace_event/base_tracing.h"
19 
20 namespace base {
21 
ZxHandleWatchController(const Location & from_here)22 MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
23     const Location& from_here)
24     : async_wait_t({}), created_from_location_(from_here) {}
25 
~ZxHandleWatchController()26 MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
27   const bool success = StopWatchingZxHandle();
28   CHECK(success);
29 }
30 
WaitBegin()31 bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
32   DCHECK(!handler);
33   async_wait_t::handler = &HandleSignal;
34 
35   zx_status_t status =
36       async_begin_wait(weak_pump_->async_loop_->dispatcher(), this);
37   if (status != ZX_OK) {
38     ZX_DLOG(ERROR, status) << "async_begin_wait():"
39                            << created_from_location_.ToString();
40     async_wait_t::handler = nullptr;
41     return false;
42   }
43 
44   return true;
45 }
46 
StopWatchingZxHandle()47 bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
48   if (was_stopped_) {
49     DCHECK(!*was_stopped_);
50     *was_stopped_ = true;
51 
52     // |was_stopped_| points at a value stored on the stack, which will go out
53     // of scope. MessagePumpFuchsia::Run() will reset it only if the value is
54     // false. So we need to reset this pointer here as well, to make sure it's
55     // not used again.
56     was_stopped_ = nullptr;
57   }
58 
59   // If the pump is gone then there is nothing to cancel.
60   if (!weak_pump_)
61     return true;
62 
63   if (!is_active())
64     return true;
65 
66   async_wait_t::handler = nullptr;
67 
68   zx_status_t result =
69       async_cancel_wait(weak_pump_->async_loop_->dispatcher(), this);
70   ZX_DLOG_IF(ERROR, result != ZX_OK, result)
71       << "async_cancel_wait(): " << created_from_location_.ToString();
72   return result == ZX_OK;
73 }
74 
75 // static
HandleSignal(async_dispatcher_t * async,async_wait_t * wait,zx_status_t status,const zx_packet_signal_t * signal)76 void MessagePumpFuchsia::ZxHandleWatchController::HandleSignal(
77     async_dispatcher_t* async,
78     async_wait_t* wait,
79     zx_status_t status,
80     const zx_packet_signal_t* signal) {
81   ZxHandleWatchController* controller =
82       static_cast<ZxHandleWatchController*>(wait);
83   DCHECK_EQ(controller->handler, &HandleSignal);
84 
85   // Inform ThreadController of this native work item for tracking purposes.
86   // This call must precede the "ZxHandleSignal" trace event below as
87   // BeginWorkItem() might generate a top-level trace event for the thread if
88   // this is the first work item post-wakeup.
89   Delegate::ScopedDoWorkItem scoped_do_work_item;
90   if (controller->weak_pump_ && controller->weak_pump_->run_state_) {
91     scoped_do_work_item =
92         controller->weak_pump_->run_state_->delegate->BeginWorkItem();
93   }
94 
95   TRACE_EVENT0("toplevel", "ZxHandleSignal");
96 
97   if (status != ZX_OK) {
98     ZX_DLOG(WARNING, status) << "async wait failed: "
99                              << controller->created_from_location_.ToString();
100     return;
101   }
102 
103   controller->handler = nullptr;
104 
105   // In the case of a persistent Watch, the Watch may be stopped and
106   // potentially deleted by the caller within the callback, in which case
107   // |controller| should not be accessed again, and we mustn't continue the
108   // watch. We check for this with a bool on the stack, which the Watch
109   // receives a pointer to.
110   bool was_stopped = false;
111   controller->was_stopped_ = &was_stopped;
112 
113   controller->watcher_->OnZxHandleSignalled(wait->object, signal->observed);
114 
115   if (was_stopped)
116     return;
117 
118   controller->was_stopped_ = nullptr;
119 
120   if (controller->persistent_)
121     controller->WaitBegin();
122 }
123 
OnZxHandleSignalled(zx_handle_t handle,zx_signals_t signals)124 void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
125     zx_handle_t handle,
126     zx_signals_t signals) {
127   uint32_t events;
128   fdio_unsafe_wait_end(io_, signals, &events);
129 
130   // |events| can include other spurious things, in particular, that an fd
131   // is writable, when we only asked to know when it was readable. In that
132   // case, we don't want to call both the CanWrite and CanRead callback,
133   // when the caller asked for only, for example, readable callbacks. So,
134   // mask with the events that we actually wanted to know about.
135   //
136   // Note that errors are always included.
137   const uint32_t desired_events = desired_events_ | FDIO_EVT_ERROR;
138   const uint32_t filtered_events = events & desired_events;
139   DCHECK_NE(filtered_events, 0u) << events << " & " << desired_events;
140 
141   // Each |watcher_| callback we invoke may stop or delete |this|. The pump has
142   // set |was_stopped_| to point to a safe location on the calling stack, so we
143   // can use that to detect being stopped mid-callback and avoid doing further
144   // work that would touch |this|.
145   bool* was_stopped = was_stopped_;
146   if (filtered_events & FDIO_EVT_WRITABLE)
147     watcher_->OnFileCanWriteWithoutBlocking(fd_);
148   if (!*was_stopped && (filtered_events & FDIO_EVT_READABLE))
149     watcher_->OnFileCanReadWithoutBlocking(fd_);
150 
151   // Don't add additional work here without checking |*was_stopped_| again.
152 }
153 
FdWatchController(const Location & from_here)154 MessagePumpFuchsia::FdWatchController::FdWatchController(
155     const Location& from_here)
156     : FdWatchControllerInterface(from_here),
157       ZxHandleWatchController(from_here) {}
158 
~FdWatchController()159 MessagePumpFuchsia::FdWatchController::~FdWatchController() {
160   const bool success = StopWatchingFileDescriptor();
161   CHECK(success);
162 }
163 
WaitBegin()164 bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
165   // Refresh the |handle_| and |desired_signals_| from the fdio for the fd.
166   // Some types of fdio map read/write events to different signals depending on
167   // their current state, so we must do this every time we begin to wait.
168   fdio_unsafe_wait_begin(io_, desired_events_, &object, &trigger);
169   if (async_wait_t::object == ZX_HANDLE_INVALID) {
170     DLOG(ERROR) << "fdio_wait_begin failed: "
171                 << ZxHandleWatchController::created_from_location_.ToString();
172     return false;
173   }
174 
175   return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
176 }
177 
StopWatchingFileDescriptor()178 bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
179   bool success = StopWatchingZxHandle();
180   if (io_) {
181     fdio_unsafe_release(io_);
182     io_ = nullptr;
183   }
184   return success;
185 }
186 
MessagePumpFuchsia()187 MessagePumpFuchsia::MessagePumpFuchsia()
188     : async_loop_(new async::Loop(&kAsyncLoopConfigAttachToCurrentThread)),
189       weak_factory_(this) {}
190 MessagePumpFuchsia::~MessagePumpFuchsia() = default;
191 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)192 bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
193                                              bool persistent,
194                                              int mode,
195                                              FdWatchController* controller,
196                                              FdWatcher* delegate) {
197   DCHECK_GE(fd, 0);
198   DCHECK(controller);
199   DCHECK(delegate);
200 
201   const bool success = controller->StopWatchingFileDescriptor();
202   CHECK(success);
203 
204   controller->fd_ = fd;
205   controller->watcher_ = delegate;
206 
207   DCHECK(!controller->io_);
208   controller->io_ = fdio_unsafe_fd_to_io(fd);
209   if (!controller->io_) {
210     DLOG(ERROR) << "Failed to get IO for FD";
211     return false;
212   }
213 
214   switch (mode) {
215     case WATCH_READ:
216       controller->desired_events_ = FDIO_EVT_READABLE;
217       break;
218     case WATCH_WRITE:
219       controller->desired_events_ = FDIO_EVT_WRITABLE;
220       break;
221     case WATCH_READ_WRITE:
222       controller->desired_events_ = FDIO_EVT_READABLE | FDIO_EVT_WRITABLE;
223       break;
224     default:
225       NOTREACHED() << "unexpected mode: " << mode;
226   }
227 
228   // Pass dummy |handle| and |signals| values to WatchZxHandle(). The real
229   // values will be populated by FdWatchController::WaitBegin(), before actually
230   // starting the wait operation.
231   return WatchZxHandle(ZX_HANDLE_INVALID, persistent, 1, controller,
232                        controller);
233 }
234 
WatchZxHandle(zx_handle_t handle,bool persistent,zx_signals_t signals,ZxHandleWatchController * controller,ZxHandleWatcher * delegate)235 bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
236                                        bool persistent,
237                                        zx_signals_t signals,
238                                        ZxHandleWatchController* controller,
239                                        ZxHandleWatcher* delegate) {
240   DCHECK_NE(0u, signals);
241   DCHECK(controller);
242   DCHECK(delegate);
243 
244   // If the watch controller is active then WatchZxHandle() can be called only
245   // for the same handle.
246   DCHECK(handle == ZX_HANDLE_INVALID || !controller->is_active() ||
247          handle == controller->async_wait_t::object);
248 
249   const bool success = controller->StopWatchingZxHandle();
250   CHECK(success);
251 
252   controller->async_wait_t::object = handle;
253   controller->persistent_ = persistent;
254   controller->async_wait_t::trigger = signals;
255   controller->watcher_ = delegate;
256 
257   controller->weak_pump_ = weak_factory_.GetWeakPtr();
258 
259   return controller->WaitBegin();
260 }
261 
HandleIoEventsUntil(zx_time_t deadline)262 bool MessagePumpFuchsia::HandleIoEventsUntil(zx_time_t deadline) {
263   zx_status_t status = async_loop_->Run(zx::time(deadline), /*once=*/true);
264   switch (status) {
265     // Return true if some tasks or events were dispatched or if the dispatcher
266     // was stopped by ScheduleWork().
267     case ZX_OK:
268       return true;
269 
270     case ZX_ERR_CANCELED:
271       async_loop_->ResetQuit();
272       return true;
273 
274     case ZX_ERR_TIMED_OUT:
275       return false;
276 
277     default:
278       ZX_DLOG(FATAL, status) << "unexpected wait status";
279       return false;
280   }
281 }
282 
Run(Delegate * delegate)283 void MessagePumpFuchsia::Run(Delegate* delegate) {
284   RunState run_state(delegate);
285   AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
286 
287   for (;;) {
288     const Delegate::NextWorkInfo next_work_info = delegate->DoWork();
289     if (run_state.should_quit) {
290       break;
291     }
292 
293     const bool did_handle_io_event = HandleIoEventsUntil(/*deadline=*/0);
294     if (run_state.should_quit) {
295       break;
296     }
297 
298     bool attempt_more_work =
299         next_work_info.is_immediate() || did_handle_io_event;
300     if (attempt_more_work)
301       continue;
302 
303     delegate->DoIdleWork();
304     if (run_state.should_quit) {
305       break;
306     }
307 
308     delegate->BeforeWait();
309 
310     zx_time_t deadline = next_work_info.delayed_run_time.is_max()
311                              ? ZX_TIME_INFINITE
312                              : next_work_info.delayed_run_time.ToZxTime();
313 
314     HandleIoEventsUntil(deadline);
315   }
316 }
317 
Quit()318 void MessagePumpFuchsia::Quit() {
319   CHECK(run_state_);
320   run_state_->should_quit = true;
321 }
322 
ScheduleWork()323 void MessagePumpFuchsia::ScheduleWork() {
324   // Stop async_loop to let MessagePumpFuchsia::Run() handle message loop tasks.
325   async_loop_->Quit();
326 }
327 
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)328 void MessagePumpFuchsia::ScheduleDelayedWork(
329     const Delegate::NextWorkInfo& next_work_info) {
330   // Since this is always called from the same thread as Run(), there is nothing
331   // to do as the loop is already running. It will wait in Run() with the
332   // correct timeout when it's out of immediate tasks.
333   // TODO(crbug.com/40594269): Consider removing ScheduleDelayedWork()
334   // when all pumps function this way (bit.ly/merge-message-pump-do-work).
335 }
336 
337 }  // namespace base
338