• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <unistd.h>
10 
11 #include "base/auto_reset.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/memory/scoped_ptr.h"
15 #include "base/observer_list.h"
16 #include "base/posix/eintr_wrapper.h"
17 #include "base/time/time.h"
18 #include "third_party/libevent/event.h"
19 
20 #if defined(OS_MACOSX)
21 #include "base/mac/scoped_nsautorelease_pool.h"
22 #endif
23 
24 // Lifecycle of struct event
25 // Libevent uses two main data structures:
26 // struct event_base (of which there is one per message pump), and
27 // struct event (of which there is roughly one per socket).
28 // The socket's struct event is created in
29 // MessagePumpLibevent::WatchFileDescriptor(),
30 // is owned by the FileDescriptorWatcher, and is destroyed in
31 // StopWatchingFileDescriptor().
32 // It is moved into and out of lists in struct event_base by
33 // the libevent functions event_add() and event_del().
34 //
35 // TODO(dkegel):
36 // At the moment bad things happen if a FileDescriptorWatcher
37 // is active after its MessagePumpLibevent has been destroyed.
38 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
39 // Not clear yet whether that situation occurs in practice,
40 // but if it does, we need to fix it.
41 
42 namespace base {
43 
44 // Return 0 on success
45 // Too small a function to bother putting in a library?
SetNonBlocking(int fd)46 static int SetNonBlocking(int fd) {
47   int flags = fcntl(fd, F_GETFL, 0);
48   if (flags == -1)
49     flags = 0;
50   return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
51 }
52 
FileDescriptorWatcher()53 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
54     : event_(NULL),
55       pump_(NULL),
56       watcher_(NULL),
57       weak_factory_(this) {
58 }
59 
~FileDescriptorWatcher()60 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
61   if (event_) {
62     StopWatchingFileDescriptor();
63   }
64 }
65 
StopWatchingFileDescriptor()66 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
67   event* e = ReleaseEvent();
68   if (e == NULL)
69     return true;
70 
71   // event_del() is a no-op if the event isn't active.
72   int rv = event_del(e);
73   delete e;
74   pump_ = NULL;
75   watcher_ = NULL;
76   return (rv == 0);
77 }
78 
Init(event * e)79 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
80   DCHECK(e);
81   DCHECK(!event_);
82 
83   event_ = e;
84 }
85 
ReleaseEvent()86 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
87   struct event *e = event_;
88   event_ = NULL;
89   return e;
90 }
91 
OnFileCanReadWithoutBlocking(int fd,MessagePumpLibevent * pump)92 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
93     int fd, MessagePumpLibevent* pump) {
94   // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
95   // watching the file descriptor.
96   if (!watcher_)
97     return;
98   pump->WillProcessIOEvent();
99   watcher_->OnFileCanReadWithoutBlocking(fd);
100   pump->DidProcessIOEvent();
101 }
102 
OnFileCanWriteWithoutBlocking(int fd,MessagePumpLibevent * pump)103 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
104     int fd, MessagePumpLibevent* pump) {
105   DCHECK(watcher_);
106   pump->WillProcessIOEvent();
107   watcher_->OnFileCanWriteWithoutBlocking(fd);
108   pump->DidProcessIOEvent();
109 }
110 
MessagePumpLibevent()111 MessagePumpLibevent::MessagePumpLibevent()
112     : keep_running_(true),
113       in_run_(false),
114       processed_io_events_(false),
115       event_base_(event_base_new()),
116       wakeup_pipe_in_(-1),
117       wakeup_pipe_out_(-1) {
118   if (!Init())
119      NOTREACHED();
120 }
121 
~MessagePumpLibevent()122 MessagePumpLibevent::~MessagePumpLibevent() {
123   DCHECK(wakeup_event_);
124   DCHECK(event_base_);
125   event_del(wakeup_event_);
126   delete wakeup_event_;
127   if (wakeup_pipe_in_ >= 0) {
128     if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0)
129       DPLOG(ERROR) << "close";
130   }
131   if (wakeup_pipe_out_ >= 0) {
132     if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0)
133       DPLOG(ERROR) << "close";
134   }
135   event_base_free(event_base_);
136 }
137 
WatchFileDescriptor(int fd,bool persistent,int mode,FileDescriptorWatcher * controller,Watcher * delegate)138 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
139                                               bool persistent,
140                                               int mode,
141                                               FileDescriptorWatcher *controller,
142                                               Watcher *delegate) {
143   DCHECK_GE(fd, 0);
144   DCHECK(controller);
145   DCHECK(delegate);
146   DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
147   // WatchFileDescriptor should be called on the pump thread. It is not
148   // threadsafe, and your watcher may never be registered.
149   DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
150 
151   int event_mask = persistent ? EV_PERSIST : 0;
152   if (mode & WATCH_READ) {
153     event_mask |= EV_READ;
154   }
155   if (mode & WATCH_WRITE) {
156     event_mask |= EV_WRITE;
157   }
158 
159   scoped_ptr<event> evt(controller->ReleaseEvent());
160   if (evt.get() == NULL) {
161     // Ownership is transferred to the controller.
162     evt.reset(new event);
163   } else {
164     // Make sure we don't pick up any funky internal libevent masks.
165     int old_interest_mask = evt.get()->ev_events &
166         (EV_READ | EV_WRITE | EV_PERSIST);
167 
168     // Combine old/new event masks.
169     event_mask |= old_interest_mask;
170 
171     // Must disarm the event before we can reuse it.
172     event_del(evt.get());
173 
174     // It's illegal to use this function to listen on 2 separate fds with the
175     // same |controller|.
176     if (EVENT_FD(evt.get()) != fd) {
177       NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
178       return false;
179     }
180   }
181 
182   // Set current interest mask and message pump for this event.
183   event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
184 
185   // Tell libevent which message pump this socket will belong to when we add it.
186   if (event_base_set(event_base_, evt.get())) {
187     return false;
188   }
189 
190   // Add this socket to the list of monitored sockets.
191   if (event_add(evt.get(), NULL)) {
192     return false;
193   }
194 
195   // Transfer ownership of evt to controller.
196   controller->Init(evt.release());
197 
198   controller->set_watcher(delegate);
199   controller->set_pump(this);
200 
201   return true;
202 }
203 
AddIOObserver(IOObserver * obs)204 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
205   io_observers_.AddObserver(obs);
206 }
207 
RemoveIOObserver(IOObserver * obs)208 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
209   io_observers_.RemoveObserver(obs);
210 }
211 
212 // Tell libevent to break out of inner loop.
timer_callback(int fd,short events,void * context)213 static void timer_callback(int fd, short events, void *context)
214 {
215   event_base_loopbreak((struct event_base *)context);
216 }
217 
218 // Reentrant!
Run(Delegate * delegate)219 void MessagePumpLibevent::Run(Delegate* delegate) {
220   AutoReset<bool> auto_reset_keep_running(&keep_running_, true);
221   AutoReset<bool> auto_reset_in_run(&in_run_, true);
222 
223   // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
224   // Instead, make our own timer and reuse it on each call to event_base_loop().
225   scoped_ptr<event> timer_event(new event);
226 
227   for (;;) {
228 #if defined(OS_MACOSX)
229     mac::ScopedNSAutoreleasePool autorelease_pool;
230 #endif
231 
232     bool did_work = delegate->DoWork();
233     if (!keep_running_)
234       break;
235 
236     event_base_loop(event_base_, EVLOOP_NONBLOCK);
237     did_work |= processed_io_events_;
238     processed_io_events_ = false;
239     if (!keep_running_)
240       break;
241 
242     did_work |= delegate->DoDelayedWork(&delayed_work_time_);
243     if (!keep_running_)
244       break;
245 
246     if (did_work)
247       continue;
248 
249     did_work = delegate->DoIdleWork();
250     if (!keep_running_)
251       break;
252 
253     if (did_work)
254       continue;
255 
256     // EVLOOP_ONCE tells libevent to only block once,
257     // but to service all pending events when it wakes up.
258     if (delayed_work_time_.is_null()) {
259       event_base_loop(event_base_, EVLOOP_ONCE);
260     } else {
261       TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
262       if (delay > TimeDelta()) {
263         struct timeval poll_tv;
264         poll_tv.tv_sec = delay.InSeconds();
265         poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
266         event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
267         event_base_set(event_base_, timer_event.get());
268         event_add(timer_event.get(), &poll_tv);
269         event_base_loop(event_base_, EVLOOP_ONCE);
270         event_del(timer_event.get());
271       } else {
272         // It looks like delayed_work_time_ indicates a time in the past, so we
273         // need to call DoDelayedWork now.
274         delayed_work_time_ = TimeTicks();
275       }
276     }
277   }
278 }
279 
Quit()280 void MessagePumpLibevent::Quit() {
281   DCHECK(in_run_) << "Quit was called outside of Run!";
282   // Tell both libevent and Run that they should break out of their loops.
283   keep_running_ = false;
284   ScheduleWork();
285 }
286 
ScheduleWork()287 void MessagePumpLibevent::ScheduleWork() {
288   // Tell libevent (in a threadsafe way) that it should break out of its loop.
289   char buf = 0;
290   int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
291   DCHECK(nwrite == 1 || errno == EAGAIN)
292       << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
293 }
294 
ScheduleDelayedWork(const TimeTicks & delayed_work_time)295 void MessagePumpLibevent::ScheduleDelayedWork(
296     const TimeTicks& delayed_work_time) {
297   // We know that we can't be blocked on Wait right now since this method can
298   // only be called on the same thread as Run, so we only need to update our
299   // record of how long to sleep when we do sleep.
300   delayed_work_time_ = delayed_work_time;
301 }
302 
WillProcessIOEvent()303 void MessagePumpLibevent::WillProcessIOEvent() {
304   FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
305 }
306 
DidProcessIOEvent()307 void MessagePumpLibevent::DidProcessIOEvent() {
308   FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
309 }
310 
Init()311 bool MessagePumpLibevent::Init() {
312   int fds[2];
313   if (pipe(fds)) {
314     DLOG(ERROR) << "pipe() failed, errno: " << errno;
315     return false;
316   }
317   if (SetNonBlocking(fds[0])) {
318     DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
319     return false;
320   }
321   if (SetNonBlocking(fds[1])) {
322     DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
323     return false;
324   }
325   wakeup_pipe_out_ = fds[0];
326   wakeup_pipe_in_ = fds[1];
327 
328   wakeup_event_ = new event;
329   event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
330             OnWakeup, this);
331   event_base_set(event_base_, wakeup_event_);
332 
333   if (event_add(wakeup_event_, 0))
334     return false;
335   return true;
336 }
337 
338 // static
OnLibeventNotification(int fd,short flags,void * context)339 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
340                                                  void* context) {
341   WeakPtr<FileDescriptorWatcher> controller =
342       static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
343   DCHECK(controller.get());
344 
345   MessagePumpLibevent* pump = controller->pump();
346   pump->processed_io_events_ = true;
347 
348   if (flags & EV_WRITE) {
349     controller->OnFileCanWriteWithoutBlocking(fd, pump);
350   }
351   // Check |controller| in case it's been deleted in
352   // controller->OnFileCanWriteWithoutBlocking().
353   if (controller.get() && flags & EV_READ) {
354     controller->OnFileCanReadWithoutBlocking(fd, pump);
355   }
356 }
357 
358 // Called if a byte is received on the wakeup pipe.
359 // static
OnWakeup(int socket,short flags,void * context)360 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
361   MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
362   DCHECK(that->wakeup_pipe_out_ == socket);
363 
364   // Remove and discard the wakeup byte.
365   char buf;
366   int nread = HANDLE_EINTR(read(socket, &buf, 1));
367   DCHECK_EQ(nread, 1);
368   that->processed_io_events_ = true;
369   // Tell libevent to break out of inner loop.
370   event_base_loopbreak(that->event_base_);
371 }
372 
373 }  // namespace base
374