• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <unistd.h>
9 
10 #include <utility>
11 
12 #include "base/auto_reset.h"
13 #include "base/compiler_specific.h"
14 #include "base/files/file_util.h"
15 #include "base/logging.h"
16 #include "base/posix/eintr_wrapper.h"
17 #include "base/third_party/libevent/event.h"
18 #include "base/time/time.h"
19 #include "base/trace_event/trace_event.h"
20 #include "build/build_config.h"
21 
22 #if defined(OS_MACOSX)
23 #include "base/mac/scoped_nsautorelease_pool.h"
24 #endif
25 
// Lifecycle of struct event
//
// Libevent uses two main data structures:
//   - struct event_base, of which there is one per message pump, and
//   - struct event, of which there is roughly one per socket.
// The socket's struct event is created in
// MessagePumpLibevent::WatchFileDescriptor(), is owned by the
// FdWatchController, and is destroyed in StopWatchingFileDescriptor().
// It is moved into and out of lists in struct event_base by the libevent
// functions event_add() and event_del().
//
// TODO(dkegel):
// At the moment bad things happen if an FdWatchController is still active
// after its MessagePumpLibevent has been destroyed.
// See MessageLoopTest.FdWatchControllerOutlivesMessageLoop.
// It is not yet clear whether that situation occurs in practice, but if it
// does, we need to fix it.
43 
44 namespace base {
45 
FdWatchController(const Location & from_here)46 MessagePumpLibevent::FdWatchController::FdWatchController(
47     const Location& from_here)
48     : FdWatchControllerInterface(from_here) {}
49 
~FdWatchController()50 MessagePumpLibevent::FdWatchController::~FdWatchController() {
51   if (event_) {
52     StopWatchingFileDescriptor();
53   }
54   if (was_destroyed_) {
55     DCHECK(!*was_destroyed_);
56     *was_destroyed_ = true;
57   }
58 }
59 
StopWatchingFileDescriptor()60 bool MessagePumpLibevent::FdWatchController::StopWatchingFileDescriptor() {
61   std::unique_ptr<event> e = ReleaseEvent();
62   if (!e)
63     return true;
64 
65   // event_del() is a no-op if the event isn't active.
66   int rv = event_del(e.get());
67   pump_ = nullptr;
68   watcher_ = nullptr;
69   return (rv == 0);
70 }
71 
Init(std::unique_ptr<event> e)72 void MessagePumpLibevent::FdWatchController::Init(std::unique_ptr<event> e) {
73   DCHECK(e);
74   DCHECK(!event_);
75 
76   event_ = std::move(e);
77 }
78 
ReleaseEvent()79 std::unique_ptr<event> MessagePumpLibevent::FdWatchController::ReleaseEvent() {
80   return std::move(event_);
81 }
82 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)83 void MessagePumpLibevent::FdWatchController::OnFileCanReadWithoutBlocking(
84     int fd,
85     MessagePumpLibevent* pump) {
86   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
87   // watching the file descriptor.
88   if (!watcher_)
89     return;
90   watcher_->OnFileCanReadWithoutBlocking(fd);
91 }
92 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)93 void MessagePumpLibevent::FdWatchController::OnFileCanWriteWithoutBlocking(
94     int fd,
95     MessagePumpLibevent* pump) {
96   DCHECK(watcher_);
97   watcher_->OnFileCanWriteWithoutBlocking(fd);
98 }
99 
MessagePumpLibevent()100 MessagePumpLibevent::MessagePumpLibevent()
101     : keep_running_(true),
102       in_run_(false),
103       processed_io_events_(false),
104       event_base_(event_base_new()),
105       wakeup_pipe_in_(-1),
106       wakeup_pipe_out_(-1) {
107   if (!Init())
108     NOTREACHED();
109 }
110 
~MessagePumpLibevent()111 MessagePumpLibevent::~MessagePumpLibevent() {
112   DCHECK(wakeup_event_);
113   DCHECK(event_base_);
114   event_del(wakeup_event_);
115   delete wakeup_event_;
116   if (wakeup_pipe_in_ >= 0) {
117     if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
118       DPLOG(ERROR) << "close";
119   }
120   if (wakeup_pipe_out_ >= 0) {
121     if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
122       DPLOG(ERROR) << "close";
123   }
124   event_base_free(event_base_);
125 }
126 
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)127 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
128                                               bool persistent,
129                                               int mode,
130                                               FdWatchController* controller,
131                                               FdWatcher* delegate) {
132   DCHECK_GE(fd, 0);
133   DCHECK(controller);
134   DCHECK(delegate);
135   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
136   // WatchFileDescriptor should be called on the pump thread. It is not
137   // threadsafe, and your watcher may never be registered.
138   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
139 
140   int event_mask = persistent ? EV_PERSIST : 0;
141   if (mode & WATCH_READ) {
142     event_mask |= EV_READ;
143   }
144   if (mode & WATCH_WRITE) {
145     event_mask |= EV_WRITE;
146   }
147 
148   std::unique_ptr<event> evt(controller->ReleaseEvent());
149   if (!evt) {
150     // Ownership is transferred to the controller.
151     evt.reset(new event);
152   } else {
153     // Make sure we don't pick up any funky internal libevent masks.
154     int old_interest_mask = evt->ev_events & (EV_READ | EV_WRITE | EV_PERSIST);
155 
156     // Combine old/new event masks.
157     event_mask |= old_interest_mask;
158 
159     // Must disarm the event before we can reuse it.
160     event_del(evt.get());
161 
162     // It's illegal to use this function to listen on 2 separate fds with the
163     // same |controller|.
164     if (EVENT_FD(evt.get()) != fd) {
165       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
166       return false;
167     }
168   }
169 
170   // Set current interest mask and message pump for this event.
171   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
172 
173   // Tell libevent which message pump this socket will belong to when we add it.
174   if (event_base_set(event_base_, evt.get())) {
175     DPLOG(ERROR) << "event_base_set(fd=" << EVENT_FD(evt.get()) << ")";
176     return false;
177   }
178 
179   // Add this socket to the list of monitored sockets.
180   if (event_add(evt.get(), nullptr)) {
181     DPLOG(ERROR) << "event_add failed(fd=" << EVENT_FD(evt.get()) << ")";
182     return false;
183   }
184 
185   controller->Init(std::move(evt));
186   controller->set_watcher(delegate);
187   controller->set_pump(this);
188   return true;
189 }
190 
191 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)192 static void timer_callback(int fd, short events, void* context) {
193   event_base_loopbreak((struct event_base*)context);
194 }
195 
196 // Reentrant!
Run(Delegate * delegate)197 void MessagePumpLibevent::Run(Delegate* delegate) {
198   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
199   AutoReset<bool> auto_reset_in_run(&in_run_, true);
200 
201   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
202   // Instead, make our own timer and reuse it on each call to event_base_loop().
203   std::unique_ptr<event> timer_event(new event);
204 
205   for (;;) {
206 #if defined(OS_MACOSX)
207     mac::ScopedNSAutoreleasePool autorelease_pool;
208 #endif
209 
210     bool did_work = delegate->DoWork();
211     if (!keep_running_)
212       break;
213 
214     event_base_loop(event_base_, EVLOOP_NONBLOCK);
215     did_work |= processed_io_events_;
216     processed_io_events_ = false;
217     if (!keep_running_)
218       break;
219 
220     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
221     if (!keep_running_)
222       break;
223 
224     if (did_work)
225       continue;
226 
227     did_work = delegate->DoIdleWork();
228     if (!keep_running_)
229       break;
230 
231     if (did_work)
232       continue;
233 
234     // EVLOOP_ONCE tells libevent to only block once,
235     // but to service all pending events when it wakes up.
236     if (delayed_work_time_.is_null()) {
237       event_base_loop(event_base_, EVLOOP_ONCE);
238     } else {
239       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
240       if (delay > TimeDelta()) {
241         struct timeval poll_tv;
242         poll_tv.tv_sec = delay.InSeconds();
243         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
244         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
245         event_base_set(event_base_, timer_event.get());
246         event_add(timer_event.get(), &poll_tv);
247         event_base_loop(event_base_, EVLOOP_ONCE);
248         event_del(timer_event.get());
249       } else {
250         // It looks like delayed_work_time_ indicates a time in the past, so we
251         // need to call DoDelayedWork now.
252         delayed_work_time_ = TimeTicks();
253       }
254     }
255 
256     if (!keep_running_)
257       break;
258   }
259 }
260 
Quit()261 void MessagePumpLibevent::Quit() {
262   DCHECK(in_run_) << "Quit was called outside of Run!";
263   // Tell both libevent and Run that they should break out of their loops.
264   keep_running_ = false;
265   ScheduleWork();
266 }
267 
ScheduleWork()268 void MessagePumpLibevent::ScheduleWork() {
269   // Tell libevent (in a threadsafe way) that it should break out of its loop.
270   char buf = 0;
271   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
272   DCHECK(nwrite == 1 || errno == EAGAIN)
273       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
274 }
275 
ScheduleDelayedWork(const TimeTicks & delayed_work_time)276 void MessagePumpLibevent::ScheduleDelayedWork(
277     const TimeTicks& delayed_work_time) {
278   // We know that we can't be blocked on Wait right now since this method can
279   // only be called on the same thread as Run, so we only need to update our
280   // record of how long to sleep when we do sleep.
281   delayed_work_time_ = delayed_work_time;
282 }
283 
Init()284 bool MessagePumpLibevent::Init() {
285   int fds[2];
286   if (!CreateLocalNonBlockingPipe(fds)) {
287     DPLOG(ERROR) << "pipe creation failed";
288     return false;
289   }
290   wakeup_pipe_out_ = fds[0];
291   wakeup_pipe_in_ = fds[1];
292 
293   wakeup_event_ = new event;
294   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
295             OnWakeup, this);
296   event_base_set(event_base_, wakeup_event_);
297 
298   if (event_add(wakeup_event_, nullptr))
299     return false;
300   return true;
301 }
302 
303 // static
OnLibeventNotification(int fd,short flags,void * context)304 void MessagePumpLibevent::OnLibeventNotification(int fd,
305                                                  short flags,
306                                                  void* context) {
307   FdWatchController* controller = static_cast<FdWatchController*>(context);
308   DCHECK(controller);
309   TRACE_EVENT2("toplevel", "MessagePumpLibevent::OnLibeventNotification",
310                "src_file", controller->created_from_location().file_name(),
311                "src_func", controller->created_from_location().function_name());
312   TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope(
313       controller->created_from_location().file_name());
314 
315   MessagePumpLibevent* pump = controller->pump();
316   pump->processed_io_events_ = true;
317 
318   if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
319     // Both callbacks will be called. It is necessary to check that |controller|
320     // is not destroyed.
321     bool controller_was_destroyed = false;
322     controller->was_destroyed_ = &controller_was_destroyed;
323     controller->OnFileCanWriteWithoutBlocking(fd, pump);
324     if (!controller_was_destroyed)
325       controller->OnFileCanReadWithoutBlocking(fd, pump);
326     if (!controller_was_destroyed)
327       controller->was_destroyed_ = nullptr;
328   } else if (flags & EV_WRITE) {
329     controller->OnFileCanWriteWithoutBlocking(fd, pump);
330   } else if (flags & EV_READ) {
331     controller->OnFileCanReadWithoutBlocking(fd, pump);
332   }
333 }
334 
335 // Called if a byte is received on the wakeup pipe.
336 // static
OnWakeup(int socket,short flags,void * context)337 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
338   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
339   DCHECK(that->wakeup_pipe_out_ == socket);
340 
341   // Remove and discard the wakeup byte.
342   char buf;
343   int nread = HANDLE_EINTR(read(socket, &buf, 1));
344   DCHECK_EQ(nread, 1);
345   that->processed_io_events_ = true;
346   // Tell libevent to break out of inner loop.
347   event_base_loopbreak(that->event_base_);
348 }
349 
350 }  // namespace base
351