• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "tools/android/forwarder2/forwarders_manager.h"
6 
7 #include <sys/select.h>
8 #include <unistd.h>
9 
10 #include <algorithm>
11 
12 #include "base/basictypes.h"
13 #include "base/bind.h"
14 #include "base/callback_helpers.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/message_loop/message_loop_proxy.h"
18 #include "base/posix/eintr_wrapper.h"
19 #include "tools/android/forwarder2/forwarder.h"
20 #include "tools/android/forwarder2/socket.h"
21 
22 namespace forwarder2 {
23 
ForwardersManager()24 ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
25   thread_.Start();
26   WaitForEventsOnInternalThreadSoon();
27 }
28 
29 
~ForwardersManager()30 ForwardersManager::~ForwardersManager() {
31   deletion_notifier_.Notify();
32 }
33 
CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)34 void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
35                                                    scoped_ptr<Socket> socket2) {
36   // Note that the internal Forwarder vector is populated on the internal thread
37   // which is the only thread from which it's accessed.
38   thread_.message_loop_proxy()->PostTask(
39       FROM_HERE,
40       base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
41                  base::Unretained(this), base::Passed(&socket1),
42                  base::Passed(&socket2)));
43 
44   // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
45   // the internal thread gets executed immediately.
46   wakeup_notifier_.Notify();
47 }
48 
CreateNewForwarderOnInternalThread(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)49 void ForwardersManager::CreateNewForwarderOnInternalThread(
50     scoped_ptr<Socket> socket1,
51     scoped_ptr<Socket> socket2) {
52   DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
53   forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
54 }
55 
WaitForEventsOnInternalThreadSoon()56 void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
57   thread_.message_loop_proxy()->PostTask(
58       FROM_HERE,
59       base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
60                  base::Unretained(this)));
61 }
62 
WaitForEventsOnInternalThread()63 void ForwardersManager::WaitForEventsOnInternalThread() {
64   DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
65   fd_set read_fds;
66   fd_set write_fds;
67 
68   FD_ZERO(&read_fds);
69   FD_ZERO(&write_fds);
70 
71   // Populate the file descriptor sets.
72   int max_fd = -1;
73   for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
74        it != forwarders_.end(); ++it) {
75     Forwarder* const forwarder = *it;
76     forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
77   }
78 
79   const int notifier_fds[] = {
80     wakeup_notifier_.receiver_fd(),
81     deletion_notifier_.receiver_fd(),
82   };
83 
84   for (int i = 0; i < arraysize(notifier_fds); ++i) {
85     const int notifier_fd = notifier_fds[i];
86     DCHECK_GT(notifier_fd, -1);
87     FD_SET(notifier_fd, &read_fds);
88     max_fd = std::max(max_fd, notifier_fd);
89   }
90 
91   const int ret = HANDLE_EINTR(
92       select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
93   if (ret < 0) {
94     PLOG(ERROR) << "select";
95     return;
96   }
97 
98   const bool must_shutdown = FD_ISSET(
99       deletion_notifier_.receiver_fd(), &read_fds);
100   if (must_shutdown && forwarders_.empty())
101     return;
102 
103   base::ScopedClosureRunner wait_for_events_soon(
104       base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon,
105                  base::Unretained(this)));
106 
107   if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
108     // Note that the events on FDs other than the wakeup notifier one, if any,
109     // will be processed upon the next select().
110     wakeup_notifier_.Reset();
111     return;
112   }
113 
114   // Notify the Forwarder instances and remove the ones that are closed.
115   for (size_t i = 0; i < forwarders_.size(); ) {
116     Forwarder* const forwarder = forwarders_[i];
117     forwarder->ProcessEvents(read_fds, write_fds);
118 
119     if (must_shutdown)
120       forwarder->Shutdown();
121 
122     if (!forwarder->IsClosed()) {
123       ++i;
124       continue;
125     }
126 
127     std::swap(forwarders_[i], forwarders_.back());
128     forwarders_.pop_back();
129   }
130 }
131 
132 }  // namespace forwarder2
133