• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <unistd.h>
10 
11 #include "base/auto_reset.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/memory/scoped_ptr.h"
15 #include "base/observer_list.h"
16 #include "base/posix/eintr_wrapper.h"
17 #include "base/time/time.h"
18 #include "third_party/libevent/event.h"
19 
20 #if defined(OS_MACOSX)
21 #include "base/mac/scoped_nsautorelease_pool.h"
22 #endif
23 
24 // Lifecycle of struct event
25 // Libevent uses two main data structures:
26 // struct event_base (of which there is one per message pump), and
27 // struct event (of which there is roughly one per socket).
28 // The socket's struct event is created in
29 // MessagePumpLibevent::WatchFileDescriptor(),
30 // is owned by the FileDescriptorWatcher, and is destroyed in
31 // StopWatchingFileDescriptor().
32 // It is moved into and out of lists in struct event_base by
33 // the libevent functions event_add() and event_del().
34 //
35 // TODO(dkegel):
36 // At the moment bad things happen if a FileDescriptorWatcher
37 // is active after its MessagePumpLibevent has been destroyed.
38 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
39 // Not clear yet whether that situation occurs in practice,
40 // but if it does, we need to fix it.
41 
42 namespace base {
43 
// Switches |fd| to non-blocking mode while keeping its other file status
// flags intact.
// Returns 0 on success and -1 on failure (errno is left as set by fcntl()).
static int SetNonBlocking(int fd) {
  const int flags = fcntl(fd, F_GETFL, 0);
  // If the current flags cannot be read, report the failure rather than
  // guessing 0 and overwriting whatever flags the descriptor really has.
  if (flags == -1)
    return -1;

  // Already non-blocking: nothing to change.
  if (flags & O_NONBLOCK)
    return 0;

  // POSIX only promises "a value other than -1" on success for F_SETFL, so
  // normalize the result to the 0/-1 contract callers test against.
  return (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) ? -1 : 0;
}
52 
FileDescriptorWatcher()53 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
54     : event_(NULL),
55       pump_(NULL),
56       watcher_(NULL),
57       weak_factory_(this) {
58 }
59 
~FileDescriptorWatcher()60 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
61   if (event_) {
62     StopWatchingFileDescriptor();
63   }
64 }
65 
StopWatchingFileDescriptor()66 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
67   event* e = ReleaseEvent();
68   if (e == NULL)
69     return true;
70 
71   // event_del() is a no-op if the event isn't active.
72   int rv = event_del(e);
73   delete e;
74   pump_ = NULL;
75   watcher_ = NULL;
76   return (rv == 0);
77 }
78 
Init(event * e)79 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
80   DCHECK(e);
81   DCHECK(!event_);
82 
83   event_ = e;
84 }
85 
ReleaseEvent()86 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
87   struct event *e = event_;
88   event_ = NULL;
89   return e;
90 }
91 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)92 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
93     int fd, MessagePumpLibevent* pump) {
94   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
95   // watching the file descriptor.
96   if (!watcher_)
97     return;
98   pump->WillProcessIOEvent();
99   watcher_->OnFileCanReadWithoutBlocking(fd);
100   pump->DidProcessIOEvent();
101 }
102 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)103 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
104     int fd, MessagePumpLibevent* pump) {
105   DCHECK(watcher_);
106   pump->WillProcessIOEvent();
107   watcher_->OnFileCanWriteWithoutBlocking(fd);
108   pump->DidProcessIOEvent();
109 }
110 
MessagePumpLibevent()111 MessagePumpLibevent::MessagePumpLibevent()
112     : keep_running_(true),
113       in_run_(false),
114       processed_io_events_(false),
115       event_base_(event_base_new()),
116       wakeup_pipe_in_(-1),
117       wakeup_pipe_out_(-1) {
118   if (!Init())
119      NOTREACHED();
120 }
121 
~MessagePumpLibevent()122 MessagePumpLibevent::~MessagePumpLibevent() {
123   DCHECK(wakeup_event_);
124   DCHECK(event_base_);
125   event_del(wakeup_event_);
126   delete wakeup_event_;
127   if (wakeup_pipe_in_ >= 0) {
128     if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
129       DPLOG(ERROR) << "close";
130   }
131   if (wakeup_pipe_out_ >= 0) {
132     if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
133       DPLOG(ERROR) << "close";
134   }
135   event_base_free(event_base_);
136 }
137 
WatchFileDescriptor(int fd,bool persistent,int mode,FileDescriptorWatcher * controller,Watcher * delegate)138 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
139                                               bool persistent,
140                                               int mode,
141                                               FileDescriptorWatcher *controller,
142                                               Watcher *delegate) {
143   DCHECK_GE(fd, 0);
144   DCHECK(controller);
145   DCHECK(delegate);
146   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
147   // WatchFileDescriptor should be called on the pump thread. It is not
148   // threadsafe, and your watcher may never be registered.
149   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
150 
151   int event_mask = persistent ? EV_PERSIST : 0;
152   if (mode & WATCH_READ) {
153     event_mask |= EV_READ;
154   }
155   if (mode & WATCH_WRITE) {
156     event_mask |= EV_WRITE;
157   }
158 
159   scoped_ptr<event> evt(controller->ReleaseEvent());
160   if (evt.get() == NULL) {
161     // Ownership is transferred to the controller.
162     evt.reset(new event);
163   } else {
164     // Make sure we don't pick up any funky internal libevent masks.
165     int old_interest_mask = evt.get()->ev_events &
166         (EV_READ | EV_WRITE | EV_PERSIST);
167 
168     // Combine old/new event masks.
169     event_mask |= old_interest_mask;
170 
171     // Must disarm the event before we can reuse it.
172     event_del(evt.get());
173 
174     // It's illegal to use this function to listen on 2 separate fds with the
175     // same |controller|.
176     if (EVENT_FD(evt.get()) != fd) {
177       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
178       return false;
179     }
180   }
181 
182   // Set current interest mask and message pump for this event.
183   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
184 
185   // Tell libevent which message pump this socket will belong to when we add it.
186   if (event_base_set(event_base_, evt.get())) {
187     return false;
188   }
189 
190   // Add this socket to the list of monitored sockets.
191   if (event_add(evt.get(), NULL)) {
192     return false;
193   }
194 
195   // Transfer ownership of evt to controller.
196   controller->Init(evt.release());
197 
198   controller->set_watcher(delegate);
199   controller->set_pump(this);
200 
201   return true;
202 }
203 
AddIOObserver(IOObserver * obs)204 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
205   io_observers_.AddObserver(obs);
206 }
207 
RemoveIOObserver(IOObserver * obs)208 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
209   io_observers_.RemoveObserver(obs);
210 }
211 
212 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)213 static void timer_callback(int fd, short events, void *context)
214 {
215   event_base_loopbreak((struct event_base *)context);
216 }
217 
218 // Reentrant!
Run(Delegate * delegate)219 void MessagePumpLibevent::Run(Delegate* delegate) {
220   DCHECK(keep_running_) << "Quit must have been called outside of Run!";
221   AutoReset<bool> auto_reset_in_run(&in_run_, true);
222 
223   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
224   // Instead, make our own timer and reuse it on each call to event_base_loop().
225   scoped_ptr<event> timer_event(new event);
226 
227   for (;;) {
228 #if defined(OS_MACOSX)
229     mac::ScopedNSAutoreleasePool autorelease_pool;
230 #endif
231 
232     bool did_work = delegate->DoWork();
233     if (!keep_running_)
234       break;
235 
236     event_base_loop(event_base_, EVLOOP_NONBLOCK);
237     did_work |= processed_io_events_;
238     processed_io_events_ = false;
239     if (!keep_running_)
240       break;
241 
242     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
243     if (!keep_running_)
244       break;
245 
246     if (did_work)
247       continue;
248 
249     did_work = delegate->DoIdleWork();
250     if (!keep_running_)
251       break;
252 
253     if (did_work)
254       continue;
255 
256     // EVLOOP_ONCE tells libevent to only block once,
257     // but to service all pending events when it wakes up.
258     if (delayed_work_time_.is_null()) {
259       event_base_loop(event_base_, EVLOOP_ONCE);
260     } else {
261       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
262       if (delay > TimeDelta()) {
263         struct timeval poll_tv;
264         poll_tv.tv_sec = delay.InSeconds();
265         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
266         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
267         event_base_set(event_base_, timer_event.get());
268         event_add(timer_event.get(), &poll_tv);
269         event_base_loop(event_base_, EVLOOP_ONCE);
270         event_del(timer_event.get());
271       } else {
272         // It looks like delayed_work_time_ indicates a time in the past, so we
273         // need to call DoDelayedWork now.
274         delayed_work_time_ = TimeTicks();
275       }
276     }
277   }
278 
279   keep_running_ = true;
280 }
281 
Quit()282 void MessagePumpLibevent::Quit() {
283   DCHECK(in_run_);
284   // Tell both libevent and Run that they should break out of their loops.
285   keep_running_ = false;
286   ScheduleWork();
287 }
288 
ScheduleWork()289 void MessagePumpLibevent::ScheduleWork() {
290   // Tell libevent (in a threadsafe way) that it should break out of its loop.
291   char buf = 0;
292   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
293   DCHECK(nwrite == 1 || errno == EAGAIN)
294       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
295 }
296 
ScheduleDelayedWork(const TimeTicks & delayed_work_time)297 void MessagePumpLibevent::ScheduleDelayedWork(
298     const TimeTicks& delayed_work_time) {
299   // We know that we can't be blocked on Wait right now since this method can
300   // only be called on the same thread as Run, so we only need to update our
301   // record of how long to sleep when we do sleep.
302   delayed_work_time_ = delayed_work_time;
303 }
304 
WillProcessIOEvent()305 void MessagePumpLibevent::WillProcessIOEvent() {
306   FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
307 }
308 
DidProcessIOEvent()309 void MessagePumpLibevent::DidProcessIOEvent() {
310   FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
311 }
312 
Init()313 bool MessagePumpLibevent::Init() {
314   int fds[2];
315   if (pipe(fds)) {
316     DLOG(ERROR) << "pipe() failed, errno: " << errno;
317     return false;
318   }
319   if (SetNonBlocking(fds[0])) {
320     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
321     return false;
322   }
323   if (SetNonBlocking(fds[1])) {
324     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
325     return false;
326   }
327   wakeup_pipe_out_ = fds[0];
328   wakeup_pipe_in_ = fds[1];
329 
330   wakeup_event_ = new event;
331   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
332             OnWakeup, this);
333   event_base_set(event_base_, wakeup_event_);
334 
335   if (event_add(wakeup_event_, 0))
336     return false;
337   return true;
338 }
339 
340 // static
OnLibeventNotification(int fd,short flags,void * context)341 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
342                                                  void* context) {
343   WeakPtr<FileDescriptorWatcher> controller =
344       static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
345   DCHECK(controller.get());
346 
347   MessagePumpLibevent* pump = controller->pump();
348   pump->processed_io_events_ = true;
349 
350   if (flags & EV_WRITE) {
351     controller->OnFileCanWriteWithoutBlocking(fd, pump);
352   }
353   // Check |controller| in case it's been deleted in
354   // controller->OnFileCanWriteWithoutBlocking().
355   if (controller.get() && flags & EV_READ) {
356     controller->OnFileCanReadWithoutBlocking(fd, pump);
357   }
358 }
359 
360 // Called if a byte is received on the wakeup pipe.
361 // static
OnWakeup(int socket,short flags,void * context)362 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
363   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
364   DCHECK(that->wakeup_pipe_out_ == socket);
365 
366   // Remove and discard the wakeup byte.
367   char buf;
368   int nread = HANDLE_EINTR(read(socket, &buf, 1));
369   DCHECK_EQ(nread, 1);
370   that->processed_io_events_ = true;
371   // Tell libevent to break out of inner loop.
372   event_base_loopbreak(that->event_base_);
373 }
374 
375 }  // namespace base
376