• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "tools/android/forwarder2/forwarder.h"
6 
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/posix/eintr_wrapper.h"
10 #include "tools/android/forwarder2/socket.h"
11 
12 namespace forwarder2 {
13 namespace {
14 
15 const int kBufferSize = 32 * 1024;
16 
17 }  // namespace
18 
19 
20 // Helper class to buffer reads and writes from one socket to another.
21 // Each implements a small buffer connected two one input socket, and
22 // one output socket.
23 //
24 //   socket_from_ ---> [BufferedCopier] ---> socket_to_
25 //
26 // These objects are used in a pair to handle duplex traffic, as in:
27 //
28 //                    ------> [BufferedCopier_1] --->
29 //                  /                                \
30 //      socket_1   *                                  * socket_2
31 //                  \                                /
32 //                   <------ [BufferedCopier_2] <----
33 //
34 // When a BufferedCopier is in the READING state (see below), it only listens
35 // to events on its input socket, and won't detect when its output socket
36 // disconnects. To work around this, its peer will call its Close() method
37 // when that happens.
38 
39 class Forwarder::BufferedCopier {
40  public:
41   // Possible states:
42   //    READING - Empty buffer and Waiting for input.
43   //    WRITING - Data in buffer, and waiting for output.
44   //    CLOSING - Like WRITING, but do not try to read after that.
45   //    CLOSED  - Completely closed.
46   //
47   // State transitions are:
48   //
49   //   T01:  READING ---[receive data]---> WRITING
50   //   T02:  READING ---[error on input socket]---> CLOSED
51   //   T03:  READING ---[Close() call]---> CLOSED
52   //
53   //   T04:  WRITING ---[write partial data]---> WRITING
54   //   T05:  WRITING ---[write all data]----> READING
55   //   T06:  WRITING ---[error on output socket]----> CLOSED
56   //   T07:  WRITING ---[Close() call]---> CLOSING
57   //
58   //   T08:  CLOSING ---[write partial data]---> CLOSING
59   //   T09:  CLOSING ---[write all data]----> CLOSED
60   //   T10:  CLOSING ---[Close() call]---> CLOSING
61   //   T11:  CLOSING ---[error on output socket] ---> CLOSED
62   //
63   enum State {
64     STATE_READING = 0,
65     STATE_WRITING = 1,
66     STATE_CLOSING = 2,
67     STATE_CLOSED = 3,
68   };
69 
70   // Does NOT own the pointers.
BufferedCopier(Socket * socket_from,Socket * socket_to)71   BufferedCopier(Socket* socket_from, Socket* socket_to)
72       : socket_from_(socket_from),
73         socket_to_(socket_to),
74         bytes_read_(0),
75         write_offset_(0),
76         peer_(NULL),
77         state_(STATE_READING) {}
78 
79   // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
SetPeer(BufferedCopier * peer)80   void SetPeer(BufferedCopier* peer) {
81     DCHECK(!peer_);
82     peer_ = peer;
83   }
84 
is_closed() const85   bool is_closed() const { return state_ == STATE_CLOSED; }
86 
87   // Gently asks to close a buffer. Called either by the peer or the forwarder.
Close()88   void Close() {
89     switch (state_) {
90       case STATE_READING:
91         state_ = STATE_CLOSED;  // T03
92         break;
93       case STATE_WRITING:
94         state_ = STATE_CLOSING;  // T07
95         break;
96       case STATE_CLOSING:
97         break;  // T10
98       case STATE_CLOSED:
99         ;
100     }
101   }
102 
103   // Call this before select(). This updates |read_fds|,
104   // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
PrepareSelect(fd_set * read_fds,fd_set * write_fds,int * max_fd)105   void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
106     int fd;
107     switch (state_) {
108       case STATE_READING:
109         DCHECK(bytes_read_ == 0);
110         DCHECK(write_offset_ == 0);
111         fd = socket_from_->fd();
112         if (fd < 0) {
113           ForceClose();  // T02
114           return;
115         }
116         FD_SET(fd, read_fds);
117         break;
118 
119       case STATE_WRITING:
120       case STATE_CLOSING:
121         DCHECK(bytes_read_ > 0);
122         DCHECK(write_offset_ < bytes_read_);
123         fd = socket_to_->fd();
124         if (fd < 0) {
125           ForceClose();  // T06
126           return;
127         }
128         FD_SET(fd, write_fds);
129         break;
130 
131       case STATE_CLOSED:
132         return;
133     }
134     *max_fd = std::max(*max_fd, fd);
135   }
136 
137   // Call this after a select() call to operate over the buffer.
ProcessSelect(const fd_set & read_fds,const fd_set & write_fds)138   void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
139     int fd, ret;
140     switch (state_) {
141       case STATE_READING:
142         fd = socket_from_->fd();
143         if (fd < 0) {
144           state_ = STATE_CLOSED;  // T02
145           return;
146         }
147         if (!FD_ISSET(fd, &read_fds))
148           return;
149 
150         ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
151         if (ret <= 0) {
152           ForceClose();  // T02
153           return;
154         }
155         bytes_read_ = ret;
156         write_offset_ = 0;
157         state_ = STATE_WRITING;  // T01
158         break;
159 
160       case STATE_WRITING:
161       case STATE_CLOSING:
162         fd = socket_to_->fd();
163         if (fd < 0) {
164           ForceClose();  // T06 + T11
165           return;
166         }
167         if (!FD_ISSET(fd, &write_fds))
168           return;
169 
170         ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
171                                            bytes_read_ - write_offset_);
172         if (ret <= 0) {
173           ForceClose();  // T06 + T11
174           return;
175         }
176 
177         write_offset_ += ret;
178         if (write_offset_ < bytes_read_)
179           return;  // T08 + T04
180 
181         write_offset_ = 0;
182         bytes_read_ = 0;
183         if (state_ == STATE_CLOSING) {
184           ForceClose();  // T09
185           return;
186         }
187         state_ = STATE_READING;  // T05
188         break;
189 
190       case STATE_CLOSED:
191         ;
192     }
193   }
194 
195  private:
196   // Internal method used to close the buffer and notify the peer, if any.
ForceClose()197   void ForceClose() {
198     if (peer_) {
199       peer_->Close();
200       peer_ = NULL;
201     }
202     state_ = STATE_CLOSED;
203   }
204 
205   // Not owned.
206   Socket* socket_from_;
207   Socket* socket_to_;
208 
209   int bytes_read_;
210   int write_offset_;
211   BufferedCopier* peer_;
212   State state_;
213   char buffer_[kBufferSize];
214 
215   DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
216 };
217 
Forwarder(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)218 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
219                      scoped_ptr<Socket> socket2)
220     : socket1_(socket1.Pass()),
221       socket2_(socket2.Pass()),
222       buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
223       buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
224   buffer1_->SetPeer(buffer2_.get());
225   buffer2_->SetPeer(buffer1_.get());
226 }
227 
~Forwarder()228 Forwarder::~Forwarder() {
229   DCHECK(thread_checker_.CalledOnValidThread());
230 }
231 
RegisterFDs(fd_set * read_fds,fd_set * write_fds,int * max_fd)232 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
233   DCHECK(thread_checker_.CalledOnValidThread());
234   buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
235   buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
236 }
237 
ProcessEvents(const fd_set & read_fds,const fd_set & write_fds)238 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
239   DCHECK(thread_checker_.CalledOnValidThread());
240   buffer1_->ProcessSelect(read_fds, write_fds);
241   buffer2_->ProcessSelect(read_fds, write_fds);
242 }
243 
IsClosed() const244 bool Forwarder::IsClosed() const {
245   DCHECK(thread_checker_.CalledOnValidThread());
246   return buffer1_->is_closed() && buffer2_->is_closed();
247 }
248 
Shutdown()249 void Forwarder::Shutdown() {
250   DCHECK(thread_checker_.CalledOnValidThread());
251   buffer1_->Close();
252   buffer2_->Close();
253 }
254 
255 }  // namespace forwarder2
256