1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
#include "tools/android/forwarder2/forwarder.h"

#include <algorithm>

#include "base/basictypes.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "tools/android/forwarder2/socket.h"
11
12 namespace forwarder2 {
namespace {

// Capacity, in bytes, of the intermediate buffer that each BufferedCopier
// reads into and writes out of.
const int kBufferSize = 1024 * 32;

}  // namespace
18
19
// Helper class to buffer reads and writes from one socket to another.
// Each instance implements a small buffer connected to one input socket and
// one output socket.
//
//   socket_from_ ---> [BufferedCopier] ---> socket_to_
//
// These objects are used in a pair to handle duplex traffic, as in:
//
//                .------> [BufferedCopier_1] ------.
//               /                                   v
//      socket_1                                     socket_2
//               ^                                   /
//                `------- [BufferedCopier_2] <-----'
//
// When a BufferedCopier is in the READING state (see below), it only listens
// to events on its input socket, and won't detect when its output socket
// disconnects. To work around this, its peer will call its Close() method
// when that happens.
38
39 class Forwarder::BufferedCopier {
40 public:
41 // Possible states:
42 // READING - Empty buffer and Waiting for input.
43 // WRITING - Data in buffer, and waiting for output.
44 // CLOSING - Like WRITING, but do not try to read after that.
45 // CLOSED - Completely closed.
46 //
47 // State transitions are:
48 //
49 // T01: READING ---[receive data]---> WRITING
50 // T02: READING ---[error on input socket]---> CLOSED
51 // T03: READING ---[Close() call]---> CLOSED
52 //
53 // T04: WRITING ---[write partial data]---> WRITING
54 // T05: WRITING ---[write all data]----> READING
55 // T06: WRITING ---[error on output socket]----> CLOSED
56 // T07: WRITING ---[Close() call]---> CLOSING
57 //
58 // T08: CLOSING ---[write partial data]---> CLOSING
59 // T09: CLOSING ---[write all data]----> CLOSED
60 // T10: CLOSING ---[Close() call]---> CLOSING
61 // T11: CLOSING ---[error on output socket] ---> CLOSED
62 //
63 enum State {
64 STATE_READING = 0,
65 STATE_WRITING = 1,
66 STATE_CLOSING = 2,
67 STATE_CLOSED = 3,
68 };
69
70 // Does NOT own the pointers.
BufferedCopier(Socket * socket_from,Socket * socket_to)71 BufferedCopier(Socket* socket_from, Socket* socket_to)
72 : socket_from_(socket_from),
73 socket_to_(socket_to),
74 bytes_read_(0),
75 write_offset_(0),
76 peer_(NULL),
77 state_(STATE_READING) {}
78
79 // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
SetPeer(BufferedCopier * peer)80 void SetPeer(BufferedCopier* peer) {
81 DCHECK(!peer_);
82 peer_ = peer;
83 }
84
is_closed() const85 bool is_closed() const { return state_ == STATE_CLOSED; }
86
87 // Gently asks to close a buffer. Called either by the peer or the forwarder.
Close()88 void Close() {
89 switch (state_) {
90 case STATE_READING:
91 state_ = STATE_CLOSED; // T03
92 break;
93 case STATE_WRITING:
94 state_ = STATE_CLOSING; // T07
95 break;
96 case STATE_CLOSING:
97 break; // T10
98 case STATE_CLOSED:
99 ;
100 }
101 }
102
103 // Call this before select(). This updates |read_fds|,
104 // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
PrepareSelect(fd_set * read_fds,fd_set * write_fds,int * max_fd)105 void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
106 int fd;
107 switch (state_) {
108 case STATE_READING:
109 DCHECK(bytes_read_ == 0);
110 DCHECK(write_offset_ == 0);
111 fd = socket_from_->fd();
112 if (fd < 0) {
113 ForceClose(); // T02
114 return;
115 }
116 FD_SET(fd, read_fds);
117 break;
118
119 case STATE_WRITING:
120 case STATE_CLOSING:
121 DCHECK(bytes_read_ > 0);
122 DCHECK(write_offset_ < bytes_read_);
123 fd = socket_to_->fd();
124 if (fd < 0) {
125 ForceClose(); // T06
126 return;
127 }
128 FD_SET(fd, write_fds);
129 break;
130
131 case STATE_CLOSED:
132 return;
133 }
134 *max_fd = std::max(*max_fd, fd);
135 }
136
137 // Call this after a select() call to operate over the buffer.
ProcessSelect(const fd_set & read_fds,const fd_set & write_fds)138 void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
139 int fd, ret;
140 switch (state_) {
141 case STATE_READING:
142 fd = socket_from_->fd();
143 if (fd < 0) {
144 state_ = STATE_CLOSED; // T02
145 return;
146 }
147 if (!FD_ISSET(fd, &read_fds))
148 return;
149
150 ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
151 if (ret <= 0) {
152 ForceClose(); // T02
153 return;
154 }
155 bytes_read_ = ret;
156 write_offset_ = 0;
157 state_ = STATE_WRITING; // T01
158 break;
159
160 case STATE_WRITING:
161 case STATE_CLOSING:
162 fd = socket_to_->fd();
163 if (fd < 0) {
164 ForceClose(); // T06 + T11
165 return;
166 }
167 if (!FD_ISSET(fd, &write_fds))
168 return;
169
170 ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
171 bytes_read_ - write_offset_);
172 if (ret <= 0) {
173 ForceClose(); // T06 + T11
174 return;
175 }
176
177 write_offset_ += ret;
178 if (write_offset_ < bytes_read_)
179 return; // T08 + T04
180
181 write_offset_ = 0;
182 bytes_read_ = 0;
183 if (state_ == STATE_CLOSING) {
184 ForceClose(); // T09
185 return;
186 }
187 state_ = STATE_READING; // T05
188 break;
189
190 case STATE_CLOSED:
191 ;
192 }
193 }
194
195 private:
196 // Internal method used to close the buffer and notify the peer, if any.
ForceClose()197 void ForceClose() {
198 if (peer_) {
199 peer_->Close();
200 peer_ = NULL;
201 }
202 state_ = STATE_CLOSED;
203 }
204
205 // Not owned.
206 Socket* socket_from_;
207 Socket* socket_to_;
208
209 int bytes_read_;
210 int write_offset_;
211 BufferedCopier* peer_;
212 State state_;
213 char buffer_[kBufferSize];
214
215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
216 };
217
Forwarder(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)218 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
219 scoped_ptr<Socket> socket2)
220 : socket1_(socket1.Pass()),
221 socket2_(socket2.Pass()),
222 buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
223 buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
224 buffer1_->SetPeer(buffer2_.get());
225 buffer2_->SetPeer(buffer1_.get());
226 }
227
~Forwarder()228 Forwarder::~Forwarder() {
229 DCHECK(thread_checker_.CalledOnValidThread());
230 }
231
RegisterFDs(fd_set * read_fds,fd_set * write_fds,int * max_fd)232 void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
233 DCHECK(thread_checker_.CalledOnValidThread());
234 buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
235 buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
236 }
237
ProcessEvents(const fd_set & read_fds,const fd_set & write_fds)238 void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
239 DCHECK(thread_checker_.CalledOnValidThread());
240 buffer1_->ProcessSelect(read_fds, write_fds);
241 buffer2_->ProcessSelect(read_fds, write_fds);
242 }
243
IsClosed() const244 bool Forwarder::IsClosed() const {
245 DCHECK(thread_checker_.CalledOnValidThread());
246 return buffer1_->is_closed() && buffer2_->is_closed();
247 }
248
Shutdown()249 void Forwarder::Shutdown() {
250 DCHECK(thread_checker_.CalledOnValidThread());
251 buffer1_->Close();
252 buffer2_->Close();
253 }
254
255 } // namespace forwarder2
256