1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "tools/android/forwarder2/forwarder.h"
6
7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/posix/eintr_wrapper.h"
12 #include "base/single_thread_task_runner.h"
13 #include "tools/android/forwarder2/socket.h"
14
15 namespace forwarder2 {
16 namespace {
17
18 // Helper class to buffer reads and writes from one socket to another.
19 class BufferedCopier {
20 public:
21 // Does NOT own the pointers.
BufferedCopier(Socket * socket_from,Socket * socket_to)22 BufferedCopier(Socket* socket_from,
23 Socket* socket_to)
24 : socket_from_(socket_from),
25 socket_to_(socket_to),
26 bytes_read_(0),
27 write_offset_(0) {
28 }
29
AddToReadSet(fd_set * read_fds)30 bool AddToReadSet(fd_set* read_fds) {
31 if (bytes_read_ == 0)
32 return socket_from_->AddFdToSet(read_fds);
33 return false;
34 }
35
AddToWriteSet(fd_set * write_fds)36 bool AddToWriteSet(fd_set* write_fds) {
37 if (write_offset_ < bytes_read_)
38 return socket_to_->AddFdToSet(write_fds);
39 return false;
40 }
41
TryRead(const fd_set & read_fds)42 bool TryRead(const fd_set& read_fds) {
43 if (!socket_from_->IsFdInSet(read_fds))
44 return false;
45 if (bytes_read_ != 0) // Can't read.
46 return false;
47 int ret = socket_from_->Read(buffer_, kBufferSize);
48 if (ret > 0) {
49 bytes_read_ = ret;
50 return true;
51 }
52 return false;
53 }
54
TryWrite(const fd_set & write_fds)55 bool TryWrite(const fd_set& write_fds) {
56 if (!socket_to_->IsFdInSet(write_fds))
57 return false;
58 if (write_offset_ >= bytes_read_) // Nothing to write.
59 return false;
60 int ret = socket_to_->Write(buffer_ + write_offset_,
61 bytes_read_ - write_offset_);
62 if (ret > 0) {
63 write_offset_ += ret;
64 if (write_offset_ == bytes_read_) {
65 write_offset_ = 0;
66 bytes_read_ = 0;
67 }
68 return true;
69 }
70 return false;
71 }
72
73 private:
74 // Not owned.
75 Socket* socket_from_;
76 Socket* socket_to_;
77
78 // A big buffer to let our file-over-http bridge work more like real file.
79 static const int kBufferSize = 1024 * 128;
80 int bytes_read_;
81 int write_offset_;
82 char buffer_[kBufferSize];
83
84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
85 };
86
87 // Internal class that wraps a helper thread to forward traffic between
88 // |socket1| and |socket2|. After creating a new instance, call its Start()
89 // method to launch operations. Thread stops automatically if one of the socket
90 // disconnects, but ensures that all buffered writes to the other, still alive,
91 // socket, are written first. When this happens, the instance will delete itself
92 // automatically.
93 // Note that the instance will always be destroyed on the same thread that
94 // created it.
95 class Forwarder {
96 public:
Forwarder(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
98 : socket1_(socket1.Pass()),
99 socket2_(socket2.Pass()),
100 destructor_runner_(base::MessageLoopProxy::current()),
101 thread_("ForwarderThread") {
102 }
103
Start()104 void Start() {
105 thread_.Start();
106 thread_.message_loop_proxy()->PostTask(
107 FROM_HERE,
108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
109 }
110
111 private:
ThreadHandler()112 void ThreadHandler() {
113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
114 fd_set read_fds;
115 fd_set write_fds;
116
117 // Copy from socket1 to socket2
118 BufferedCopier buffer1(socket1_.get(), socket2_.get());
119 // Copy from socket2 to socket1
120 BufferedCopier buffer2(socket2_.get(), socket1_.get());
121
122 bool run = true;
123 while (run) {
124 FD_ZERO(&read_fds);
125 FD_ZERO(&write_fds);
126
127 buffer1.AddToReadSet(&read_fds);
128 buffer2.AddToReadSet(&read_fds);
129 buffer1.AddToWriteSet(&write_fds);
130 buffer2.AddToWriteSet(&write_fds);
131
132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
133 PLOG(ERROR) << "select";
134 break;
135 }
136 // When a socket in the read set closes the connection, select() returns
137 // with that socket descriptor set as "ready to read". When we call
138 // TryRead() below, it will return false, but the while loop will continue
139 // to run until all the write operations are finished, to make sure the
140 // buffers are completely flushed out.
141
142 // Keep running while we have some operation to do.
143 run = buffer1.TryRead(read_fds);
144 run = run || buffer2.TryRead(read_fds);
145 run = run || buffer1.TryWrite(write_fds);
146 run = run || buffer2.TryWrite(write_fds);
147 }
148
149 // Note that the thread that |destruction_runner_| runs tasks on could be
150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
151 // the sockets now rather than relying on the destructor.
152 socket1_.reset();
153 socket2_.reset();
154
155 // Note that base::Thread must be destroyed on the thread it was created on.
156 destructor_runner_->DeleteSoon(FROM_HERE, this);
157 }
158
159 scoped_ptr<Socket> socket1_;
160 scoped_ptr<Socket> socket2_;
161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
162 base::Thread thread_;
163 };
164
165 } // namespace
166
StartForwarder(scoped_ptr<Socket> socket1,scoped_ptr<Socket> socket2)167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) {
168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start();
169 }
170
171 } // namespace forwarder2
172