• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 
10 #include "base/auto_reset.h"
11 #include "base/eintr_wrapper.h"
12 #include "base/logging.h"
13 #include "base/mac/scoped_nsautorelease_pool.h"
14 #include "base/memory/scoped_ptr.h"
15 #include "base/observer_list.h"
16 #include "base/time.h"
17 #if defined(USE_SYSTEM_LIBEVENT)
18 #include <event.h>
19 #else
20 #include "third_party/libevent/event.h"
21 #endif
22 
23 // Lifecycle of struct event
24 // Libevent uses two main data structures:
25 // struct event_base (of which there is one per message pump), and
26 // struct event (of which there is roughly one per socket).
27 // The socket's struct event is created in
28 // MessagePumpLibevent::WatchFileDescriptor(),
29 // is owned by the FileDescriptorWatcher, and is destroyed in
30 // StopWatchingFileDescriptor().
31 // It is moved into and out of lists in struct event_base by
32 // the libevent functions event_add() and event_del().
33 //
34 // TODO(dkegel):
35 // At the moment bad things happen if a FileDescriptorWatcher
36 // is active after its MessagePumpLibevent has been destroyed.
37 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
38 // Not clear yet whether that situation occurs in practice,
39 // but if it does, we need to fix it.
40 
41 namespace base {
42 
// Puts |fd| into non-blocking mode, preserving its other file status flags.
// Returns 0 on success and -1 on failure (errno is left as set by fcntl()).
// Too small a function to bother putting in a library?
static int SetNonBlocking(int fd) {
  const int flags = fcntl(fd, F_GETFL, 0);
  // If the current flags cannot be read, bail out instead of guessing: writing
  // back a made-up value could silently drop flags that were already set.
  if (flags == -1)
    return -1;
  // Nothing to do if the descriptor is already non-blocking.
  if ((flags & O_NONBLOCK) != 0)
    return 0;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
51 
FileDescriptorWatcher()52 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
53     : is_persistent_(false),
54       event_(NULL),
55       pump_(NULL),
56       watcher_(NULL) {
57 }
58 
~FileDescriptorWatcher()59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
60   if (event_) {
61     StopWatchingFileDescriptor();
62   }
63 }
64 
StopWatchingFileDescriptor()65 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
66   event* e = ReleaseEvent();
67   if (e == NULL)
68     return true;
69 
70   // event_del() is a no-op if the event isn't active.
71   int rv = event_del(e);
72   delete e;
73   pump_ = NULL;
74   watcher_ = NULL;
75   return (rv == 0);
76 }
77 
Init(event * e,bool is_persistent)78 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
79                                                       bool is_persistent) {
80   DCHECK(e);
81   DCHECK(!event_);
82 
83   is_persistent_ = is_persistent;
84   event_ = e;
85 }
86 
ReleaseEvent()87 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
88   struct event *e = event_;
89   event_ = NULL;
90   return e;
91 }
92 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
94     int fd, MessagePumpLibevent* pump) {
95   pump->WillProcessIOEvent();
96   watcher_->OnFileCanReadWithoutBlocking(fd);
97   pump->DidProcessIOEvent();
98 }
99 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
101     int fd, MessagePumpLibevent* pump) {
102   pump->WillProcessIOEvent();
103   watcher_->OnFileCanWriteWithoutBlocking(fd);
104   pump->DidProcessIOEvent();
105 }
106 
MessagePumpLibevent()107 MessagePumpLibevent::MessagePumpLibevent()
108     : keep_running_(true),
109       in_run_(false),
110       event_base_(event_base_new()),
111       wakeup_pipe_in_(-1),
112       wakeup_pipe_out_(-1) {
113   if (!Init())
114      NOTREACHED();
115 }
116 
~MessagePumpLibevent()117 MessagePumpLibevent::~MessagePumpLibevent() {
118   DCHECK(wakeup_event_);
119   DCHECK(event_base_);
120   event_del(wakeup_event_);
121   delete wakeup_event_;
122   if (wakeup_pipe_in_ >= 0) {
123     if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
124       PLOG(ERROR) << "close";
125   }
126   if (wakeup_pipe_out_ >= 0) {
127     if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
128       PLOG(ERROR) << "close";
129   }
130   event_base_free(event_base_);
131 }
132 
WatchFileDescriptor(int fd,bool persistent,Mode mode,FileDescriptorWatcher * controller,Watcher * delegate)133 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
134                                               bool persistent,
135                                               Mode mode,
136                                               FileDescriptorWatcher *controller,
137                                               Watcher *delegate) {
138   DCHECK_GE(fd, 0);
139   DCHECK(controller);
140   DCHECK(delegate);
141   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
142 
143   int event_mask = persistent ? EV_PERSIST : 0;
144   if ((mode & WATCH_READ) != 0) {
145     event_mask |= EV_READ;
146   }
147   if ((mode & WATCH_WRITE) != 0) {
148     event_mask |= EV_WRITE;
149   }
150 
151   scoped_ptr<event> evt(controller->ReleaseEvent());
152   if (evt.get() == NULL) {
153     // Ownership is transferred to the controller.
154     evt.reset(new event);
155   } else {
156     // Make sure we don't pick up any funky internal libevent masks.
157     int old_interest_mask = evt.get()->ev_events &
158         (EV_READ | EV_WRITE | EV_PERSIST);
159 
160     // Combine old/new event masks.
161     event_mask |= old_interest_mask;
162 
163     // Must disarm the event before we can reuse it.
164     event_del(evt.get());
165 
166     // It's illegal to use this function to listen on 2 separate fds with the
167     // same |controller|.
168     if (EVENT_FD(evt.get()) != fd) {
169       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
170       return false;
171     }
172   }
173 
174   // Set current interest mask and message pump for this event.
175   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
176 
177   // Tell libevent which message pump this socket will belong to when we add it.
178   if (event_base_set(event_base_, evt.get()) != 0) {
179     return false;
180   }
181 
182   // Add this socket to the list of monitored sockets.
183   if (event_add(evt.get(), NULL) != 0) {
184     return false;
185   }
186 
187   // Transfer ownership of evt to controller.
188   controller->Init(evt.release(), persistent);
189 
190   controller->set_watcher(delegate);
191   controller->set_pump(this);
192 
193   return true;
194 }
195 
AddIOObserver(IOObserver * obs)196 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
197   io_observers_.AddObserver(obs);
198 }
199 
RemoveIOObserver(IOObserver * obs)200 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
201   io_observers_.RemoveObserver(obs);
202 }
203 
204 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)205 static void timer_callback(int fd, short events, void *context)
206 {
207   event_base_loopbreak((struct event_base *)context);
208 }
209 
210 // Reentrant!
Run(Delegate * delegate)211 void MessagePumpLibevent::Run(Delegate* delegate) {
212   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
213   AutoReset<bool> auto_reset_in_run(&in_run_, true);
214 
215   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
216   // Instead, make our own timer and reuse it on each call to event_base_loop().
217   scoped_ptr<event> timer_event(new event);
218 
219   for (;;) {
220     mac::ScopedNSAutoreleasePool autorelease_pool;
221 
222     bool did_work = delegate->DoWork();
223     if (!keep_running_)
224       break;
225 
226     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
227     if (!keep_running_)
228       break;
229 
230     if (did_work)
231       continue;
232 
233     did_work = delegate->DoIdleWork();
234     if (!keep_running_)
235       break;
236 
237     if (did_work)
238       continue;
239 
240     // EVLOOP_ONCE tells libevent to only block once,
241     // but to service all pending events when it wakes up.
242     if (delayed_work_time_.is_null()) {
243       event_base_loop(event_base_, EVLOOP_ONCE);
244     } else {
245       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
246       if (delay > TimeDelta()) {
247         struct timeval poll_tv;
248         poll_tv.tv_sec = delay.InSeconds();
249         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
250         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
251         event_base_set(event_base_, timer_event.get());
252         event_add(timer_event.get(), &poll_tv);
253         event_base_loop(event_base_, EVLOOP_ONCE);
254         event_del(timer_event.get());
255       } else {
256         // It looks like delayed_work_time_ indicates a time in the past, so we
257         // need to call DoDelayedWork now.
258         delayed_work_time_ = TimeTicks();
259       }
260     }
261   }
262 
263   keep_running_ = true;
264 }
265 
Quit()266 void MessagePumpLibevent::Quit() {
267   DCHECK(in_run_);
268   // Tell both libevent and Run that they should break out of their loops.
269   keep_running_ = false;
270   ScheduleWork();
271 }
272 
ScheduleWork()273 void MessagePumpLibevent::ScheduleWork() {
274   // Tell libevent (in a threadsafe way) that it should break out of its loop.
275   char buf = 0;
276   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
277   DCHECK(nwrite == 1 || errno == EAGAIN)
278       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
279 }
280 
ScheduleDelayedWork(const TimeTicks & delayed_work_time)281 void MessagePumpLibevent::ScheduleDelayedWork(
282     const TimeTicks& delayed_work_time) {
283   // We know that we can't be blocked on Wait right now since this method can
284   // only be called on the same thread as Run, so we only need to update our
285   // record of how long to sleep when we do sleep.
286   delayed_work_time_ = delayed_work_time;
287 }
288 
WillProcessIOEvent()289 void MessagePumpLibevent::WillProcessIOEvent() {
290   FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
291 }
292 
DidProcessIOEvent()293 void MessagePumpLibevent::DidProcessIOEvent() {
294   FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
295 }
296 
Init()297 bool MessagePumpLibevent::Init() {
298   int fds[2];
299   if (pipe(fds)) {
300     DLOG(ERROR) << "pipe() failed, errno: " << errno;
301     return false;
302   }
303   if (SetNonBlocking(fds[0])) {
304     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
305     return false;
306   }
307   if (SetNonBlocking(fds[1])) {
308     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
309     return false;
310   }
311   wakeup_pipe_out_ = fds[0];
312   wakeup_pipe_in_ = fds[1];
313 
314   wakeup_event_ = new event;
315   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
316             OnWakeup, this);
317   event_base_set(event_base_, wakeup_event_);
318 
319   if (event_add(wakeup_event_, 0))
320     return false;
321   return true;
322 }
323 
324 // static
OnLibeventNotification(int fd,short flags,void * context)325 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
326                                                  void* context) {
327   FileDescriptorWatcher* controller =
328       static_cast<FileDescriptorWatcher*>(context);
329 
330   MessagePumpLibevent* pump = controller->pump();
331 
332   if (flags & EV_WRITE) {
333     controller->OnFileCanWriteWithoutBlocking(fd, pump);
334   }
335   if (flags & EV_READ) {
336     controller->OnFileCanReadWithoutBlocking(fd, pump);
337   }
338 }
339 
340 // Called if a byte is received on the wakeup pipe.
341 // static
OnWakeup(int socket,short flags,void * context)342 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
343   base::MessagePumpLibevent* that =
344               static_cast<base::MessagePumpLibevent*>(context);
345   DCHECK(that->wakeup_pipe_out_ == socket);
346 
347   // Remove and discard the wakeup byte.
348   char buf;
349   int nread = HANDLE_EINTR(read(socket, &buf, 1));
350   DCHECK_EQ(nread, 1);
351   // Tell libevent to break out of inner loop.
352   event_base_loopbreak(that->event_base_);
353 }
354 
355 }  // namespace base
356