1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include <iomanip>
12
13 #include "webrtc/base/asyncsocket.h"
14 #include "webrtc/base/logging.h"
15 #include "webrtc/base/socketfactory.h"
16 #include "webrtc/base/socketpool.h"
17 #include "webrtc/base/socketstream.h"
18 #include "webrtc/base/thread.h"
19
20 namespace rtc {
21
22 ///////////////////////////////////////////////////////////////////////////////
23 // StreamCache - Caches a set of open streams, defers creation to a separate
24 // StreamPool.
25 ///////////////////////////////////////////////////////////////////////////////
26
StreamCache(StreamPool * pool)27 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) {
28 }
29
~StreamCache()30 StreamCache::~StreamCache() {
31 for (ConnectedList::iterator it = active_.begin(); it != active_.end();
32 ++it) {
33 delete it->second;
34 }
35 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
36 ++it) {
37 delete it->second;
38 }
39 }
40
RequestConnectedStream(const SocketAddress & remote,int * err)41 StreamInterface* StreamCache::RequestConnectedStream(
42 const SocketAddress& remote, int* err) {
43 LOG_F(LS_VERBOSE) << "(" << remote << ")";
44 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
45 ++it) {
46 if (remote == it->first) {
47 it->second->SignalEvent.disconnect(this);
48 // Move from cached_ to active_
49 active_.push_front(*it);
50 cached_.erase(it);
51 if (err)
52 *err = 0;
53 LOG_F(LS_VERBOSE) << "Providing cached stream";
54 return active_.front().second;
55 }
56 }
57 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
58 // We track active streams so that we can remember their address
59 active_.push_front(ConnectedStream(remote, stream));
60 LOG_F(LS_VERBOSE) << "Providing new stream";
61 return active_.front().second;
62 }
63 return NULL;
64 }
65
ReturnConnectedStream(StreamInterface * stream)66 void StreamCache::ReturnConnectedStream(StreamInterface* stream) {
67 for (ConnectedList::iterator it = active_.begin(); it != active_.end();
68 ++it) {
69 if (stream == it->second) {
70 LOG_F(LS_VERBOSE) << "(" << it->first << ")";
71 if (stream->GetState() == SS_CLOSED) {
72 // Return closed streams
73 LOG_F(LS_VERBOSE) << "Returning closed stream";
74 pool_->ReturnConnectedStream(it->second);
75 } else {
76 // Monitor open streams
77 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent);
78 LOG_F(LS_VERBOSE) << "Caching stream";
79 cached_.push_front(*it);
80 }
81 active_.erase(it);
82 return;
83 }
84 }
85 ASSERT(false);
86 }
87
OnStreamEvent(StreamInterface * stream,int events,int err)88 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) {
89 if ((events & SE_CLOSE) == 0) {
90 LOG_F(LS_WARNING) << "(" << events << ", " << err
91 << ") received non-close event";
92 return;
93 }
94 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
95 ++it) {
96 if (stream == it->second) {
97 LOG_F(LS_VERBOSE) << "(" << it->first << ")";
98 // We don't cache closed streams, so return it.
99 it->second->SignalEvent.disconnect(this);
100 LOG_F(LS_VERBOSE) << "Returning closed stream";
101 pool_->ReturnConnectedStream(it->second);
102 cached_.erase(it);
103 return;
104 }
105 }
106 ASSERT(false);
107 }
108
109 //////////////////////////////////////////////////////////////////////
110 // NewSocketPool
111 //////////////////////////////////////////////////////////////////////
112
NewSocketPool(SocketFactory * factory)113 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) {
114 }
115
~NewSocketPool()116 NewSocketPool::~NewSocketPool() {
117 }
118
119 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)120 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
121 AsyncSocket* socket =
122 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM);
123 if (!socket) {
124 if (err)
125 *err = -1;
126 return NULL;
127 }
128 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) {
129 if (err)
130 *err = socket->GetError();
131 delete socket;
132 return NULL;
133 }
134 if (err)
135 *err = 0;
136 return new SocketStream(socket);
137 }
138
139 void
ReturnConnectedStream(StreamInterface * stream)140 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) {
141 Thread::Current()->Dispose(stream);
142 }
143
144 //////////////////////////////////////////////////////////////////////
145 // ReuseSocketPool
146 //////////////////////////////////////////////////////////////////////
147
ReuseSocketPool(SocketFactory * factory)148 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory)
149 : factory_(factory), stream_(NULL), checked_out_(false) {
150 }
151
~ReuseSocketPool()152 ReuseSocketPool::~ReuseSocketPool() {
153 ASSERT(!checked_out_);
154 delete stream_;
155 }
156
157 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)158 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
159 // Only one socket can be used from this "pool" at a time
160 ASSERT(!checked_out_);
161 if (!stream_) {
162 LOG_F(LS_VERBOSE) << "Creating new socket";
163 int family = remote.family();
164 // TODO: Deal with this when we/I clean up DNS resolution.
165 if (remote.IsUnresolvedIP()) {
166 family = AF_INET;
167 }
168 AsyncSocket* socket =
169 factory_->CreateAsyncSocket(family, SOCK_STREAM);
170 if (!socket) {
171 if (err)
172 *err = -1;
173 return NULL;
174 }
175 stream_ = new SocketStream(socket);
176 }
177 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) {
178 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_;
179 } else {
180 remote_ = remote;
181 stream_->Close();
182 if ((stream_->GetSocket()->Connect(remote_) != 0)
183 && !stream_->GetSocket()->IsBlocking()) {
184 if (err)
185 *err = stream_->GetSocket()->GetError();
186 return NULL;
187 } else {
188 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_;
189 }
190 }
191 stream_->SignalEvent.disconnect(this);
192 checked_out_ = true;
193 if (err)
194 *err = 0;
195 return stream_;
196 }
197
198 void
ReturnConnectedStream(StreamInterface * stream)199 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) {
200 ASSERT(stream == stream_);
201 ASSERT(checked_out_);
202 checked_out_ = false;
203 // Until the socket is reused, monitor it to determine if it closes.
204 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent);
205 }
206
207 void
OnStreamEvent(StreamInterface * stream,int events,int err)208 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) {
209 ASSERT(stream == stream_);
210 ASSERT(!checked_out_);
211
212 // If the stream was written to and then immediately returned to us then
213 // we may get a writable notification for it, which we should ignore.
214 if (events == SE_WRITE) {
215 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring";
216 return;
217 }
218
219 // If the peer sent data, we can't process it, so drop the connection.
220 // If the socket has closed, clean it up.
221 // In either case, we'll reconnect it the next time it is used.
222 ASSERT(0 != (events & (SE_READ|SE_CLOSE)));
223 if (0 != (events & SE_CLOSE)) {
224 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err;
225 } else {
226 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing";
227 }
228 stream_->Close();
229 }
230
231 ///////////////////////////////////////////////////////////////////////////////
232 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached
233 // LoggingAdapters.
234 ///////////////////////////////////////////////////////////////////////////////
235
LoggingPoolAdapter(StreamPool * pool,LoggingSeverity level,const std::string & label,bool binary_mode)236 LoggingPoolAdapter::LoggingPoolAdapter(
237 StreamPool* pool, LoggingSeverity level, const std::string& label,
238 bool binary_mode)
239 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) {
240 }
241
~LoggingPoolAdapter()242 LoggingPoolAdapter::~LoggingPoolAdapter() {
243 for (StreamList::iterator it = recycle_bin_.begin();
244 it != recycle_bin_.end(); ++it) {
245 delete *it;
246 }
247 }
248
RequestConnectedStream(const SocketAddress & remote,int * err)249 StreamInterface* LoggingPoolAdapter::RequestConnectedStream(
250 const SocketAddress& remote, int* err) {
251 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
252 ASSERT(SS_CLOSED != stream->GetState());
253 std::stringstream ss;
254 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8)
255 << stream << ")";
256 LOG_V(level_) << ss.str()
257 << ((SS_OPEN == stream->GetState()) ? " Connected"
258 : " Connecting")
259 << " to " << remote;
260 if (recycle_bin_.empty()) {
261 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_);
262 }
263 LoggingAdapter* logging = recycle_bin_.front();
264 recycle_bin_.pop_front();
265 logging->set_label(ss.str());
266 logging->Attach(stream);
267 return logging;
268 }
269 return NULL;
270 }
271
ReturnConnectedStream(StreamInterface * stream)272 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) {
273 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream);
274 pool_->ReturnConnectedStream(logging->Detach());
275 recycle_bin_.push_back(logging);
276 }
277
278 ///////////////////////////////////////////////////////////////////////////////
279
280 } // namespace rtc
281