• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/socket/websocket_transport_client_socket_pool.h"
6 
7 #include <algorithm>
8 
9 #include "base/compiler_specific.h"
10 #include "base/logging.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/string_util.h"
13 #include "base/time/time.h"
14 #include "base/values.h"
15 #include "net/base/net_errors.h"
16 #include "net/base/net_log.h"
17 #include "net/socket/client_socket_handle.h"
18 #include "net/socket/client_socket_pool_base.h"
19 #include "net/socket/websocket_endpoint_lock_manager.h"
20 #include "net/socket/websocket_transport_connect_sub_job.h"
21 
22 namespace net {
23 
24 namespace {
25 
26 using base::TimeDelta;
27 
// Global connect timeout, in seconds (4 minutes).
// TODO(ricea): This single timeout exists for compatibility with
// TransportConnectJob. WebSocketTransportConnectJob controls address
// selection more tightly, so it could do something smarter here.
const int kTransportConnectJobTimeoutInSeconds = 4 * 60;
32 
33 }  // namespace
34 
WebSocketTransportConnectJob(const std::string & group_name,RequestPriority priority,const scoped_refptr<TransportSocketParams> & params,TimeDelta timeout_duration,const CompletionCallback & callback,ClientSocketFactory * client_socket_factory,HostResolver * host_resolver,ClientSocketHandle * handle,Delegate * delegate,NetLog * pool_net_log,const BoundNetLog & request_net_log)35 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
36     const std::string& group_name,
37     RequestPriority priority,
38     const scoped_refptr<TransportSocketParams>& params,
39     TimeDelta timeout_duration,
40     const CompletionCallback& callback,
41     ClientSocketFactory* client_socket_factory,
42     HostResolver* host_resolver,
43     ClientSocketHandle* handle,
44     Delegate* delegate,
45     NetLog* pool_net_log,
46     const BoundNetLog& request_net_log)
47     : ConnectJob(group_name,
48                  timeout_duration,
49                  priority,
50                  delegate,
51                  BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
52       helper_(params, client_socket_factory, host_resolver, &connect_timing_),
53       race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN),
54       handle_(handle),
55       callback_(callback),
56       request_net_log_(request_net_log),
57       had_ipv4_(false),
58       had_ipv6_(false) {
59   helper_.SetOnIOComplete(this);
60 }
61 
~WebSocketTransportConnectJob()62 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
63 
GetLoadState() const64 LoadState WebSocketTransportConnectJob::GetLoadState() const {
65   LoadState load_state = LOAD_STATE_RESOLVING_HOST;
66   if (ipv6_job_)
67     load_state = ipv6_job_->GetLoadState();
68   // This method should return LOAD_STATE_CONNECTING in preference to
69   // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET when possible because "waiting for
70   // available socket" implies that nothing is happening.
71   if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
72     load_state = ipv4_job_->GetLoadState();
73   return load_state;
74 }
75 
DoResolveHost()76 int WebSocketTransportConnectJob::DoResolveHost() {
77   return helper_.DoResolveHost(priority(), net_log());
78 }
79 
DoResolveHostComplete(int result)80 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
81   return helper_.DoResolveHostComplete(result, net_log());
82 }
83 
DoTransportConnect()84 int WebSocketTransportConnectJob::DoTransportConnect() {
85   AddressList ipv4_addresses;
86   AddressList ipv6_addresses;
87   int result = ERR_UNEXPECTED;
88   helper_.set_next_state(
89       TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE);
90 
91   for (AddressList::const_iterator it = helper_.addresses().begin();
92        it != helper_.addresses().end();
93        ++it) {
94     switch (it->GetFamily()) {
95       case ADDRESS_FAMILY_IPV4:
96         ipv4_addresses.push_back(*it);
97         break;
98 
99       case ADDRESS_FAMILY_IPV6:
100         ipv6_addresses.push_back(*it);
101         break;
102 
103       default:
104         DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
105         break;
106     }
107   }
108 
109   if (!ipv4_addresses.empty()) {
110     had_ipv4_ = true;
111     ipv4_job_.reset(new WebSocketTransportConnectSubJob(
112         ipv4_addresses, this, SUB_JOB_IPV4));
113   }
114 
115   if (!ipv6_addresses.empty()) {
116     had_ipv6_ = true;
117     ipv6_job_.reset(new WebSocketTransportConnectSubJob(
118         ipv6_addresses, this, SUB_JOB_IPV6));
119     result = ipv6_job_->Start();
120     switch (result) {
121       case OK:
122         SetSocket(ipv6_job_->PassSocket());
123         race_result_ =
124             had_ipv4_
125                 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
126                 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
127         return result;
128 
129       case ERR_IO_PENDING:
130         if (ipv4_job_) {
131           // This use of base::Unretained is safe because |fallback_timer_| is
132           // owned by this object.
133           fallback_timer_.Start(
134               FROM_HERE,
135               TimeDelta::FromMilliseconds(
136                   TransportConnectJobHelper::kIPv6FallbackTimerInMs),
137               base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
138                          base::Unretained(this)));
139         }
140         return result;
141 
142       default:
143         ipv6_job_.reset();
144     }
145   }
146 
147   DCHECK(!ipv6_job_);
148   if (ipv4_job_) {
149     result = ipv4_job_->Start();
150     if (result == OK) {
151       SetSocket(ipv4_job_->PassSocket());
152       race_result_ =
153           had_ipv6_
154               ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
155               : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
156     }
157   }
158 
159   return result;
160 }
161 
DoTransportConnectComplete(int result)162 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
163   if (result == OK)
164     helper_.HistogramDuration(race_result_);
165   return result;
166 }
167 
OnSubJobComplete(int result,WebSocketTransportConnectSubJob * job)168 void WebSocketTransportConnectJob::OnSubJobComplete(
169     int result,
170     WebSocketTransportConnectSubJob* job) {
171   if (result == OK) {
172     switch (job->type()) {
173       case SUB_JOB_IPV4:
174         race_result_ =
175             had_ipv6_
176                 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
177                 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
178         break;
179 
180       case SUB_JOB_IPV6:
181         race_result_ =
182             had_ipv4_
183                 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
184                 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
185         break;
186     }
187     SetSocket(job->PassSocket());
188 
189     // Make sure all connections are cancelled even if this object fails to be
190     // deleted.
191     ipv4_job_.reset();
192     ipv6_job_.reset();
193   } else {
194     switch (job->type()) {
195       case SUB_JOB_IPV4:
196         ipv4_job_.reset();
197         break;
198 
199       case SUB_JOB_IPV6:
200         ipv6_job_.reset();
201         if (ipv4_job_ && !ipv4_job_->started()) {
202           fallback_timer_.Stop();
203           result = ipv4_job_->Start();
204           if (result != ERR_IO_PENDING) {
205             OnSubJobComplete(result, ipv4_job_.get());
206             return;
207           }
208         }
209         break;
210     }
211     if (ipv4_job_ || ipv6_job_)
212       return;
213   }
214   helper_.OnIOComplete(this, result);
215 }
216 
StartIPv4JobAsync()217 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
218   DCHECK(ipv4_job_);
219   int result = ipv4_job_->Start();
220   if (result != ERR_IO_PENDING)
221     OnSubJobComplete(result, ipv4_job_.get());
222 }
223 
ConnectInternal()224 int WebSocketTransportConnectJob::ConnectInternal() {
225   return helper_.DoConnectInternal(this);
226 }
227 
WebSocketTransportClientSocketPool(int max_sockets,int max_sockets_per_group,ClientSocketPoolHistograms * histograms,HostResolver * host_resolver,ClientSocketFactory * client_socket_factory,NetLog * net_log)228 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
229     int max_sockets,
230     int max_sockets_per_group,
231     ClientSocketPoolHistograms* histograms,
232     HostResolver* host_resolver,
233     ClientSocketFactory* client_socket_factory,
234     NetLog* net_log)
235     : TransportClientSocketPool(max_sockets,
236                                 max_sockets_per_group,
237                                 histograms,
238                                 host_resolver,
239                                 client_socket_factory,
240                                 net_log),
241       connect_job_delegate_(this),
242       histograms_(histograms),
243       pool_net_log_(net_log),
244       client_socket_factory_(client_socket_factory),
245       host_resolver_(host_resolver),
246       max_sockets_(max_sockets),
247       handed_out_socket_count_(0),
248       flushing_(false),
249       weak_factory_(this) {}
250 
~WebSocketTransportClientSocketPool()251 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
252   // Clean up any pending connect jobs.
253   FlushWithError(ERR_ABORTED);
254   DCHECK(pending_connects_.empty());
255   DCHECK_EQ(0, handed_out_socket_count_);
256   DCHECK(stalled_request_queue_.empty());
257   DCHECK(stalled_request_map_.empty());
258 }
259 
260 // static
UnlockEndpoint(ClientSocketHandle * handle)261 void WebSocketTransportClientSocketPool::UnlockEndpoint(
262     ClientSocketHandle* handle) {
263   DCHECK(handle->is_initialized());
264   DCHECK(handle->socket());
265   IPEndPoint address;
266   if (handle->socket()->GetPeerAddress(&address) == OK)
267     WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(address);
268 }
269 
RequestSocket(const std::string & group_name,const void * params,RequestPriority priority,ClientSocketHandle * handle,const CompletionCallback & callback,const BoundNetLog & request_net_log)270 int WebSocketTransportClientSocketPool::RequestSocket(
271     const std::string& group_name,
272     const void* params,
273     RequestPriority priority,
274     ClientSocketHandle* handle,
275     const CompletionCallback& callback,
276     const BoundNetLog& request_net_log) {
277   DCHECK(params);
278   const scoped_refptr<TransportSocketParams>& casted_params =
279       *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
280 
281   NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
282 
283   CHECK(!callback.is_null());
284   CHECK(handle);
285 
286   request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
287 
288   if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
289     request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
290     // TODO(ricea): Use emplace_back when C++11 becomes allowed.
291     StalledRequest request(
292         casted_params, priority, handle, callback, request_net_log);
293     stalled_request_queue_.push_back(request);
294     StalledRequestQueue::iterator iterator = stalled_request_queue_.end();
295     --iterator;
296     DCHECK_EQ(handle, iterator->handle);
297     // Because StalledRequestQueue is a std::list, its iterators are guaranteed
298     // to remain valid as long as the elements are not removed. As long as
299     // stalled_request_queue_ and stalled_request_map_ are updated in sync, it
300     // is safe to dereference an iterator in stalled_request_map_ to find the
301     // corresponding list element.
302     stalled_request_map_.insert(
303         StalledRequestMap::value_type(handle, iterator));
304     return ERR_IO_PENDING;
305   }
306 
307   scoped_ptr<WebSocketTransportConnectJob> connect_job(
308       new WebSocketTransportConnectJob(group_name,
309                                        priority,
310                                        casted_params,
311                                        ConnectionTimeout(),
312                                        callback,
313                                        client_socket_factory_,
314                                        host_resolver_,
315                                        handle,
316                                        &connect_job_delegate_,
317                                        pool_net_log_,
318                                        request_net_log));
319 
320   int rv = connect_job->Connect();
321   // Regardless of the outcome of |connect_job|, it will always be bound to
322   // |handle|, since this pool uses early-binding. So the binding is logged
323   // here, without waiting for the result.
324   request_net_log.AddEvent(
325       NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
326       connect_job->net_log().source().ToEventParametersCallback());
327   if (rv == OK) {
328     HandOutSocket(connect_job->PassSocket(),
329                   connect_job->connect_timing(),
330                   handle,
331                   request_net_log);
332     request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
333   } else if (rv == ERR_IO_PENDING) {
334     // TODO(ricea): Implement backup job timer?
335     AddJob(handle, connect_job.Pass());
336   } else {
337     scoped_ptr<StreamSocket> error_socket;
338     connect_job->GetAdditionalErrorState(handle);
339     error_socket = connect_job->PassSocket();
340     if (error_socket) {
341       HandOutSocket(error_socket.Pass(),
342                     connect_job->connect_timing(),
343                     handle,
344                     request_net_log);
345     }
346   }
347 
348   if (rv != ERR_IO_PENDING) {
349     request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
350   }
351 
352   return rv;
353 }
354 
RequestSockets(const std::string & group_name,const void * params,int num_sockets,const BoundNetLog & net_log)355 void WebSocketTransportClientSocketPool::RequestSockets(
356     const std::string& group_name,
357     const void* params,
358     int num_sockets,
359     const BoundNetLog& net_log) {
360   NOTIMPLEMENTED();
361 }
362 
CancelRequest(const std::string & group_name,ClientSocketHandle * handle)363 void WebSocketTransportClientSocketPool::CancelRequest(
364     const std::string& group_name,
365     ClientSocketHandle* handle) {
366   DCHECK(!handle->is_initialized());
367   if (DeleteStalledRequest(handle))
368     return;
369   scoped_ptr<StreamSocket> socket = handle->PassSocket();
370   if (socket)
371     ReleaseSocket(handle->group_name(), socket.Pass(), handle->id());
372   if (!DeleteJob(handle))
373     pending_callbacks_.erase(handle);
374   if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty())
375     ActivateStalledRequest();
376 }
377 
ReleaseSocket(const std::string & group_name,scoped_ptr<StreamSocket> socket,int id)378 void WebSocketTransportClientSocketPool::ReleaseSocket(
379     const std::string& group_name,
380     scoped_ptr<StreamSocket> socket,
381     int id) {
382   WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
383   CHECK_GT(handed_out_socket_count_, 0);
384   --handed_out_socket_count_;
385   if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty())
386     ActivateStalledRequest();
387 }
388 
FlushWithError(int error)389 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
390   // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking
391   // sockets waiting for the endpoint lock. If they connected synchronously,
392   // then OnConnectJobComplete(). The |flushing_| flag tells this object to
393   // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those
394   // calls because this method will delete the jobs and call their callbacks
395   // anyway.
396   flushing_ = true;
397   for (PendingConnectsMap::iterator it = pending_connects_.begin();
398        it != pending_connects_.end();
399        ++it) {
400     InvokeUserCallbackLater(
401         it->second->handle(), it->second->callback(), error);
402     delete it->second, it->second = NULL;
403   }
404   pending_connects_.clear();
405   for (StalledRequestQueue::iterator it = stalled_request_queue_.begin();
406        it != stalled_request_queue_.end();
407        ++it) {
408     InvokeUserCallbackLater(it->handle, it->callback, error);
409   }
410   stalled_request_map_.clear();
411   stalled_request_queue_.clear();
412   flushing_ = false;
413 }
414 
CloseIdleSockets()415 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
416   // We have no idle sockets.
417 }
418 
IdleSocketCount() const419 int WebSocketTransportClientSocketPool::IdleSocketCount() const {
420   return 0;
421 }
422 
IdleSocketCountInGroup(const std::string & group_name) const423 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
424     const std::string& group_name) const {
425   return 0;
426 }
427 
GetLoadState(const std::string & group_name,const ClientSocketHandle * handle) const428 LoadState WebSocketTransportClientSocketPool::GetLoadState(
429     const std::string& group_name,
430     const ClientSocketHandle* handle) const {
431   if (stalled_request_map_.find(handle) != stalled_request_map_.end())
432     return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
433   if (pending_callbacks_.count(handle))
434     return LOAD_STATE_CONNECTING;
435   return LookupConnectJob(handle)->GetLoadState();
436 }
437 
GetInfoAsValue(const std::string & name,const std::string & type,bool include_nested_pools) const438 base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
439     const std::string& name,
440     const std::string& type,
441     bool include_nested_pools) const {
442   base::DictionaryValue* dict = new base::DictionaryValue();
443   dict->SetString("name", name);
444   dict->SetString("type", type);
445   dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
446   dict->SetInteger("connecting_socket_count", pending_connects_.size());
447   dict->SetInteger("idle_socket_count", 0);
448   dict->SetInteger("max_socket_count", max_sockets_);
449   dict->SetInteger("max_sockets_per_group", max_sockets_);
450   dict->SetInteger("pool_generation_number", 0);
451   return dict;
452 }
453 
ConnectionTimeout() const454 TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
455   return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
456 }
457 
histograms() const458 ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
459     const {
460   return histograms_;
461 }
462 
IsStalled() const463 bool WebSocketTransportClientSocketPool::IsStalled() const {
464   return !stalled_request_queue_.empty();
465 }
466 
OnConnectJobComplete(int result,WebSocketTransportConnectJob * job)467 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
468     int result,
469     WebSocketTransportConnectJob* job) {
470   DCHECK_NE(ERR_IO_PENDING, result);
471 
472   scoped_ptr<StreamSocket> socket = job->PassSocket();
473 
474   // See comment in FlushWithError.
475   if (flushing_) {
476     WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
477     return;
478   }
479 
480   BoundNetLog request_net_log = job->request_net_log();
481   CompletionCallback callback = job->callback();
482   LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
483 
484   ClientSocketHandle* const handle = job->handle();
485   bool handed_out_socket = false;
486 
487   if (result == OK) {
488     DCHECK(socket.get());
489     handed_out_socket = true;
490     HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
491     request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
492   } else {
493     // If we got a socket, it must contain error information so pass that
494     // up so that the caller can retrieve it.
495     job->GetAdditionalErrorState(handle);
496     if (socket.get()) {
497       handed_out_socket = true;
498       HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
499     }
500     request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
501   }
502   bool delete_succeeded = DeleteJob(handle);
503   DCHECK(delete_succeeded);
504   if (!handed_out_socket && !stalled_request_queue_.empty() &&
505       !ReachedMaxSocketsLimit())
506     ActivateStalledRequest();
507   InvokeUserCallbackLater(handle, callback, result);
508 }
509 
InvokeUserCallbackLater(ClientSocketHandle * handle,const CompletionCallback & callback,int rv)510 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
511     ClientSocketHandle* handle,
512     const CompletionCallback& callback,
513     int rv) {
514   DCHECK(!pending_callbacks_.count(handle));
515   pending_callbacks_.insert(handle);
516   base::MessageLoop::current()->PostTask(
517       FROM_HERE,
518       base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
519                  weak_factory_.GetWeakPtr(),
520                  handle,
521                  callback,
522                  rv));
523 }
524 
InvokeUserCallback(ClientSocketHandle * handle,const CompletionCallback & callback,int rv)525 void WebSocketTransportClientSocketPool::InvokeUserCallback(
526     ClientSocketHandle* handle,
527     const CompletionCallback& callback,
528     int rv) {
529   if (pending_callbacks_.erase(handle))
530     callback.Run(rv);
531 }
532 
ReachedMaxSocketsLimit() const533 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
534   return handed_out_socket_count_ >= max_sockets_ ||
535          base::checked_cast<int>(pending_connects_.size()) >=
536              max_sockets_ - handed_out_socket_count_;
537 }
538 
HandOutSocket(scoped_ptr<StreamSocket> socket,const LoadTimingInfo::ConnectTiming & connect_timing,ClientSocketHandle * handle,const BoundNetLog & net_log)539 void WebSocketTransportClientSocketPool::HandOutSocket(
540     scoped_ptr<StreamSocket> socket,
541     const LoadTimingInfo::ConnectTiming& connect_timing,
542     ClientSocketHandle* handle,
543     const BoundNetLog& net_log) {
544   DCHECK(socket);
545   handle->SetSocket(socket.Pass());
546   DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type());
547   DCHECK_EQ(0, handle->idle_time().InMicroseconds());
548   handle->set_pool_id(0);
549   handle->set_connect_timing(connect_timing);
550 
551   net_log.AddEvent(
552       NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
553       handle->socket()->NetLog().source().ToEventParametersCallback());
554 
555   ++handed_out_socket_count_;
556 }
557 
AddJob(ClientSocketHandle * handle,scoped_ptr<WebSocketTransportConnectJob> connect_job)558 void WebSocketTransportClientSocketPool::AddJob(
559     ClientSocketHandle* handle,
560     scoped_ptr<WebSocketTransportConnectJob> connect_job) {
561   bool inserted =
562       pending_connects_.insert(PendingConnectsMap::value_type(
563                                    handle, connect_job.release())).second;
564   DCHECK(inserted);
565 }
566 
DeleteJob(ClientSocketHandle * handle)567 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
568   PendingConnectsMap::iterator it = pending_connects_.find(handle);
569   if (it == pending_connects_.end())
570     return false;
571   // Deleting a ConnectJob which holds an endpoint lock can lead to a different
572   // ConnectJob proceeding to connect. If the connect proceeds synchronously
573   // (usually because of a failure) then it can trigger that job to be
574   // deleted. |it| remains valid because std::map guarantees that erase() does
575   // not invalid iterators to other entries.
576   delete it->second, it->second = NULL;
577   DCHECK(pending_connects_.find(handle) == it);
578   pending_connects_.erase(it);
579   return true;
580 }
581 
582 const WebSocketTransportConnectJob*
LookupConnectJob(const ClientSocketHandle * handle) const583 WebSocketTransportClientSocketPool::LookupConnectJob(
584     const ClientSocketHandle* handle) const {
585   PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
586   CHECK(it != pending_connects_.end());
587   return it->second;
588 }
589 
ActivateStalledRequest()590 void WebSocketTransportClientSocketPool::ActivateStalledRequest() {
591   DCHECK(!stalled_request_queue_.empty());
592   DCHECK(!ReachedMaxSocketsLimit());
593   // Usually we will only be able to activate one stalled request at a time,
594   // however if all the connects fail synchronously for some reason, we may be
595   // able to clear the whole queue at once.
596   while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) {
597     StalledRequest request(stalled_request_queue_.front());
598     stalled_request_queue_.pop_front();
599     stalled_request_map_.erase(request.handle);
600     int rv = RequestSocket("ignored",
601                            &request.params,
602                            request.priority,
603                            request.handle,
604                            request.callback,
605                            request.net_log);
606     // ActivateStalledRequest() never returns synchronously, so it is never
607     // called re-entrantly.
608     if (rv != ERR_IO_PENDING)
609       InvokeUserCallbackLater(request.handle, request.callback, rv);
610   }
611 }
612 
DeleteStalledRequest(ClientSocketHandle * handle)613 bool WebSocketTransportClientSocketPool::DeleteStalledRequest(
614     ClientSocketHandle* handle) {
615   StalledRequestMap::iterator it = stalled_request_map_.find(handle);
616   if (it == stalled_request_map_.end())
617     return false;
618   stalled_request_queue_.erase(it->second);
619   stalled_request_map_.erase(it);
620   return true;
621 }
622 
ConnectJobDelegate(WebSocketTransportClientSocketPool * owner)623 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
624     WebSocketTransportClientSocketPool* owner)
625     : owner_(owner) {}
626 
~ConnectJobDelegate()627 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
628 
629 void
OnConnectJobComplete(int result,ConnectJob * job)630 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
631     int result,
632     ConnectJob* job) {
633   owner_->OnConnectJobComplete(result,
634                                static_cast<WebSocketTransportConnectJob*>(job));
635 }
636 
StalledRequest(const scoped_refptr<TransportSocketParams> & params,RequestPriority priority,ClientSocketHandle * handle,const CompletionCallback & callback,const BoundNetLog & net_log)637 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest(
638     const scoped_refptr<TransportSocketParams>& params,
639     RequestPriority priority,
640     ClientSocketHandle* handle,
641     const CompletionCallback& callback,
642     const BoundNetLog& net_log)
643     : params(params),
644       priority(priority),
645       handle(handle),
646       callback(callback),
647       net_log(net_log) {}
648 
~StalledRequest()649 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {}
650 
651 }  // namespace net
652