• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <unistd.h>
9 
10 #include <memory>
11 
12 #include "base/auto_reset.h"
13 #include "base/compiler_specific.h"
14 #include "base/files/file_util.h"
15 #include "base/logging.h"
16 #include "base/observer_list.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/third_party/libevent/event.h"
19 #include "base/time/time.h"
20 #include "base/trace_event/trace_event.h"
21 #include "build/build_config.h"
22 
23 #if defined(OS_MACOSX)
24 #include "base/mac/scoped_nsautorelease_pool.h"
25 #endif
26 
27 // Lifecycle of struct event
28 // Libevent uses two main data structures:
29 // struct event_base (of which there is one per message pump), and
30 // struct event (of which there is roughly one per socket).
31 // The socket's struct event is created in
32 // MessagePumpLibevent::WatchFileDescriptor(),
33 // is owned by the FileDescriptorWatcher, and is destroyed in
34 // StopWatchingFileDescriptor().
35 // It is moved into and out of lists in struct event_base by
36 // the libevent functions event_add() and event_del().
37 //
38 // TODO(dkegel):
39 // At the moment bad things happen if a FileDescriptorWatcher
40 // is active after its MessagePumpLibevent has been destroyed.
41 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
42 // Not clear yet whether that situation occurs in practice,
43 // but if it does, we need to fix it.
44 
45 namespace base {
46 
FileDescriptorWatcher()47 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
48     : event_(NULL),
49       pump_(NULL),
50       watcher_(NULL),
51       was_destroyed_(NULL) {
52 }
53 
~FileDescriptorWatcher()54 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
55   if (event_) {
56     StopWatchingFileDescriptor();
57   }
58   if (was_destroyed_) {
59     DCHECK(!*was_destroyed_);
60     *was_destroyed_ = true;
61   }
62 }
63 
StopWatchingFileDescriptor()64 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
65   event* e = ReleaseEvent();
66   if (e == NULL)
67     return true;
68 
69   // event_del() is a no-op if the event isn't active.
70   int rv = event_del(e);
71   delete e;
72   pump_ = NULL;
73   watcher_ = NULL;
74   return (rv == 0);
75 }
76 
Init(event * e)77 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
78   DCHECK(e);
79   DCHECK(!event_);
80 
81   event_ = e;
82 }
83 
ReleaseEvent()84 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
85   struct event *e = event_;
86   event_ = NULL;
87   return e;
88 }
89 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent *)90 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
91     int fd,
92     MessagePumpLibevent*) {
93   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
94   // watching the file descriptor.
95   if (!watcher_)
96     return;
97   watcher_->OnFileCanReadWithoutBlocking(fd);
98 }
99 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent *)100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
101     int fd,
102     MessagePumpLibevent*) {
103   DCHECK(watcher_);
104   watcher_->OnFileCanWriteWithoutBlocking(fd);
105 }
106 
MessagePumpLibevent()107 MessagePumpLibevent::MessagePumpLibevent()
108     : keep_running_(true),
109       in_run_(false),
110       processed_io_events_(false),
111       event_base_(event_base_new()),
112       wakeup_pipe_in_(-1),
113       wakeup_pipe_out_(-1) {
114   if (!Init())
115      NOTREACHED();
116 }
117 
~MessagePumpLibevent()118 MessagePumpLibevent::~MessagePumpLibevent() {
119   DCHECK(wakeup_event_);
120   DCHECK(event_base_);
121   event_del(wakeup_event_);
122   delete wakeup_event_;
123   if (wakeup_pipe_in_ >= 0) {
124     if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
125       DPLOG(ERROR) << "close";
126   }
127   if (wakeup_pipe_out_ >= 0) {
128     if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
129       DPLOG(ERROR) << "close";
130   }
131   event_base_free(event_base_);
132 }
133 
WatchFileDescriptor(int fd,bool persistent,int mode,FileDescriptorWatcher * controller,Watcher * delegate)134 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
135                                               bool persistent,
136                                               int mode,
137                                               FileDescriptorWatcher *controller,
138                                               Watcher *delegate) {
139   DCHECK_GE(fd, 0);
140   DCHECK(controller);
141   DCHECK(delegate);
142   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
143   // WatchFileDescriptor should be called on the pump thread. It is not
144   // threadsafe, and your watcher may never be registered.
145   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
146 
147   int event_mask = persistent ? EV_PERSIST : 0;
148   if (mode & WATCH_READ) {
149     event_mask |= EV_READ;
150   }
151   if (mode & WATCH_WRITE) {
152     event_mask |= EV_WRITE;
153   }
154 
155   std::unique_ptr<event> evt(controller->ReleaseEvent());
156   if (evt.get() == NULL) {
157     // Ownership is transferred to the controller.
158     evt.reset(new event);
159   } else {
160     // Make sure we don't pick up any funky internal libevent masks.
161     int old_interest_mask = evt.get()->ev_events &
162         (EV_READ | EV_WRITE | EV_PERSIST);
163 
164     // Combine old/new event masks.
165     event_mask |= old_interest_mask;
166 
167     // Must disarm the event before we can reuse it.
168     event_del(evt.get());
169 
170     // It's illegal to use this function to listen on 2 separate fds with the
171     // same |controller|.
172     if (EVENT_FD(evt.get()) != fd) {
173       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
174       return false;
175     }
176   }
177 
178   // Set current interest mask and message pump for this event.
179   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
180 
181   // Tell libevent which message pump this socket will belong to when we add it.
182   if (event_base_set(event_base_, evt.get())) {
183     return false;
184   }
185 
186   // Add this socket to the list of monitored sockets.
187   if (event_add(evt.get(), NULL)) {
188     return false;
189   }
190 
191   // Transfer ownership of evt to controller.
192   controller->Init(evt.release());
193 
194   controller->set_watcher(delegate);
195   controller->set_pump(this);
196 
197   return true;
198 }
199 
200 // Tell libevent to break out of inner loop.
timer_callback(int,short,void * context)201 static void timer_callback(int /*fd*/, short /*events*/, void* context) {
202   event_base_loopbreak((struct event_base *)context);
203 }
204 
205 // Reentrant!
Run(Delegate * delegate)206 void MessagePumpLibevent::Run(Delegate* delegate) {
207   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
208   AutoReset<bool> auto_reset_in_run(&in_run_, true);
209 
210   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
211   // Instead, make our own timer and reuse it on each call to event_base_loop().
212   std::unique_ptr<event> timer_event(new event);
213 
214   for (;;) {
215 #if defined(OS_MACOSX)
216     mac::ScopedNSAutoreleasePool autorelease_pool;
217 #endif
218 
219     bool did_work = delegate->DoWork();
220     if (!keep_running_)
221       break;
222 
223     event_base_loop(event_base_, EVLOOP_NONBLOCK);
224     did_work |= processed_io_events_;
225     processed_io_events_ = false;
226     if (!keep_running_)
227       break;
228 
229     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
230     if (!keep_running_)
231       break;
232 
233     if (did_work)
234       continue;
235 
236     did_work = delegate->DoIdleWork();
237     if (!keep_running_)
238       break;
239 
240     if (did_work)
241       continue;
242 
243     // EVLOOP_ONCE tells libevent to only block once,
244     // but to service all pending events when it wakes up.
245     if (delayed_work_time_.is_null()) {
246       event_base_loop(event_base_, EVLOOP_ONCE);
247     } else {
248       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
249       if (delay > TimeDelta()) {
250         struct timeval poll_tv;
251         poll_tv.tv_sec = delay.InSeconds();
252         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
253         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
254         event_base_set(event_base_, timer_event.get());
255         event_add(timer_event.get(), &poll_tv);
256         event_base_loop(event_base_, EVLOOP_ONCE);
257         event_del(timer_event.get());
258       } else {
259         // It looks like delayed_work_time_ indicates a time in the past, so we
260         // need to call DoDelayedWork now.
261         delayed_work_time_ = TimeTicks();
262       }
263     }
264 
265     if (!keep_running_)
266       break;
267   }
268 }
269 
Quit()270 void MessagePumpLibevent::Quit() {
271   DCHECK(in_run_) << "Quit was called outside of Run!";
272   // Tell both libevent and Run that they should break out of their loops.
273   keep_running_ = false;
274   ScheduleWork();
275 }
276 
ScheduleWork()277 void MessagePumpLibevent::ScheduleWork() {
278   // Tell libevent (in a threadsafe way) that it should break out of its loop.
279   char buf = 0;
280   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
281   DCHECK(nwrite == 1 || errno == EAGAIN)
282       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
283 }
284 
ScheduleDelayedWork(const TimeTicks & delayed_work_time)285 void MessagePumpLibevent::ScheduleDelayedWork(
286     const TimeTicks& delayed_work_time) {
287   // We know that we can't be blocked on Wait right now since this method can
288   // only be called on the same thread as Run, so we only need to update our
289   // record of how long to sleep when we do sleep.
290   delayed_work_time_ = delayed_work_time;
291 }
292 
Init()293 bool MessagePumpLibevent::Init() {
294   int fds[2];
295   if (pipe(fds)) {
296     DLOG(ERROR) << "pipe() failed, errno: " << errno;
297     return false;
298   }
299   if (!SetNonBlocking(fds[0])) {
300     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
301     return false;
302   }
303   if (!SetNonBlocking(fds[1])) {
304     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
305     return false;
306   }
307   wakeup_pipe_out_ = fds[0];
308   wakeup_pipe_in_ = fds[1];
309 
310   wakeup_event_ = new event;
311   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
312             OnWakeup, this);
313   event_base_set(event_base_, wakeup_event_);
314 
315   if (event_add(wakeup_event_, 0))
316     return false;
317   return true;
318 }
319 
320 // static
OnLibeventNotification(int fd,short flags,void * context)321 void MessagePumpLibevent::OnLibeventNotification(int fd,
322                                                  short flags,
323                                                  void* context) {
324   FileDescriptorWatcher* controller =
325       static_cast<FileDescriptorWatcher*>(context);
326   DCHECK(controller);
327   TRACE_EVENT1("toplevel", "MessagePumpLibevent::OnLibeventNotification",
328                "fd", fd);
329 
330   MessagePumpLibevent* pump = controller->pump();
331   pump->processed_io_events_ = true;
332 
333   if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
334     // Both callbacks will be called. It is necessary to check that |controller|
335     // is not destroyed.
336     bool controller_was_destroyed = false;
337     controller->was_destroyed_ = &controller_was_destroyed;
338     controller->OnFileCanWriteWithoutBlocking(fd, pump);
339     if (!controller_was_destroyed)
340       controller->OnFileCanReadWithoutBlocking(fd, pump);
341     if (!controller_was_destroyed)
342       controller->was_destroyed_ = nullptr;
343   } else if (flags & EV_WRITE) {
344     controller->OnFileCanWriteWithoutBlocking(fd, pump);
345   } else if (flags & EV_READ) {
346     controller->OnFileCanReadWithoutBlocking(fd, pump);
347   }
348 }
349 
350 // Called if a byte is received on the wakeup pipe.
351 // static
OnWakeup(int socket,short,void * context)352 void MessagePumpLibevent::OnWakeup(int socket, short /*flags*/, void* context) {
353   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
354   DCHECK(that->wakeup_pipe_out_ == socket);
355 
356   // Remove and discard the wakeup byte.
357   char buf;
358   int nread = HANDLE_EINTR(read(socket, &buf, 1));
359   DCHECK_EQ(nread, 1);
360   that->processed_io_events_ = true;
361   // Tell libevent to break out of inner loop.
362   event_base_loopbreak(that->event_base_);
363 }
364 
365 }  // namespace base
366