• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/socket/transport_client_socket_pool.h"
6 
7 #include <utility>
8 
9 #include "base/auto_reset.h"
10 #include "base/barrier_closure.h"
11 #include "base/check_op.h"
12 #include "base/compiler_specific.h"
13 #include "base/containers/contains.h"
14 #include "base/format_macros.h"
15 #include "base/functional/bind.h"
16 #include "base/functional/callback_helpers.h"
17 #include "base/location.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/notreached.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/strings/string_util.h"
23 #include "base/task/single_thread_task_runner.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "net/base/host_port_pair.h"
27 #include "net/base/net_errors.h"
28 #include "net/base/proxy_chain.h"
29 #include "net/base/proxy_server.h"
30 #include "net/log/net_log.h"
31 #include "net/log/net_log_event_type.h"
32 #include "net/log/net_log_source.h"
33 #include "net/socket/connect_job_factory.h"
34 #include "net/traffic_annotation/network_traffic_annotation.h"
35 #include "url/gurl.h"
36 
37 namespace net {
38 
39 namespace {
40 
// Indicate whether or not we should establish a new transport layer connection
// after a certain timeout has passed without receiving an ACK.
// Mutable process-wide toggle; read when a group starts its backup job timer.
bool g_connect_backup_jobs_enabled = true;
44 
NetLogCreateConnectJobParams(bool backup_job,const ClientSocketPool::GroupId * group_id)45 base::Value::Dict NetLogCreateConnectJobParams(
46     bool backup_job,
47     const ClientSocketPool::GroupId* group_id) {
48   base::Value::Dict dict;
49   dict.Set("backup_job", backup_job);
50   dict.Set("group_id", group_id->ToString());
51   return dict;
52 }
53 
54 }  // namespace
55 
// Human-readable reasons recorded in NetLog events when idle sockets are
// closed or the pool is flushed.  These strings are emitted verbatim into
// the log, so they must stay stable for log consumers.
const char TransportClientSocketPool::kCertDatabaseChanged[] =
    "Cert database changed";
const char TransportClientSocketPool::kCertVerifierChanged[] =
    "Cert verifier changed";
const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
    "Connection was closed when it was returned to the pool";
const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
    "Data received unexpectedly";
const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
    "Idle time limit expired";
const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
    "Remote side closed connection";
const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
    "Socket generation out of date";
const char TransportClientSocketPool::kSocketPoolDestroyed[] =
    "Socket pool destroyed";
const char TransportClientSocketPool::kSslConfigChanged[] =
    "SSL configuration changed";
75 
// Captures everything needed to service one socket request: the caller's
// handle and completion callback, priority/tagging information, and the
// socket parameters used to build a ConnectJob if no idle socket is
// available.  |handle| is null for preconnect requests.
TransportClientSocketPool::Request::Request(
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    Flags flags,
    scoped_refptr<SocketParams> socket_params,
    const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    const NetLogWithSource& net_log)
    : handle_(handle),
      callback_(std::move(callback)),
      proxy_auth_callback_(proxy_auth_callback),
      priority_(priority),
      respect_limits_(respect_limits),
      flags_(flags),
      socket_params_(std::move(socket_params)),
      proxy_annotation_tag_(proxy_annotation_tag),
      net_log_(net_log),
      socket_tag_(socket_tag) {
  // Requests that bypass the socket limits must run at maximum priority;
  // anything else would let a limit-ignoring request queue behind
  // limit-respecting ones.
  if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
    DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
}

TransportClientSocketPool::Request::~Request() = default;
102 
AssignJob(ConnectJob * job)103 void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
104   DCHECK(job);
105   DCHECK(!job_);
106   job_ = job;
107   if (job_->priority() != priority_)
108     job_->ChangePriority(priority_);
109 }
110 
ReleaseJob()111 ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
112   DCHECK(job_);
113   ConnectJob* job = job_;
114   job_ = nullptr;
115   return job;
116 }
117 
// Bookkeeping entry for a socket sitting idle in a group, waiting to be
// reused or timed out.
struct TransportClientSocketPool::IdleSocket {
  // An idle socket can't be used if it is disconnected or has been used
  // before and has received data unexpectedly (hence no longer idle).  The
  // unread data would be mistaken for the beginning of the next response if
  // we were to use the socket for a new request.
  //
  // Note that a socket that has never been used before (like a preconnected
  // socket) may be used even with unread data.  This may be, e.g., a SPDY
  // SETTINGS frame.
  //
  // If the socket is not usable, |net_log_reason_utf8| is set to a string
  // indicating why the socket is not usable.
  bool IsUsable(const char** net_log_reason_utf8) const;

  // The idle socket itself, and when it entered the idle list (used to
  // compute idle time and expiry).
  std::unique_ptr<StreamSocket> socket;
  base::TimeTicks start_time;
};
135 
// Public constructor.  Delegates to the full (testing) constructor, filling
// in the production defaults: the standard used-idle-socket timeout, a real
// ConnectJobFactory, the SSL client context from
// |common_connect_job_params|, and backup connect jobs enabled.
TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change)
    : TransportClientSocketPool(max_sockets,
                                max_sockets_per_group,
                                unused_idle_socket_timeout,
                                ClientSocketPool::used_idle_socket_timeout(),
                                proxy_chain,
                                is_for_websockets,
                                common_connect_job_params,
                                cleanup_on_ip_address_change,
                                std::make_unique<ConnectJobFactory>(),
                                common_connect_job_params->ssl_client_context,
                                /*connect_backup_jobs_enabled=*/true) {}
155 
TransportClientSocketPool::~TransportClientSocketPool() {
  // Clean up any idle sockets and pending connect jobs.  Assert that we have no
  // remaining active sockets or pending requests.  They should have all been
  // cleaned up prior to |this| being destroyed.
  FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
  DCHECK(group_map_.empty());
  DCHECK(pending_callback_map_.empty());
  DCHECK_EQ(0, connecting_socket_count_);
  DCHECK_EQ(0, handed_out_socket_count_);
  CHECK(higher_pools_.empty());

  // Unregister observers that were registered at construction, so no
  // notifications are delivered to a destroyed pool.
  if (ssl_client_context_)
    ssl_client_context_->RemoveObserver(this);

  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::RemoveIPAddressObserver(this);
}
173 
// Test-only factory exposing the full constructor: lets tests inject a
// custom ConnectJobFactory, SSL client context, idle timeouts, and the
// backup-job flag.  Always enables cleanup on IP address change.
std::unique_ptr<TransportClientSocketPool>
TransportClientSocketPool::CreateForTesting(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled) {
  // WrapUnique because the target constructor is not public.
  return base::WrapUnique<TransportClientSocketPool>(
      new TransportClientSocketPool(
          max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
          used_idle_socket_timeout, proxy_chain, is_for_websockets,
          common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
          std::move(connect_job_factory), ssl_client_context,
          connect_backup_jobs_enabled));
}
194 
// CallbackResultPair holds a completed request's callback together with the
// net error code it should be invoked with; entries live in
// |pending_callback_map_| until delivered or cancelled.  Move-only in
// practice because CompletionOnceCallback is move-only.
TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
    : result(OK) {}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    CompletionOnceCallback callback_in,
    int result_in)
    : callback(std::move(callback_in)), result(result_in) {}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair&
TransportClientSocketPool::CallbackResultPair::operator=(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;
211 
IsStalled() const212 bool TransportClientSocketPool::IsStalled() const {
213   // If fewer than |max_sockets_| are in use, then clearly |this| is not
214   // stalled.
215   if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
216     return false;
217   // So in order to be stalled, |this| must be using at least |max_sockets_| AND
218   // |this| must have a request that is actually stalled on the global socket
219   // limit.  To find such a request, look for a group that has more requests
220   // than jobs AND where the number of sockets is less than
221   // |max_sockets_per_group_|.  (If the number of sockets is equal to
222   // |max_sockets_per_group_|, then the request is stalled on the group limit,
223   // which does not count.)
224   for (const auto& it : group_map_) {
225     if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
226       return true;
227   }
228   return false;
229 }
230 
AddHigherLayeredPool(HigherLayeredPool * higher_pool)231 void TransportClientSocketPool::AddHigherLayeredPool(
232     HigherLayeredPool* higher_pool) {
233   CHECK(higher_pool);
234   CHECK(!base::Contains(higher_pools_, higher_pool));
235   higher_pools_.insert(higher_pool);
236 }
237 
RemoveHigherLayeredPool(HigherLayeredPool * higher_pool)238 void TransportClientSocketPool::RemoveHigherLayeredPool(
239     HigherLayeredPool* higher_pool) {
240   CHECK(higher_pool);
241   CHECK(base::Contains(higher_pools_, higher_pool));
242   higher_pools_.erase(higher_pool);
243 }
244 
// Requests a single socket for |group_id|.  Returns a net error code:  OK
// if |handle| was synchronously initialized with a socket, ERR_IO_PENDING
// if the request was queued (in which case |callback| fires later), or a
// synchronous failure code.
int TransportClientSocketPool::RequestSocket(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    const NetLogWithSource& net_log) {
  CHECK(callback);
  CHECK(handle);

  NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);

  std::unique_ptr<Request> request = std::make_unique<Request>(
      handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
      respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);

  // Cleanup any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);

  int rv =
      RequestSocketInternal(group_id, *request,
                            /*preconnect_done_closure=*/base::OnceClosure());
  if (rv != ERR_IO_PENDING) {
    // Completed synchronously (success or failure); the Request object is no
    // longer needed.
    if (rv == OK) {
      request->handle()->socket()->ApplySocketTag(request->socket_tag());
    }
    request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                                rv);
    CHECK(!request->handle()->is_initialized());
    request.reset();
  } else {
    // Queued: park the request on its group until a socket becomes available.
    Group* group = GetOrCreateGroup(group_id);
    group->InsertUnboundRequest(std::move(request));
    // Have to do this asynchronously, as closing sockets in higher level pools
    // call back in to |this|, which will cause all sorts of fun and exciting
    // re-entrancy issues if the socket pool is doing something else at the
    // time.
    if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
      base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
          FROM_HERE,
          base::BindOnce(
              &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
              weak_factory_.GetWeakPtr()));
    }
  }
  return rv;
}
298 
// Preconnects up to |num_sockets| sockets for |group_id| (capped at the
// per-group limit).  Returns OK if nothing was left pending, otherwise
// ERR_IO_PENDING, with |callback| invoked (asynchronously, with OK) once
// all started connect jobs have finished.  Preconnect failures are not
// reported to the caller.
int TransportClientSocketPool::RequestSockets(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    int num_sockets,
    CompletionOnceCallback callback,
    const NetLogWithSource& net_log) {
  // TODO(eroman): Split out the host and port parameters.
  net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
                   [&] { return NetLogGroupIdParams(group_id); });

  // A preconnect request has no handle; NO_IDLE_SOCKETS forces fresh
  // connections instead of counting existing idle sockets as satisfying it.
  Request request(nullptr /* no handle */, CompletionOnceCallback(),
                  ProxyAuthCallback(), IDLE, SocketTag(),
                  RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
                  proxy_annotation_tag, net_log);

  // Cleanup any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  if (num_sockets > max_sockets_per_group_) {
    num_sockets = max_sockets_per_group_;
  }

  request.net_log().BeginEventWithIntParams(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
      num_sockets);

  Group* group = GetOrCreateGroup(group_id);

  // RequestSocketsInternal() may delete the group.
  bool deleted_group = false;

  int rv = OK;

  // Fires |callback| (posted, with OK) after |num_sockets| closure runs:
  // one per finished connect job plus manual runs below for slots that never
  // started a pending job.
  base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
      num_sockets,
      base::BindOnce(
          [](CompletionOnceCallback callback) {
            base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
                FROM_HERE, base::BindOnce(std::move(callback), OK));
          },
          std::move(callback)));
  int pending_connect_job_count = 0;
  for (int num_iterations_left = num_sockets;
       group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
       num_iterations_left--) {
    rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
    if (rv == ERR_IO_PENDING) {
      ++pending_connect_job_count;
    }
    if (rv < 0 && rv != ERR_IO_PENDING) {
      // We're encountering a synchronous error.  Give up.
      if (!base::Contains(group_map_, group_id))
        deleted_group = true;
      break;
    }
    if (!base::Contains(group_map_, group_id)) {
      // Unexpected.  The group should only be getting deleted on synchronous
      // error.
      NOTREACHED();
      deleted_group = true;
      break;
    }
  }

  if (!deleted_group && group->IsEmpty())
    RemoveGroup(group_id);

  if (rv == ERR_IO_PENDING)
    rv = OK;
  request.net_log().EndEventWithNetErrorCode(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);

  // Currently we don't handle preconnect errors. So this method returns OK even
  // if failed to preconnect.
  // TODO(crbug.com/1330235): Consider support error handlings when needed.
  if (pending_connect_job_count == 0)
    return OK;
  // Burn down the barrier for every slot that did not start a pending job so
  // the callback fires once the remaining in-flight jobs complete.
  for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
    preconnect_done_closure.Run();
  }

  return ERR_IO_PENDING;
}
383 
// Core request path shared by RequestSocket() and RequestSockets().  Tries,
// in order: reuse an idle socket, piggyback on an unassigned in-flight
// ConnectJob, then create a new ConnectJob — subject to the per-group and
// global socket limits.  |request.handle()| is null for preconnects, which
// also changes some of the limit-hit return codes.
int TransportClientSocketPool::RequestSocketInternal(
    const GroupId& group_id,
    const Request& request,
    base::OnceClosure preconnect_done_closure) {
#if DCHECK_IS_ON()
  // Guard against re-entrant calls; this method is not re-entrancy safe.
  DCHECK(!request_in_process_);
  base::AutoReset<bool> auto_reset(&request_in_process_, true);
#endif  // DCHECK_IS_ON()

  ClientSocketHandle* const handle = request.handle();
  const bool preconnecting = !handle;
  DCHECK_EQ(preconnecting, !!preconnect_done_closure);

  Group* group = nullptr;
  auto group_it = group_map_.find(group_id);
  if (group_it != group_map_.end()) {
    group = group_it->second;

    if (!(request.flags() & NO_IDLE_SOCKETS)) {
      // Try to reuse a socket.
      if (AssignIdleSocketToRequest(request, group))
        return OK;
    }

    // If there are more ConnectJobs than pending requests, don't need to do
    // anything.  Can just wait for the extra job to connect, and then assign it
    // to the request.
    if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
      return ERR_IO_PENDING;

    // Can we make another active socket now?
    if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
        request.respect_limits() == RespectLimits::ENABLED) {
      // TODO(willchan): Consider whether or not we need to close a socket in a
      // higher layered group. I don't think this makes sense since we would
      // just reuse that socket then if we needed one and wouldn't make it down
      // to this layer.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  if (ReachedMaxSocketsLimit() &&
      request.respect_limits() == RespectLimits::ENABLED) {
    // NOTE(mmenke):  Wonder if we really need different code for each case
    // here.  Only reason for them now seems to be preconnects.
    if (idle_socket_count_ > 0) {
      // There's an idle socket in this pool. Either that's because there's
      // still one in this group, but we got here due to preconnecting
      // bypassing idle sockets, or because there's an idle socket in another
      // group.
      bool closed = CloseOneIdleSocketExceptInGroup(group);
      if (preconnecting && !closed)
        return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
    } else {
      // We could check if we really have a stalled group here, but it
      // requires a scan of all groups, so just flip a flag here, and do the
      // check later.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  // We couldn't find a socket to reuse, and there's space to allocate one,
  // so allocate and connect a new one.
  group = GetOrCreateGroup(group_id);
  std::unique_ptr<ConnectJob> connect_job(
      CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
                       request.proxy_annotation_tag(), request.priority(),
                       request.socket_tag(), group));
  connect_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
      });

  int rv = connect_job->Connect();
  if (rv == ERR_IO_PENDING) {
    if (preconnect_done_closure) {
      DCHECK(preconnecting);
      connect_job->set_done_closure(std::move(preconnect_done_closure));
    }
    // If we didn't have any sockets in this group, set a timer for potentially
    // creating a new one.  If the SYN is lost, this backup socket may complete
    // before the slow socket, improving end user latency.
    if (connect_backup_jobs_enabled_ && group->IsEmpty())
      group->StartBackupJobTimer(group_id);
    group->AddJob(std::move(connect_job), preconnecting);
    connecting_socket_count_++;
    return rv;
  }

  // Connect completed synchronously (success or failure).
  LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
  if (preconnecting) {
    if (rv == OK)
      AddIdleSocket(connect_job->PassSocket(), group);
  } else {
    DCHECK(handle);
    if (rv != OK)
      handle->SetAdditionalErrorState(connect_job.get());
    std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
    if (socket) {
      HandOutSocket(std::move(socket), ClientSocketHandle::UNUSED,
                    connect_job->connect_timing(), handle,
                    base::TimeDelta() /* idle_time */, group,
                    request.net_log());
    }
  }
  // The group may have been created solely for this now-finished job.
  if (group->IsEmpty())
    RemoveGroup(group_id);

  return rv;
}
498 
// Attempts to satisfy |request| from |group|'s idle socket list.  Prefers
// the most recently used previously-used socket; falls back to the oldest
// never-used one.  Unusable (closed/dirty) sockets encountered along the
// way are dropped.  Returns true if a socket was handed out.
bool TransportClientSocketPool::AssignIdleSocketToRequest(
    const Request& request,
    Group* group) {
  std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
  auto idle_socket_it = idle_sockets->end();

  // Iterate through the idle sockets forwards (oldest to newest)
  //   * Delete any disconnected ones.
  //   * If we find a used idle socket, assign to |idle_socket|.  At the end,
  //   the |idle_socket_it| will be set to the newest used idle socket.
  for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
    // Check whether socket is usable. Note that it's unlikely that the socket
    // is not usable because this function is always invoked after a
    // reusability check, but in theory socket can be closed asynchronously.
    const char* net_log_reason_utf8;
    if (!it->IsUsable(&net_log_reason_utf8)) {
      it->socket->NetLog().AddEventWithStringParams(
          NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
          net_log_reason_utf8);
      DecrementIdleCount();
      it = idle_sockets->erase(it);
      continue;
    }

    if (it->socket->WasEverUsed()) {
      // We found one we can reuse!
      idle_socket_it = it;
    }

    ++it;
  }

  // If we haven't found an idle socket, that means there are no used idle
  // sockets.  Pick the oldest (first) idle socket (FIFO).

  if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
    idle_socket_it = idle_sockets->begin();

  if (idle_socket_it != idle_sockets->end()) {
    DecrementIdleCount();
    base::TimeDelta idle_time =
        base::TimeTicks::Now() - idle_socket_it->start_time;
    std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
    idle_sockets->erase(idle_socket_it);
    // TODO(davidben): If |idle_time| is under some low watermark, consider
    // treating as UNUSED rather than UNUSED_IDLE. This will avoid
    // HttpNetworkTransaction retrying on some errors.
    ClientSocketHandle::SocketReuseType reuse_type =
        socket->WasEverUsed() ? ClientSocketHandle::REUSED_IDLE
                              : ClientSocketHandle::UNUSED_IDLE;

    HandOutSocket(std::move(socket), reuse_type,
                  LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
                  group, request.net_log());
    return true;
  }

  return false;
}
558 
// static
// Records, on the request's NetLog, which ConnectJob ended up servicing it.
void TransportClientSocketPool::LogBoundConnectJobToRequest(
    const NetLogSource& connect_job_source,
    const Request& request) {
  request.net_log().AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
}
566 
SetPriority(const GroupId & group_id,ClientSocketHandle * handle,RequestPriority priority)567 void TransportClientSocketPool::SetPriority(const GroupId& group_id,
568                                             ClientSocketHandle* handle,
569                                             RequestPriority priority) {
570   auto group_it = group_map_.find(group_id);
571   if (group_it == group_map_.end()) {
572     DCHECK(base::Contains(pending_callback_map_, handle));
573     // The Request has already completed and been destroyed; nothing to
574     // reprioritize.
575     return;
576   }
577 
578   group_it->second->SetPriority(handle, priority);
579 }
580 
// Cancels the request identified by |handle|.  Handles three states the
// request may be in: (1) already completed with its callback still pending,
// (2) bound to an in-flight ConnectJob, or (3) still unbound in the group's
// request queue.  |cancel_connect_job| additionally closes/aborts the
// underlying connection where applicable.
void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
                                              ClientSocketHandle* handle,
                                              bool cancel_connect_job) {
  // Case 1: the request completed; its callback hasn't been delivered yet.
  auto callback_it = pending_callback_map_.find(handle);
  if (callback_it != pending_callback_map_.end()) {
    int result = callback_it->second.result;
    pending_callback_map_.erase(callback_it);
    std::unique_ptr<StreamSocket> socket = handle->PassSocket();
    if (socket) {
      if (result != OK) {
        socket->Disconnect();
      } else if (cancel_connect_job) {
        // Close the socket if |cancel_connect_job| is true and there are no
        // other pending requests.
        Group* group = GetOrCreateGroup(group_id);
        if (group->unbound_request_count() == 0)
          socket->Disconnect();
      }
      ReleaseSocket(handle->group_id(), std::move(socket),
                    handle->group_generation());
    }
    return;
  }

  CHECK(base::Contains(group_map_, group_id));
  Group* group = GetOrCreateGroup(group_id);

  // Case 2: the request is bound to a ConnectJob; dropping it frees a slot.
  std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
  if (request) {
    --connecting_socket_count_;
    OnAvailableSocketSlot(group_id, group);
    CheckForStalledSocketGroups();
    return;
  }

  // Search |unbound_requests_| for matching handle.
  request = group->FindAndRemoveUnboundRequest(handle);
  if (request) {
    request->net_log().AddEvent(NetLogEventType::CANCELLED);
    request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);

    // Let the job run, unless |cancel_connect_job| is true, or we're at the
    // socket limit and there are no other requests waiting on the job.
    bool reached_limit = ReachedMaxSocketsLimit();
    if (group->jobs().size() > group->unbound_request_count() &&
        (cancel_connect_job || reached_limit)) {
      RemoveConnectJob(group->jobs().begin()->get(), group);
      if (group->IsEmpty())
        RemoveGroup(group->group_id());
      if (reached_limit)
        CheckForStalledSocketGroups();
    }
  }
}
635 
// Force-closes every idle socket in the pool, logging |net_log_reason_utf8|
// as the reason.
void TransportClientSocketPool::CloseIdleSockets(
    const char* net_log_reason_utf8) {
  CleanupIdleSockets(true, net_log_reason_utf8);
  DCHECK_EQ(0, idle_socket_count_);
}
641 
CloseIdleSocketsInGroup(const GroupId & group_id,const char * net_log_reason_utf8)642 void TransportClientSocketPool::CloseIdleSocketsInGroup(
643     const GroupId& group_id,
644     const char* net_log_reason_utf8) {
645   if (idle_socket_count_ == 0)
646     return;
647   auto it = group_map_.find(group_id);
648   if (it == group_map_.end())
649     return;
650   CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
651                             net_log_reason_utf8);
652   if (it->second->IsEmpty())
653     RemoveGroup(it);
654 }
655 
// Returns the total number of idle sockets across all groups.
int TransportClientSocketPool::IdleSocketCount() const {
  return idle_socket_count_;
}
659 
IdleSocketCountInGroup(const GroupId & group_id) const660 size_t TransportClientSocketPool::IdleSocketCountInGroup(
661     const GroupId& group_id) const {
662   auto i = group_map_.find(group_id);
663   CHECK(i != group_map_.end());
664 
665   return i->second->idle_sockets().size();
666 }
667 
// Reports the load state of the pending request identified by |handle|:
// connecting (result callback pending, or a ConnectJob is assigned), or
// waiting on the global/group socket limits.
LoadState TransportClientSocketPool::GetLoadState(
    const GroupId& group_id,
    const ClientSocketHandle* handle) const {
  if (base::Contains(pending_callback_map_, handle))
    return LOAD_STATE_CONNECTING;

  auto group_it = group_map_.find(group_id);
  if (group_it == group_map_.end()) {
    // TODO(mmenke):  This is actually reached in the wild, for unknown reasons.
    // Would be great to understand why, and if it's a bug, fix it.  If not,
    // should have a test for that case.
    NOTREACHED();
    return LOAD_STATE_IDLE;
  }

  const Group& group = *group_it->second;
  ConnectJob* job = group.GetConnectJobForHandle(handle);
  if (job)
    return job->GetLoadState();

  // No job assigned: the request is queued behind a limit.  A group that
  // could use another slot is blocked on the global pool limit; otherwise
  // it's blocked on the per-group limit.
  if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
    return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
  return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
}
692 
// Serializes the pool's state (counts, limits, and per-group details) into
// a base::Value for net-internals style debugging pages.
base::Value TransportClientSocketPool::GetInfoAsValue(
    const std::string& name,
    const std::string& type) const {
  // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
  base::Value::Dict dict;
  dict.Set("name", name);
  dict.Set("type", type);
  dict.Set("handed_out_socket_count", handed_out_socket_count_);
  dict.Set("connecting_socket_count", connecting_socket_count_);
  dict.Set("idle_socket_count", idle_socket_count_);
  dict.Set("max_socket_count", max_sockets_);
  dict.Set("max_sockets_per_group", max_sockets_per_group_);

  if (group_map_.empty())
    return base::Value(std::move(dict));

  // One sub-dictionary per group, keyed by the group's string form.
  base::Value::Dict all_groups_dict;
  for (const auto& entry : group_map_) {
    const Group* group = entry.second;
    base::Value::Dict group_dict;

    group_dict.Set("pending_request_count",
                   static_cast<int>(group->unbound_request_count()));
    if (group->has_unbound_requests()) {
      group_dict.Set("top_pending_priority",
                     RequestPriorityToString(group->TopPendingPriority()));
    }

    group_dict.Set("active_socket_count", group->active_socket_count());

    // Idle sockets and connect jobs are listed by their NetLog source ids.
    base::Value::List idle_socket_list;
    for (const auto& idle_socket : group->idle_sockets()) {
      int source_id = idle_socket.socket->NetLog().source().id;
      idle_socket_list.Append(source_id);
    }
    group_dict.Set("idle_sockets", std::move(idle_socket_list));

    base::Value::List connect_jobs_list;
    for (const auto& job : group->jobs()) {
      int source_id = job->net_log().source().id;
      connect_jobs_list.Append(source_id);
    }
    group_dict.Set("connect_jobs", std::move(connect_jobs_list));

    group_dict.Set("is_stalled",
                   group->CanUseAdditionalSocketSlot(max_sockets_per_group_));
    group_dict.Set("backup_job_timer_is_running",
                   group->BackupJobTimerIsRunning());

    all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
  }
  dict.Set("groups", std::move(all_groups_dict));
  return base::Value(std::move(dict));
}
747 
HasActiveSocket(const GroupId & group_id) const748 bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
749   return HasGroup(group_id);
750 }
751 
IsUsable(const char ** net_log_reason_utf8) const752 bool TransportClientSocketPool::IdleSocket::IsUsable(
753     const char** net_log_reason_utf8) const {
754   DCHECK(net_log_reason_utf8);
755   if (socket->WasEverUsed()) {
756     if (!socket->IsConnectedAndIdle()) {
757       if (!socket->IsConnected()) {
758         *net_log_reason_utf8 = kRemoteSideClosedConnection;
759       } else {
760         *net_log_reason_utf8 = kDataReceivedUnexpectedly;
761       }
762       return false;
763     }
764     return true;
765   }
766 
767   if (!socket->IsConnected()) {
768     *net_log_reason_utf8 = kRemoteSideClosedConnection;
769     return false;
770   }
771   return true;
772 }
773 
// Constructs the pool. |max_sockets| caps the whole pool; |max_sockets_per_group|
// caps each group and must not exceed |max_sockets|. The pool optionally
// observes IP address changes (flushing itself on change) and SSL config
// changes via |ssl_client_context|.
TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled)
    : ClientSocketPool(is_for_websockets,
                       common_connect_job_params,
                       std::move(connect_job_factory)),
      max_sockets_(max_sockets),
      max_sockets_per_group_(max_sockets_per_group),
      unused_idle_socket_timeout_(unused_idle_socket_timeout),
      used_idle_socket_timeout_(used_idle_socket_timeout),
      proxy_chain_(proxy_chain),
      cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
      // Backup jobs may be disabled process-wide (e.g. by tests) even when the
      // caller asks for them.
      connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
                                   g_connect_backup_jobs_enabled),
      ssl_client_context_(ssl_client_context) {
  DCHECK_LE(0, max_sockets_per_group);
  DCHECK_LE(max_sockets_per_group, max_sockets);

  // Observers registered here are removed in the destructor (not visible in
  // this chunk).
  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::AddIPAddressObserver(this);

  if (ssl_client_context_)
    ssl_client_context_->AddObserver(this);
}
807 
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)808 void TransportClientSocketPool::OnSSLConfigChanged(
809     SSLClientContext::SSLConfigChangeType change_type) {
810   const char* message = nullptr;
811   // When the SSL config or cert verifier config changes, flush all idle
812   // sockets so they won't get re-used, and allow any active sockets to finish,
813   // but don't put them back in the socket pool.
814   switch (change_type) {
815     case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
816       message = kNetworkChanged;
817       break;
818     case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
819       message = kCertDatabaseChanged;
820       break;
821     case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
822       message = kCertVerifierChanged;
823       break;
824   };
825 
826   base::TimeTicks now = base::TimeTicks::Now();
827   for (auto it = group_map_.begin(); it != group_map_.end();) {
828     it = RefreshGroup(it, now, message);
829   }
830   CheckForStalledSocketGroups();
831 }
832 
833 // TODO(crbug.com/1206799): Get `server` as SchemeHostPort?
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)834 void TransportClientSocketPool::OnSSLConfigForServersChanged(
835     const base::flat_set<HostPortPair>& servers) {
836   // Current time value. Retrieving it once at the function start rather than
837   // inside the inner loop, since it shouldn't change by any meaningful amount.
838   //
839   // TODO(davidben): This value is not actually needed because
840   // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
841   // interfaces so the parameter is not necessary.
842   base::TimeTicks now = base::TimeTicks::Now();
843 
844   // If the proxy chain includes a server from `servers` and uses SSL settings
845   // (HTTPS or QUIC), refresh every group.
846   // TODO(https://crbug.com/1491092): Check each ProxyServer in `proxy_chain_`.
847   bool proxy_matches = false;
848   for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
849     if (proxy_server.is_http_like() && !proxy_server.is_http() &&
850         servers.contains(proxy_server.host_port_pair())) {
851       proxy_matches = true;
852     }
853   }
854 
855   bool refreshed_any = false;
856   for (auto it = group_map_.begin(); it != group_map_.end();) {
857     if (proxy_matches ||
858         (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
859          servers.contains(
860              HostPortPair::FromSchemeHostPort(it->first.destination())))) {
861       refreshed_any = true;
862       // Note this call may destroy the group and invalidate |to_refresh|.
863       it = RefreshGroup(it, now, kSslConfigChanged);
864     } else {
865       ++it;
866     }
867   }
868 
869   if (refreshed_any) {
870     // Check to see if any group can use the freed up socket slots. It would be
871     // more efficient to give the slots to the refreshed groups, if the still
872     // exists and need them, but this should be rare enough that it doesn't
873     // matter. This will also make sure the slots are given to the group with
874     // the highest priority request without an assigned ConnectJob.
875     CheckForStalledSocketGroups();
876   }
877 }
878 
HasGroup(const GroupId & group_id) const879 bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
880   return base::Contains(group_map_, group_id);
881 }
882 
CleanupIdleSockets(bool force,const char * net_log_reason_utf8)883 void TransportClientSocketPool::CleanupIdleSockets(
884     bool force,
885     const char* net_log_reason_utf8) {
886   if (idle_socket_count_ == 0)
887     return;
888 
889   // Current time value. Retrieving it once at the function start rather than
890   // inside the inner loop, since it shouldn't change by any meaningful amount.
891   base::TimeTicks now = base::TimeTicks::Now();
892 
893   for (auto i = group_map_.begin(); i != group_map_.end();) {
894     Group* group = i->second;
895     CHECK(group);
896     CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
897     // Delete group if no longer needed.
898     if (group->IsEmpty()) {
899       i = RemoveGroup(i);
900     } else {
901       ++i;
902     }
903   }
904 }
905 
CloseOneIdleSocket()906 bool TransportClientSocketPool::CloseOneIdleSocket() {
907   if (idle_socket_count_ == 0)
908     return false;
909   return CloseOneIdleSocketExceptInGroup(nullptr);
910 }
911 
CloseOneIdleConnectionInHigherLayeredPool()912 bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
913   // This pool doesn't have any idle sockets. It's possible that a pool at a
914   // higher layer is holding one of this sockets active, but it's actually idle.
915   // Query the higher layers.
916   for (auto* higher_pool : higher_pools_) {
917     if (higher_pool->CloseOneIdleConnection())
918       return true;
919   }
920   return false;
921 }
922 
CleanupIdleSocketsInGroup(bool force,Group * group,const base::TimeTicks & now,const char * net_log_reason_utf8)923 void TransportClientSocketPool::CleanupIdleSocketsInGroup(
924     bool force,
925     Group* group,
926     const base::TimeTicks& now,
927     const char* net_log_reason_utf8) {
928   // If |force| is true, a reason must be provided.
929   DCHECK(!force || net_log_reason_utf8);
930 
931   auto idle_socket_it = group->mutable_idle_sockets()->begin();
932   while (idle_socket_it != group->idle_sockets().end()) {
933     bool should_clean_up = force;
934     const char* reason_for_closing_socket = net_log_reason_utf8;
935     base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
936                                   ? used_idle_socket_timeout_
937                                   : unused_idle_socket_timeout_;
938 
939     // Timeout errors take precedence over the reason for flushing sockets in
940     // the group, if applicable.
941     if (now - idle_socket_it->start_time >= timeout) {
942       should_clean_up = true;
943       reason_for_closing_socket = kIdleTimeLimitExpired;
944     }
945 
946     // Usability errors take precedence over over other errors.
947     if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
948       should_clean_up = true;
949 
950     if (should_clean_up) {
951       DCHECK(reason_for_closing_socket);
952       idle_socket_it->socket->NetLog().AddEventWithStringParams(
953           NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
954           reason_for_closing_socket);
955       idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
956       DecrementIdleCount();
957     } else {
958       DCHECK(!reason_for_closing_socket);
959       ++idle_socket_it;
960     }
961   }
962 }
963 
GetOrCreateGroup(const GroupId & group_id)964 TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
965     const GroupId& group_id) {
966   auto it = group_map_.find(group_id);
967   if (it != group_map_.end())
968     return it->second;
969   Group* group = new Group(group_id, this);
970   group_map_[group_id] = group;
971   return group;
972 }
973 
RemoveGroup(const GroupId & group_id)974 void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
975   auto it = group_map_.find(group_id);
976   CHECK(it != group_map_.end());
977 
978   RemoveGroup(it);
979 }
980 
// Deletes the group at |it| and unlinks it from |group_map_|. Returns the
// iterator following the erased entry, for use in erase-while-iterating
// loops. Groups are raw-owned by |group_map_|, hence the explicit delete.
TransportClientSocketPool::GroupMap::iterator
TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
  delete it->second;
  return group_map_.erase(it);
}
986 
// static
// Returns the process-wide backup-connect-job flag (see
// g_connect_backup_jobs_enabled at the top of this file).
bool TransportClientSocketPool::connect_backup_jobs_enabled() {
  return g_connect_backup_jobs_enabled;
}
991 
992 // static
set_connect_backup_jobs_enabled(bool enabled)993 bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
994   bool old_value = g_connect_backup_jobs_enabled;
995   g_connect_backup_jobs_enabled = enabled;
996   return old_value;
997 }
998 
// Bumps the pool-wide tally of idle sockets across all groups.
void TransportClientSocketPool::IncrementIdleCount() {
  ++idle_socket_count_;
}
1002 
// Drops the pool-wide tally of idle sockets across all groups.
void TransportClientSocketPool::DecrementIdleCount() {
  --idle_socket_count_;
}
1006 
ReleaseSocket(const GroupId & group_id,std::unique_ptr<StreamSocket> socket,int64_t group_generation)1007 void TransportClientSocketPool::ReleaseSocket(
1008     const GroupId& group_id,
1009     std::unique_ptr<StreamSocket> socket,
1010     int64_t group_generation) {
1011   auto i = group_map_.find(group_id);
1012   CHECK(i != group_map_.end());
1013 
1014   Group* group = i->second;
1015   CHECK(group);
1016 
1017   CHECK_GT(handed_out_socket_count_, 0);
1018   handed_out_socket_count_--;
1019 
1020   CHECK_GT(group->active_socket_count(), 0);
1021   group->DecrementActiveSocketCount();
1022 
1023   bool can_resuse_socket = false;
1024   base::StringPiece not_reusable_reason;
1025   if (!socket->IsConnectedAndIdle()) {
1026     if (!socket->IsConnected()) {
1027       not_reusable_reason = kClosedConnectionReturnedToPool;
1028     } else {
1029       not_reusable_reason = kDataReceivedUnexpectedly;
1030     }
1031   } else if (group_generation != group->generation()) {
1032     not_reusable_reason = kSocketGenerationOutOfDate;
1033   } else {
1034     can_resuse_socket = true;
1035   }
1036 
1037   if (can_resuse_socket) {
1038     DCHECK(not_reusable_reason.empty());
1039 
1040     // Add it to the idle list.
1041     AddIdleSocket(std::move(socket), group);
1042     OnAvailableSocketSlot(group_id, group);
1043   } else {
1044     DCHECK(!not_reusable_reason.empty());
1045 
1046     socket->NetLog().AddEventWithStringParams(
1047         NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
1048         not_reusable_reason);
1049     if (group->IsEmpty())
1050       RemoveGroup(i);
1051     socket.reset();
1052   }
1053 
1054   CheckForStalledSocketGroups();
1055 }
1056 
CheckForStalledSocketGroups()1057 void TransportClientSocketPool::CheckForStalledSocketGroups() {
1058   // Loop until there's nothing more to do.
1059   while (true) {
1060     // If we have idle sockets, see if we can give one to the top-stalled group.
1061     GroupId top_group_id;
1062     Group* top_group = nullptr;
1063     if (!FindTopStalledGroup(&top_group, &top_group_id))
1064       return;
1065 
1066     if (ReachedMaxSocketsLimit()) {
1067       if (idle_socket_count_ > 0) {
1068         CloseOneIdleSocket();
1069       } else {
1070         // We can't activate more sockets since we're already at our global
1071         // limit.
1072         return;
1073       }
1074     }
1075 
1076     // Note that this may delete top_group.
1077     OnAvailableSocketSlot(top_group_id, top_group);
1078   }
1079 }
1080 
1081 // Search for the highest priority pending request, amongst the groups that
1082 // are not at the |max_sockets_per_group_| limit. Note: for requests with
1083 // the same priority, the winner is based on group hash ordering (and not
1084 // insertion order).
FindTopStalledGroup(Group ** group,GroupId * group_id) const1085 bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
1086                                                     GroupId* group_id) const {
1087   CHECK((group && group_id) || (!group && !group_id));
1088   Group* top_group = nullptr;
1089   const GroupId* top_group_id = nullptr;
1090   bool has_stalled_group = false;
1091   for (const auto& it : group_map_) {
1092     Group* curr_group = it.second;
1093     if (!curr_group->has_unbound_requests())
1094       continue;
1095     if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1096       if (!group)
1097         return true;
1098       has_stalled_group = true;
1099       bool has_higher_priority =
1100           !top_group ||
1101           curr_group->TopPendingPriority() > top_group->TopPendingPriority();
1102       if (has_higher_priority) {
1103         top_group = curr_group;
1104         top_group_id = &it.first;
1105       }
1106     }
1107   }
1108 
1109   if (top_group) {
1110     CHECK(group);
1111     *group = top_group;
1112     *group_id = *top_group_id;
1113   } else {
1114     CHECK(!has_stalled_group);
1115   }
1116   return has_stalled_group;
1117 }
1118 
// NetworkChangeNotifier::IPAddressObserver implementation. Only registered
// when |cleanup_on_ip_address_change_| is set (see the constructor), so a
// network change flushes the whole pool.
void TransportClientSocketPool::OnIPAddressChanged() {
  DCHECK(cleanup_on_ip_address_change_);
  FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
}
1123 
FlushWithError(int error,const char * net_log_reason_utf8)1124 void TransportClientSocketPool::FlushWithError(
1125     int error,
1126     const char* net_log_reason_utf8) {
1127   CancelAllConnectJobs();
1128   CloseIdleSockets(net_log_reason_utf8);
1129   CancelAllRequestsWithError(error);
1130   for (const auto& group : group_map_) {
1131     group.second->IncrementGeneration();
1132   }
1133 }
1134 
// Removes (and thereby destroys) an unbound |job| owned by |group|, and
// updates the pool-wide connecting-socket tally accordingly.
void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
                                                 Group* group) {
  CHECK_GT(connecting_socket_count_, 0);
  connecting_socket_count_--;

  DCHECK(group);
  // |job| is owned by |group|; this call deletes it.
  group->RemoveUnboundJob(job);
}
1143 
OnAvailableSocketSlot(const GroupId & group_id,Group * group)1144 void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
1145                                                       Group* group) {
1146   DCHECK(base::Contains(group_map_, group_id));
1147   if (group->IsEmpty()) {
1148     RemoveGroup(group_id);
1149   } else if (group->has_unbound_requests()) {
1150     ProcessPendingRequest(group_id, group);
1151   }
1152 }
1153 
ProcessPendingRequest(const GroupId & group_id,Group * group)1154 void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
1155                                                       Group* group) {
1156   const Request* next_request = group->GetNextUnboundRequest();
1157   DCHECK(next_request);
1158 
1159   // If the group has no idle sockets, and can't make use of an additional slot,
1160   // either because it's at the limit or because it's at the socket per group
1161   // limit, then there's nothing to do.
1162   if (group->idle_sockets().empty() &&
1163       !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1164     return;
1165   }
1166 
1167   int rv =
1168       RequestSocketInternal(group_id, *next_request,
1169                             /*preconnect_done_closure=*/base::OnceClosure());
1170   if (rv != ERR_IO_PENDING) {
1171     std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1172     DCHECK(request);
1173     if (group->IsEmpty())
1174       RemoveGroup(group_id);
1175 
1176     request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1177                                                 rv);
1178     InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
1179                             request->socket_tag());
1180   }
1181 }
1182 
// Transfers ownership of |socket| to |handle| along with the metadata
// consumers need (reuse type, idle time, group generation, connect timing),
// logs the binding, and counts the socket as handed out for both the pool
// and |group|.
void TransportClientSocketPool::HandOutSocket(
    std::unique_ptr<StreamSocket> socket,
    ClientSocketHandle::SocketReuseType reuse_type,
    const LoadTimingInfo::ConnectTiming& connect_timing,
    ClientSocketHandle* handle,
    base::TimeDelta idle_time,
    Group* group,
    const NetLogWithSource& net_log) {
  DCHECK(socket);
  handle->SetSocket(std::move(socket));
  handle->set_reuse_type(reuse_type);
  handle->set_idle_time(idle_time);
  // Recording the generation lets ReleaseSocket() reject sockets that predate
  // a group flush.
  handle->set_group_generation(group->generation());
  handle->set_connect_timing(connect_timing);

  if (reuse_type == ClientSocketHandle::REUSED_IDLE) {
    // Record how long the reused socket had been sitting idle.
    net_log.AddEventWithIntParams(
        NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
        static_cast<int>(idle_time.InMilliseconds()));
  }

  net_log.AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
      handle->socket()->NetLog().source());

  handed_out_socket_count_++;
  group->IncrementActiveSocketCount();
}
1211 
// Parks |socket| on |group|'s idle list, stamping the time it went idle so
// timeout-based cleanup can age it out, and updates the pool-wide idle tally.
void TransportClientSocketPool::AddIdleSocket(
    std::unique_ptr<StreamSocket> socket,
    Group* group) {
  DCHECK(socket);
  IdleSocket idle_socket;
  idle_socket.socket = std::move(socket);
  idle_socket.start_time = base::TimeTicks::Now();

  group->mutable_idle_sockets()->push_back(std::move(idle_socket));
  IncrementIdleCount();
}
1223 
CancelAllConnectJobs()1224 void TransportClientSocketPool::CancelAllConnectJobs() {
1225   for (auto i = group_map_.begin(); i != group_map_.end();) {
1226     Group* group = i->second;
1227     CHECK(group);
1228     connecting_socket_count_ -= group->jobs().size();
1229     group->RemoveAllUnboundJobs();
1230 
1231     // Delete group if no longer needed.
1232     if (group->IsEmpty()) {
1233       i = RemoveGroup(i);
1234     } else {
1235       ++i;
1236     }
1237   }
1238 }
1239 
CancelAllRequestsWithError(int error)1240 void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
1241   for (auto i = group_map_.begin(); i != group_map_.end();) {
1242     Group* group = i->second;
1243     CHECK(group);
1244 
1245     while (true) {
1246       std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1247       if (!request)
1248         break;
1249       InvokeUserCallbackLater(request->handle(), request->release_callback(),
1250                               error, request->socket_tag());
1251     }
1252 
1253     // Mark bound connect jobs as needing to fail. Can't fail them immediately
1254     // because they may have access to objects owned by the ConnectJob, and
1255     // could access them if a user callback invocation is queued. It would also
1256     // result in the consumer handling two messages at once, which in general
1257     // isn't safe for a lot of code.
1258     group->SetPendingErrorForAllBoundRequests(error);
1259 
1260     // Delete group if no longer needed.
1261     if (group->IsEmpty()) {
1262       i = RemoveGroup(i);
1263     } else {
1264       ++i;
1265     }
1266   }
1267 }
1268 
ReachedMaxSocketsLimit() const1269 bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
1270   // Each connecting socket will eventually connect and be handed out.
1271   int total =
1272       handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
1273   // There can be more sockets than the limit since some requests can ignore
1274   // the limit
1275   if (total < max_sockets_)
1276     return false;
1277   return true;
1278 }
1279 
CloseOneIdleSocketExceptInGroup(const Group * exception_group)1280 bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
1281     const Group* exception_group) {
1282   CHECK_GT(idle_socket_count_, 0);
1283 
1284   for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
1285     Group* group = i->second;
1286     CHECK(group);
1287     if (exception_group == group)
1288       continue;
1289     std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
1290 
1291     if (!idle_sockets->empty()) {
1292       idle_sockets->pop_front();
1293       DecrementIdleCount();
1294       if (group->IsEmpty())
1295         RemoveGroup(i);
1296 
1297       return true;
1298     }
1299   }
1300 
1301   return false;
1302 }
1303 
// Routes the result of a finished ConnectJob to the right Request: either the
// Request the job was already bound to (e.g. by OnNeedsProxyAuth), or the
// highest-priority unbound request in the group. Handles the flush/stale-
// generation cases, and recycles the socket or the freed slot as appropriate.
void TransportClientSocketPool::OnConnectJobComplete(Group* group,
                                                     int result,
                                                     ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);
  // A successful job must be carrying a socket.
  DCHECK(result != OK || job->socket() != nullptr);

  // Check if the ConnectJob is already bound to a Request. If so, result is
  // returned to that specific request.
  absl::optional<Group::BoundRequest> bound_request =
      group->FindAndRemoveBoundRequestForConnectJob(job);
  Request* request = nullptr;
  std::unique_ptr<Request> owned_request;
  if (bound_request) {
    // The bound job is no longer counted as connecting. (FindAndRemove...
    // already took ownership of the job out of the group.)
    --connecting_socket_count_;

    // If the socket pools were previously flushed with an error, return that
    // error to the bound request and discard the socket.
    if (bound_request->pending_error != OK) {
      InvokeUserCallbackLater(bound_request->request->handle(),
                              bound_request->request->release_callback(),
                              bound_request->pending_error,
                              bound_request->request->socket_tag());
      bound_request->request->net_log().EndEventWithNetErrorCode(
          NetLogEventType::SOCKET_POOL, bound_request->pending_error);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    // If the ConnectJob is from a previous generation, add the request back to
    // the group, and kick off another request. The socket will be discarded.
    if (bound_request->generation != group->generation()) {
      group->InsertUnboundRequest(std::move(bound_request->request));
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    request = bound_request->request.get();
  } else {
    // In this case, RemoveConnectJob(job, _) must be called before exiting this
    // method. Otherwise, |job| will be leaked.
    owned_request = group->PopNextUnboundRequest();
    request = owned_request.get();

    if (!request) {
      // No request is waiting: keep a successful socket as idle, drop the
      // job, and offer the freed slot to whoever needs it.
      if (result == OK)
        AddIdleSocket(job->PassSocket(), group);
      RemoveConnectJob(job, group);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    LogBoundConnectJobToRequest(job->net_log().source(), *request);
  }

  // The case where there's no request is handled above.
  DCHECK(request);

  if (result != OK)
    request->handle()->SetAdditionalErrorState(job);
  // Hand out the socket even on error if one exists (e.g. for error
  // responses that still carry a connection).
  if (job->socket()) {
    HandOutSocket(job->PassSocket(), ClientSocketHandle::UNUSED,
                  job->connect_timing(), request->handle(), base::TimeDelta(),
                  group, request->net_log());
  }
  request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                              result);
  InvokeUserCallbackLater(request->handle(), request->release_callback(),
                          result, request->socket_tag());
  // Unbound jobs are still owned by the group and must be removed here (see
  // comment above).
  if (!bound_request)
    RemoveConnectJob(job, group);
  // If no socket was handed out, there's a new socket slot available.
  if (!request->handle()->socket()) {
    OnAvailableSocketSlot(group->group_id(), group);
    CheckForStalledSocketGroups();
  }
}
1385 
// Called when |job| hit a proxy auth challenge. Binds the job to a request so
// the challenge can be surfaced to that request's proxy-auth callback; if no
// request can be bound, the challenge is converted into a job failure.
void TransportClientSocketPool::OnNeedsProxyAuth(
    Group* group,
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);

  const Request* request = group->BindRequestToConnectJob(job);
  // If can't bind the ConnectJob to a request, treat this as a ConnectJob
  // failure.
  if (!request) {
    OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
    return;
  }

  request->proxy_auth_callback().Run(response, auth_controller,
                                     std::move(restart_with_auth_callback));
}
1406 
// Queues |callback| to be run with |rv| on the current task runner, recording
// it in |pending_callback_map_| so a cancelled request can be detected before
// the callback fires. On success the socket is tagged before delivery.
void TransportClientSocketPool::InvokeUserCallbackLater(
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    int rv,
    const SocketTag& socket_tag) {
  // A handle may have at most one pending result at a time.
  CHECK(!base::Contains(pending_callback_map_, handle));
  pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
  if (rv == OK) {
    handle->socket()->ApplySocketTag(socket_tag);
  }
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
                                weak_factory_.GetWeakPtr(),
                                // This is safe as `handle` is checked against a
                                // map to verify it's alive before dereference.
                                // This code path must only be reachable by
                                // `handle`s that have had Init called.
                                base::UnsafeDangling(handle)));
}
1426 
// Runs the callback queued by InvokeUserCallbackLater(), unless the request
// was cancelled in the meantime. |handle| may be dangling; it is only
// dereferenced after it is found (still alive) in |pending_callback_map_|.
void TransportClientSocketPool::InvokeUserCallback(
    MayBeDangling<ClientSocketHandle> handle) {
  auto it = pending_callback_map_.find(handle);

  // Exit if the request has already been cancelled.
  if (it == pending_callback_map_.end())
    return;

  CHECK(!handle->is_initialized());
  // Erase the map entry before running the callback, since the callback may
  // re-enter the pool with the same handle.
  CompletionOnceCallback callback = std::move(it->second.callback);
  int result = it->second.result;
  pending_callback_map_.erase(it);
  std::move(callback).Run(result);
}
1441 
TryToCloseSocketsInLayeredPools()1442 void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
1443   while (IsStalled()) {
1444     // Closing a socket will result in calling back into |this| to use the freed
1445     // socket slot, so nothing else is needed.
1446     if (!CloseOneIdleConnectionInHigherLayeredPool())
1447       return;
1448   }
1449 }
1450 
1451 TransportClientSocketPool::GroupMap::iterator
RefreshGroup(GroupMap::iterator it,const base::TimeTicks & now,const char * net_log_reason_utf8)1452 TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
1453                                         const base::TimeTicks& now,
1454                                         const char* net_log_reason_utf8) {
1455   Group* group = it->second;
1456   CHECK(group);
1457   CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);
1458 
1459   connecting_socket_count_ -= group->jobs().size();
1460   group->RemoveAllUnboundJobs();
1461 
1462   // Otherwise, prevent reuse of existing sockets.
1463   group->IncrementGeneration();
1464 
1465   // Delete group if no longer needed.
1466   if (group->IsEmpty()) {
1467     return RemoveGroup(it);
1468   }
1469   return ++it;
1470 }
1471 
// Constructs the per-group state. |unbound_requests_| is a priority queue
// sized for every RequestPriority level.
TransportClientSocketPool::Group::Group(
    const GroupId& group_id,
    TransportClientSocketPool* client_socket_pool)
    : group_id_(group_id),
      client_socket_pool_(client_socket_pool),
      unbound_requests_(NUM_PRIORITIES) {}
1478 
TransportClientSocketPool::Group::~Group() {
  // A Group may only be destroyed once fully drained: no jobs of any kind
  // and no requests (bound or unbound) may remain.
  DCHECK_EQ(0u, never_assigned_job_count());
  DCHECK_EQ(0u, unassigned_job_count());
  DCHECK(unbound_requests_.empty());
  DCHECK(jobs_.empty());
  DCHECK(bound_requests_.empty());
}
1486 
// ConnectJob::Delegate implementation: forwards completion to the owning
// pool, identifying this group.
void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
                                                            ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  client_socket_pool_->OnConnectJobComplete(this, result, job);
}
1492 
// ConnectJob::Delegate implementation: forwards a proxy auth challenge to
// the owning pool, identifying this group.
void TransportClientSocketPool::Group::OnNeedsProxyAuth(
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
                                        std::move(restart_with_auth_callback),
                                        job);
}
1502 
// Arms the timer that, on expiry, may create a backup ConnectJob for this
// group. No-op if the timer is already running.
void TransportClientSocketPool::Group::StartBackupJobTimer(
    const GroupId& group_id) {
  // Only allow one timer to run at a time.
  if (BackupJobTimerIsRunning())
    return;

  // Unretained here is okay because |backup_job_timer_| is
  // automatically cancelled when it's destroyed.
  backup_job_timer_.Start(FROM_HERE,
                          client_socket_pool_->ConnectRetryInterval(),
                          base::BindOnce(&Group::OnBackupJobTimerFired,
                                         base::Unretained(this), group_id));
}
1516 
// Returns true while the backup-job timer is armed.
bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
  return backup_job_timer_.IsRunning();
}
1520 
TryToUseNeverAssignedConnectJob()1521 bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
1522   SanityCheck();
1523 
1524   if (never_assigned_job_count_ == 0)
1525     return false;
1526   --never_assigned_job_count_;
1527   return true;
1528 }
1529 
AddJob(std::unique_ptr<ConnectJob> job,bool is_preconnect)1530 void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
1531                                               bool is_preconnect) {
1532   SanityCheck();
1533 
1534   if (is_preconnect)
1535     ++never_assigned_job_count_;
1536   jobs_.push_back(std::move(job));
1537   TryToAssignUnassignedJob(jobs_.back().get());
1538 
1539   SanityCheck();
1540 }
1541 
// Removes |job| from |jobs_| and returns ownership of it to the caller.
// |job| must not be bound: it is either in |unassigned_jobs_| or assigned
// to some unbound request (in which case its request gets a replacement job
// if one can be found).
std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
    ConnectJob* job) {
  SanityCheck();

  // Check that |job| is in the list.
  auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
  DCHECK(it != jobs_.end());

  // Check if |job| is in the unassigned jobs list. If so, remove it.
  auto it2 = base::ranges::find(unassigned_jobs_, job);
  if (it2 != unassigned_jobs_.end()) {
    unassigned_jobs_.erase(it2);
  } else {
    // Otherwise, |job| must be assigned to some Request. Unassign it, then
    // try to replace it with another job if possible (either by taking an
    // unassigned job or stealing from another request, if any requests after it
    // have a job).
    RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
    DCHECK(!request_with_job.is_null());
    request_with_job.value()->ReleaseJob();
    TryToAssignJobToRequest(request_with_job);
  }
  std::unique_ptr<ConnectJob> owned_job = std::move(*it);
  jobs_.erase(it);

  // Keep |never_assigned_job_count_| within bounds now that |jobs_| shrank.
  size_t job_count = jobs_.size();
  if (job_count < never_assigned_job_count_)
    never_assigned_job_count_ = job_count;

  // If we've got no more jobs for this group, then we no longer need a
  // backup job either.
  if (jobs_.empty()) {
    DCHECK(unassigned_jobs_.empty());
    backup_job_timer_.Stop();
  }

  SanityCheck();
  return owned_job;
}
1581 
// Fired when the first ConnectJob has taken long enough without an ACK that
// a second ("backup") transport connection attempt may be warranted.
void TransportClientSocketPool::Group::OnBackupJobTimerFired(
    const GroupId& group_id) {
  // If there are no more jobs pending, there is no work to do.
  // If we've done our cleanups correctly, this should not happen.
  if (jobs_.empty()) {
    NOTREACHED();
    return;
  }

  // If the old job has already established a connection, don't start a backup
  // job. Backup jobs are only for issues establishing the initial TCP
  // connection - the timeout they used is tuned for that, and tests expect that
  // behavior.
  //
  // TODO(https://crbug.com/929814): Replace both this and the
  // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
  // LOAD_STATE_RESOLVING_HOST callback to start the timer (And invoke the
  // OnHostResolved callback of any pending requests), and the
  // HasEstablishedConnection() callback to stop the timer. That should result
  // in a more robust, testable API.
  if ((*jobs_.begin())->HasEstablishedConnection())
    return;

  // If our old job is waiting on DNS, or if we can't create any sockets
  // right now due to limits, just reset the timer.
  if (client_socket_pool_->ReachedMaxSocketsLimit() ||
      !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
      (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
    StartBackupJobTimer(group_id);
    return;
  }

  // No pending request means no one would benefit from a backup connection.
  if (unbound_requests_.empty())
    return;

  // Create the backup job on behalf of the highest-priority pending request.
  Request* request = unbound_requests_.FirstMax().value().get();
  std::unique_ptr<ConnectJob> owned_backup_job =
      client_socket_pool_->CreateConnectJob(
          group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
          request->proxy_annotation_tag(), request->priority(),
          request->socket_tag(), this);
  owned_backup_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
      });
  ConnectJob* backup_job = owned_backup_job.get();
  AddJob(std::move(owned_backup_job), false);
  client_socket_pool_->connecting_socket_count_++;
  int rv = backup_job->Connect();
  // A synchronous result must be reported immediately, just as a completed
  // asynchronous connect would be.
  if (rv != ERR_IO_PENDING) {
    client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
  }
}
1635 
// Validates the invariants linking |jobs_|, |unassigned_jobs_|,
// |unbound_requests_|, and |bound_requests_|. Compiles to nothing unless
// DCHECKs are enabled.
void TransportClientSocketPool::Group::SanityCheck() const {
#if DCHECK_IS_ON()
  DCHECK_LE(never_assigned_job_count(), jobs_.size());
  DCHECK_LE(unassigned_job_count(), jobs_.size());

  // Check that |unassigned_jobs_| is empty iff there are at least as many
  // requests as jobs.
  DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());

  size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();

  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  for (size_t i = 0; i < unbound_requests_.size();
       ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    DCHECK(!pointer.is_null());
    DCHECK(pointer.value());
    // Check that the first |num_assigned_jobs| requests have valid job
    // assignments.
    if (i < num_assigned_jobs) {
      // The request has a job.
      ConnectJob* job = pointer.value()->job();
      DCHECK(job);
      // The request's job is not in |unassigned_jobs_|
      DCHECK(!base::Contains(unassigned_jobs_, job));
      // The request's job is in |jobs_|
      DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
      // The same job is not assigned to any other request with a job.
      RequestQueue::Pointer pointer2 =
          unbound_requests_.GetNextTowardsLastMin(pointer);
      for (size_t j = i + 1; j < num_assigned_jobs;
           ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
        DCHECK(!pointer2.is_null());
        ConnectJob* job2 = pointer2.value()->job();
        DCHECK(job2);
        DCHECK_NE(job, job2);
      }
      // An assigned job always carries its request's priority.
      DCHECK_EQ(pointer.value()->priority(), job->priority());
    } else {
      // Check that any subsequent requests do not have a job.
      DCHECK(!pointer.value()->job());
    }
  }

  for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
    // Check that all unassigned jobs are in |jobs_|
    ConnectJob* job = *it;
    DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
    // Check that there are no duplicated entries in |unassigned_jobs_|
    for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
      DCHECK_NE(job, *it2);
    }

    // Check that no |unassigned_jobs_| are in |bound_requests_|.
    DCHECK(!base::Contains(bound_requests_, job,
                           [](const BoundRequest& bound_request) {
                             return bound_request.connect_job.get();
                           }));
  }
#endif
}
1696 
// Cancels and deletes every ConnectJob not yet bound to a request, first
// detaching any request→job assignments, and stops the backup-job timer.
void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
  SanityCheck();

  // Remove jobs from any requests that have them.
  if (!unbound_requests_.empty()) {
    // Requests with jobs form a contiguous prefix of the priority queue, so
    // the walk can stop at the first request without one.
    for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
         !pointer.is_null() && pointer.value()->job();
         pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
      pointer.value()->ReleaseJob();
    }
  }
  unassigned_jobs_.clear();
  never_assigned_job_count_ = 0;

  // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
  // removal from `TransportClientSocketPool::group_map_`, so if this check
  // fails, `this` has been deleted, likely through some reentrancy issue.
  CHECK(client_socket_pool_->HasGroup(group_id_));

  // Delete active jobs.
  jobs_.clear();
  // Stop backup job timer.
  backup_job_timer_.Stop();

  SanityCheck();
}
1723 
ConnectJobCount() const1724 size_t TransportClientSocketPool::Group::ConnectJobCount() const {
1725   return bound_requests_.size() + jobs_.size();
1726 }
1727 
GetConnectJobForHandle(const ClientSocketHandle * handle) const1728 ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
1729     const ClientSocketHandle* handle) const {
1730   // Search through bound requests for |handle|.
1731   for (const auto& bound_pair : bound_requests_) {
1732     if (handle == bound_pair.request->handle())
1733       return bound_pair.connect_job.get();
1734   }
1735 
1736   // Search through the unbound requests that have corresponding jobs for a
1737   // request with |handle|.
1738   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1739        !pointer.is_null() && pointer.value()->job();
1740        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1741     if (pointer.value()->handle() == handle)
1742       return pointer.value()->job();
1743   }
1744 
1745   return nullptr;
1746 }
1747 
InsertUnboundRequest(std::unique_ptr<Request> request)1748 void TransportClientSocketPool::Group::InsertUnboundRequest(
1749     std::unique_ptr<Request> request) {
1750   SanityCheck();
1751 
1752   // Should not have a job because it is not already in |unbound_requests_|
1753   DCHECK(!request->job());
1754   // This value must be cached before we release |request|.
1755   RequestPriority priority = request->priority();
1756 
1757   RequestQueue::Pointer new_position;
1758   if (request->respect_limits() == RespectLimits::DISABLED) {
1759     // Put requests with RespectLimits::DISABLED (which should have
1760     // priority == MAXIMUM_PRIORITY) ahead of other requests with
1761     // MAXIMUM_PRIORITY.
1762     DCHECK_EQ(priority, MAXIMUM_PRIORITY);
1763     new_position =
1764         unbound_requests_.InsertAtFront(std::move(request), priority);
1765   } else {
1766     new_position = unbound_requests_.Insert(std::move(request), priority);
1767   }
1768   DCHECK(!unbound_requests_.empty());
1769 
1770   TryToAssignJobToRequest(new_position);
1771 
1772   SanityCheck();
1773 }
1774 
1775 const TransportClientSocketPool::Request*
GetNextUnboundRequest() const1776 TransportClientSocketPool::Group::GetNextUnboundRequest() const {
1777   return unbound_requests_.empty() ? nullptr
1778                                    : unbound_requests_.FirstMax().value().get();
1779 }
1780 
1781 std::unique_ptr<TransportClientSocketPool::Request>
PopNextUnboundRequest()1782 TransportClientSocketPool::Group::PopNextUnboundRequest() {
1783   if (unbound_requests_.empty())
1784     return nullptr;
1785   return RemoveUnboundRequest(unbound_requests_.FirstMax());
1786 }
1787 
1788 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveUnboundRequest(ClientSocketHandle * handle)1789 TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
1790     ClientSocketHandle* handle) {
1791   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1792        !pointer.is_null();
1793        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1794     if (pointer.value()->handle() == handle) {
1795       DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
1796                 pointer.value()->priority());
1797       std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1798       return request;
1799     }
1800   }
1801   return nullptr;
1802 }
1803 
SetPendingErrorForAllBoundRequests(int pending_error)1804 void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
1805     int pending_error) {
1806   for (auto& bound_request : bound_requests_) {
1807     // Earlier errors take precedence.
1808     if (bound_request.pending_error == OK)
1809       bound_request.pending_error = pending_error;
1810   }
1811 }
1812 
// Returns the Request bound to |connect_job|, binding the highest-priority
// eligible unbound request to it if necessary. Returns nullptr when no
// request can handle the job's auth challenge.
const TransportClientSocketPool::Request*
TransportClientSocketPool::Group::BindRequestToConnectJob(
    ConnectJob* connect_job) {
  // Check if |job| is already bound to a Request.
  for (const auto& bound_pair : bound_requests_) {
    if (bound_pair.connect_job.get() == connect_job)
      return bound_pair.request.get();
  }

  // If not, try to bind it to a Request.
  const Request* request = GetNextUnboundRequest();
  // If there are no pending requests, or the highest priority request has no
  // callback to handle auth challenges, return nullptr.
  if (!request || request->proxy_auth_callback().is_null())
    return nullptr;

  // Otherwise, bind the ConnectJob to the Request: take ownership of both,
  // record the pairing in |bound_requests_|, and tag it with the current
  // generation.
  std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
  DCHECK_EQ(owned_request.get(), request);
  std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
  LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
  bound_requests_.emplace_back(BoundRequest(
      std::move(owned_connect_job), std::move(owned_request), generation()));
  return request;
}
1838 
1839 absl::optional<TransportClientSocketPool::Group::BoundRequest>
FindAndRemoveBoundRequestForConnectJob(ConnectJob * connect_job)1840 TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
1841     ConnectJob* connect_job) {
1842   for (auto bound_pair = bound_requests_.begin();
1843        bound_pair != bound_requests_.end(); ++bound_pair) {
1844     if (bound_pair->connect_job.get() != connect_job)
1845       continue;
1846     BoundRequest ret = std::move(*bound_pair);
1847     bound_requests_.erase(bound_pair);
1848     return std::move(ret);
1849   }
1850   return absl::nullopt;
1851 }
1852 
1853 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveBoundRequest(ClientSocketHandle * client_socket_handle)1854 TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
1855     ClientSocketHandle* client_socket_handle) {
1856   for (auto bound_pair = bound_requests_.begin();
1857        bound_pair != bound_requests_.end(); ++bound_pair) {
1858     if (bound_pair->request->handle() != client_socket_handle)
1859       continue;
1860     std::unique_ptr<Request> request = std::move(bound_pair->request);
1861     bound_requests_.erase(bound_pair);
1862     return request;
1863   }
1864   return nullptr;
1865 }
1866 
// Reprioritizes the unbound request owning |handle| by removing it from the
// queue and reinserting it at |priority|. |handle| must belong to a queued
// request.
void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
                                                   RequestPriority priority) {
  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->handle() == handle) {
      // Nothing to do if the priority is unchanged.
      if (pointer.value()->priority() == priority)
        return;

      std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);

      // Requests that ignore limits must be created and remain at the highest
      // priority, and should not be reprioritized.
      DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);

      request->set_priority(priority);
      InsertUnboundRequest(std::move(request));
      return;
    }
  }

  // This function must be called with a valid ClientSocketHandle.
  NOTREACHED();
}
1891 
RequestWithHandleHasJobForTesting(const ClientSocketHandle * handle) const1892 bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
1893     const ClientSocketHandle* handle) const {
1894   SanityCheck();
1895   if (GetConnectJobForHandle(handle))
1896     return true;
1897 
1898   // There's no corresponding ConnectJob. Verify that the handle is at least
1899   // owned by a request.
1900   RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1901   for (size_t i = 0; i < unbound_requests_.size(); ++i) {
1902     if (pointer.value()->handle() == handle)
1903       return false;
1904     pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
1905   }
1906   NOTREACHED();
1907   return false;
1908 }
1909 
// Default-constructed BoundRequest starts with no recorded error.
TransportClientSocketPool::Group::BoundRequest::BoundRequest()
    : pending_error(OK) {}
1912 
// Pairs a ConnectJob with the Request it serves, tagged with the group
// generation at bind time so stale sockets can be detected later.
TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    std::unique_ptr<ConnectJob> connect_job,
    std::unique_ptr<Request> request,
    int64_t generation)
    : connect_job(std::move(connect_job)),
      request(std::move(request)),
      generation(generation),
      pending_error(OK) {}
1921 
// BoundRequest is move-only (it owns unique_ptrs); defaulted moves and
// destructor suffice.
TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest&
TransportClientSocketPool::Group::BoundRequest::operator=(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;
1930 
1931 std::unique_ptr<TransportClientSocketPool::Request>
RemoveUnboundRequest(const RequestQueue::Pointer & pointer)1932 TransportClientSocketPool::Group::RemoveUnboundRequest(
1933     const RequestQueue::Pointer& pointer) {
1934   SanityCheck();
1935 
1936   std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
1937   if (request->job()) {
1938     TryToAssignUnassignedJob(request->ReleaseJob());
1939   }
1940   // If there are no more unbound requests, kill the backup timer.
1941   if (unbound_requests_.empty())
1942     backup_job_timer_.Stop();
1943 
1944   SanityCheck();
1945   return request;
1946 }
1947 
1948 TransportClientSocketPool::RequestQueue::Pointer
FindUnboundRequestWithJob(const ConnectJob * job) const1949 TransportClientSocketPool::Group::FindUnboundRequestWithJob(
1950     const ConnectJob* job) const {
1951   SanityCheck();
1952 
1953   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1954        !pointer.is_null() && pointer.value()->job();
1955        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1956     if (pointer.value()->job() == job)
1957       return pointer;
1958   }
1959   // If a request with the job was not found, it must be in |unassigned_jobs_|.
1960   DCHECK(base::Contains(unassigned_jobs_, job));
1961   return RequestQueue::Pointer();
1962 }
1963 
1964 TransportClientSocketPool::RequestQueue::Pointer
GetFirstRequestWithoutJob() const1965 TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
1966   RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1967   size_t i = 0;
1968   for (; !pointer.is_null() && pointer.value()->job();
1969        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1970     ++i;
1971   }
1972   DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
1973   DCHECK(pointer.is_null() || !pointer.value()->job());
1974   return pointer;
1975 }
1976 
TryToAssignUnassignedJob(ConnectJob * job)1977 void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
1978     ConnectJob* job) {
1979   unassigned_jobs_.push_back(job);
1980   RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
1981   if (!first_request_without_job.is_null()) {
1982     first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
1983     unassigned_jobs_.pop_back();
1984   }
1985 }
1986 
// Tries to give the (job-less) request at |request_pointer| a ConnectJob:
// first from |unassigned_jobs_|, otherwise by stealing the job of the
// lowest-priority request that has one.
void TransportClientSocketPool::Group::TryToAssignJobToRequest(
    TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
  DCHECK(!request_pointer.value()->job());
  if (!unassigned_jobs_.empty()) {
    request_pointer.value()->AssignJob(unassigned_jobs_.front());
    unassigned_jobs_.pop_front();
    return;
  }

  // If the next request in the queue does not have a job, then there are no
  // requests with a job after |request_pointer| from which we can steal.
  RequestQueue::Pointer next_request =
      unbound_requests_.GetNextTowardsLastMin(request_pointer);
  if (next_request.is_null() || !next_request.value()->job())
    return;

  // Walk down the queue to find the last request with a job.
  RequestQueue::Pointer cur = next_request;
  RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
  while (!next.is_null() && next.value()->job()) {
    cur = next;
    next = unbound_requests_.GetNextTowardsLastMin(next);
  }
  // Steal the job from the last request with a job.
  TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
}
2013 
TransferJobBetweenRequests(TransportClientSocketPool::Request * source,TransportClientSocketPool::Request * dest)2014 void TransportClientSocketPool::Group::TransferJobBetweenRequests(
2015     TransportClientSocketPool::Request* source,
2016     TransportClientSocketPool::Request* dest) {
2017   DCHECK(!dest->job());
2018   DCHECK(source->job());
2019   dest->AssignJob(source->ReleaseJob());
2020 }
2021 
2022 }  // namespace net
2023