1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include <iomanip>
29
30 #include "talk/base/asyncsocket.h"
31 #include "talk/base/logging.h"
32 #include "talk/base/socketfactory.h"
33 #include "talk/base/socketpool.h"
34 #include "talk/base/socketstream.h"
35 #include "talk/base/thread.h"
36
37 namespace talk_base {
38
39 ///////////////////////////////////////////////////////////////////////////////
40 // StreamCache - Caches a set of open streams, defers creation to a separate
41 // StreamPool.
42 ///////////////////////////////////////////////////////////////////////////////
43
StreamCache(StreamPool * pool)44 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) {
45 }
46
~StreamCache()47 StreamCache::~StreamCache() {
48 for (ConnectedList::iterator it = active_.begin(); it != active_.end();
49 ++it) {
50 delete it->second;
51 }
52 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
53 ++it) {
54 delete it->second;
55 }
56 }
57
RequestConnectedStream(const SocketAddress & remote,int * err)58 StreamInterface* StreamCache::RequestConnectedStream(
59 const SocketAddress& remote, int* err) {
60 LOG_F(LS_VERBOSE) << "(" << remote << ")";
61 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
62 ++it) {
63 if (remote == it->first) {
64 it->second->SignalEvent.disconnect(this);
65 // Move from cached_ to active_
66 active_.push_front(*it);
67 cached_.erase(it);
68 if (err)
69 *err = 0;
70 LOG_F(LS_VERBOSE) << "Providing cached stream";
71 return active_.front().second;
72 }
73 }
74 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
75 // We track active streams so that we can remember their address
76 active_.push_front(ConnectedStream(remote, stream));
77 LOG_F(LS_VERBOSE) << "Providing new stream";
78 return active_.front().second;
79 }
80 return NULL;
81 }
82
ReturnConnectedStream(StreamInterface * stream)83 void StreamCache::ReturnConnectedStream(StreamInterface* stream) {
84 for (ConnectedList::iterator it = active_.begin(); it != active_.end();
85 ++it) {
86 if (stream == it->second) {
87 LOG_F(LS_VERBOSE) << "(" << it->first << ")";
88 if (stream->GetState() == SS_CLOSED) {
89 // Return closed streams
90 LOG_F(LS_VERBOSE) << "Returning closed stream";
91 pool_->ReturnConnectedStream(it->second);
92 } else {
93 // Monitor open streams
94 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent);
95 LOG_F(LS_VERBOSE) << "Caching stream";
96 cached_.push_front(*it);
97 }
98 active_.erase(it);
99 return;
100 }
101 }
102 ASSERT(false);
103 }
104
OnStreamEvent(StreamInterface * stream,int events,int err)105 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) {
106 if ((events & SE_CLOSE) == 0) {
107 LOG_F(LS_WARNING) << "(" << events << ", " << err
108 << ") received non-close event";
109 return;
110 }
111 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end();
112 ++it) {
113 if (stream == it->second) {
114 LOG_F(LS_VERBOSE) << "(" << it->first << ")";
115 // We don't cache closed streams, so return it.
116 it->second->SignalEvent.disconnect(this);
117 LOG_F(LS_VERBOSE) << "Returning closed stream";
118 pool_->ReturnConnectedStream(it->second);
119 cached_.erase(it);
120 return;
121 }
122 }
123 ASSERT(false);
124 }
125
126 //////////////////////////////////////////////////////////////////////
127 // NewSocketPool
128 //////////////////////////////////////////////////////////////////////
129
NewSocketPool(SocketFactory * factory)130 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) {
131 }
132
~NewSocketPool()133 NewSocketPool::~NewSocketPool() {
134 }
135
136 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)137 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
138 AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM);
139 if (!socket) {
140 ASSERT(false);
141 if (err)
142 *err = -1;
143 return NULL;
144 }
145 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) {
146 if (err)
147 *err = socket->GetError();
148 delete socket;
149 return NULL;
150 }
151 if (err)
152 *err = 0;
153 return new SocketStream(socket);
154 }
155
156 void
ReturnConnectedStream(StreamInterface * stream)157 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) {
158 Thread::Current()->Dispose(stream);
159 }
160
161 //////////////////////////////////////////////////////////////////////
162 // ReuseSocketPool
163 //////////////////////////////////////////////////////////////////////
164
ReuseSocketPool(SocketFactory * factory)165 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory)
166 : factory_(factory), stream_(NULL), checked_out_(false) {
167 }
168
~ReuseSocketPool()169 ReuseSocketPool::~ReuseSocketPool() {
170 ASSERT(!checked_out_);
171 delete stream_;
172 }
173
174 StreamInterface*
RequestConnectedStream(const SocketAddress & remote,int * err)175 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) {
176 // Only one socket can be used from this "pool" at a time
177 ASSERT(!checked_out_);
178 if (!stream_) {
179 LOG_F(LS_VERBOSE) << "Creating new socket";
180 AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM);
181 if (!socket) {
182 ASSERT(false);
183 if (err)
184 *err = -1;
185 return NULL;
186 }
187 stream_ = new SocketStream(socket);
188 }
189 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) {
190 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_;
191 } else {
192 remote_ = remote;
193 stream_->Close();
194 if ((stream_->GetSocket()->Connect(remote_) != 0)
195 && !stream_->GetSocket()->IsBlocking()) {
196 if (err)
197 *err = stream_->GetSocket()->GetError();
198 return NULL;
199 } else {
200 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_;
201 }
202 }
203 stream_->SignalEvent.disconnect(this);
204 checked_out_ = true;
205 if (err)
206 *err = 0;
207 return stream_;
208 }
209
210 void
ReturnConnectedStream(StreamInterface * stream)211 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) {
212 ASSERT(stream == stream_);
213 ASSERT(checked_out_);
214 checked_out_ = false;
215 // Until the socket is reused, monitor it to determine if it closes.
216 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent);
217 }
218
219 void
OnStreamEvent(StreamInterface * stream,int events,int err)220 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) {
221 ASSERT(stream == stream_);
222 ASSERT(!checked_out_);
223
224 // If the stream was written to and then immediately returned to us then
225 // we may get a writable notification for it, which we should ignore.
226 if (events == SE_WRITE) {
227 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring";
228 return;
229 }
230
231 // If the peer sent data, we can't process it, so drop the connection.
232 // If the socket has closed, clean it up.
233 // In either case, we'll reconnect it the next time it is used.
234 ASSERT(0 != (events & (SE_READ|SE_CLOSE)));
235 if (0 != (events & SE_CLOSE)) {
236 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err;
237 } else {
238 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing";
239 }
240 stream_->Close();
241 }
242
243 ///////////////////////////////////////////////////////////////////////////////
244 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached
245 // LoggingAdapters.
246 ///////////////////////////////////////////////////////////////////////////////
247
LoggingPoolAdapter(StreamPool * pool,LoggingSeverity level,const std::string & label,bool binary_mode)248 LoggingPoolAdapter::LoggingPoolAdapter(
249 StreamPool* pool, LoggingSeverity level, const std::string& label,
250 bool binary_mode)
251 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) {
252 }
253
~LoggingPoolAdapter()254 LoggingPoolAdapter::~LoggingPoolAdapter() {
255 for (StreamList::iterator it = recycle_bin_.begin();
256 it != recycle_bin_.end(); ++it) {
257 delete *it;
258 }
259 }
260
RequestConnectedStream(const SocketAddress & remote,int * err)261 StreamInterface* LoggingPoolAdapter::RequestConnectedStream(
262 const SocketAddress& remote, int* err) {
263 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) {
264 ASSERT(SS_CLOSED != stream->GetState());
265 std::stringstream ss;
266 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8)
267 << stream << ")";
268 LOG_V(level_) << ss.str()
269 << ((SS_OPEN == stream->GetState()) ? " Connected"
270 : " Connecting")
271 << " to " << remote;
272 if (recycle_bin_.empty()) {
273 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_);
274 }
275 LoggingAdapter* logging = recycle_bin_.front();
276 recycle_bin_.pop_front();
277 logging->set_label(ss.str());
278 logging->Attach(stream);
279 return logging;
280 }
281 return NULL;
282 }
283
ReturnConnectedStream(StreamInterface * stream)284 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) {
285 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream);
286 pool_->ReturnConnectedStream(logging->Detach());
287 recycle_bin_.push_back(logging);
288 }
289
290 ///////////////////////////////////////////////////////////////////////////////
291
292 } // namespace talk_base
293