• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/socket/transport_client_socket_pool.h"
6 
7 #include <string_view>
8 #include <utility>
9 
10 #include "base/auto_reset.h"
11 #include "base/barrier_closure.h"
12 #include "base/check_op.h"
13 #include "base/compiler_specific.h"
14 #include "base/containers/contains.h"
15 #include "base/format_macros.h"
16 #include "base/functional/bind.h"
17 #include "base/functional/callback_helpers.h"
18 #include "base/location.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/not_fatal_until.h"
22 #include "base/notreached.h"
23 #include "base/ranges/algorithm.h"
24 #include "base/strings/string_util.h"
25 #include "base/task/single_thread_task_runner.h"
26 #include "base/time/time.h"
27 #include "base/values.h"
28 #include "net/base/host_port_pair.h"
29 #include "net/base/net_errors.h"
30 #include "net/base/proxy_chain.h"
31 #include "net/base/proxy_server.h"
32 #include "net/log/net_log.h"
33 #include "net/log/net_log_event_type.h"
34 #include "net/log/net_log_source.h"
35 #include "net/socket/connect_job_factory.h"
36 #include "net/traffic_annotation/network_traffic_annotation.h"
37 #include "url/gurl.h"
38 
39 namespace net {
40 
41 namespace {
42 
43 // Indicate whether or not we should establish a new transport layer connection
44 // after a certain timeout has passed without receiving an ACK.
45 bool g_connect_backup_jobs_enabled = true;
46 
NetLogCreateConnectJobParams(bool backup_job,const ClientSocketPool::GroupId * group_id)47 base::Value::Dict NetLogCreateConnectJobParams(
48     bool backup_job,
49     const ClientSocketPool::GroupId* group_id) {
50   return base::Value::Dict()
51       .Set("backup_job", backup_job)
52       .Set("group_id", group_id->ToString());
53 }
54 
55 }  // namespace
56 
// Human-readable reasons passed as the NetLog "reason" parameter when idle
// sockets are closed or the pool is flushed (see CleanupIdleSockets() /
// FlushWithError() call sites). These strings are logged verbatim; do not
// change them without considering NetLog consumers.
const char TransportClientSocketPool::kCertDatabaseChanged[] =
    "Cert database changed";
const char TransportClientSocketPool::kCertVerifierChanged[] =
    "Cert verifier changed";
const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
    "Connection was closed when it was returned to the pool";
const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
    "Data received unexpectedly";
const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
    "Idle time limit expired";
const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
    "Remote side closed connection";
const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
    "Socket generation out of date";
const char TransportClientSocketPool::kSocketPoolDestroyed[] =
    "Socket pool destroyed";
const char TransportClientSocketPool::kSslConfigChanged[] =
    "SSL configuration changed";
76 
// Constructs a socket request. |handle| may be null for preconnect-style
// requests (see RequestSockets(), which passes nullptr). |callback| is
// consumed; |socket_params| is taken by reference-count transfer.
TransportClientSocketPool::Request::Request(
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    Flags flags,
    scoped_refptr<SocketParams> socket_params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    const NetLogWithSource& net_log)
    : handle_(handle),
      callback_(std::move(callback)),
      proxy_auth_callback_(proxy_auth_callback),
      priority_(priority),
      respect_limits_(respect_limits),
      flags_(flags),
      socket_params_(std::move(socket_params)),
      proxy_annotation_tag_(proxy_annotation_tag),
      net_log_(net_log),
      socket_tag_(socket_tag) {
  // Requests allowed to bypass the socket limits are expected to run at
  // maximum priority.
  if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
    DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
}

TransportClientSocketPool::Request::~Request() = default;
103 
AssignJob(ConnectJob * job)104 void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
105   DCHECK(job);
106   DCHECK(!job_);
107   job_ = job;
108   if (job_->priority() != priority_)
109     job_->ChangePriority(priority_);
110 }
111 
ReleaseJob()112 ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
113   DCHECK(job_);
114   ConnectJob* job = job_;
115   job_ = nullptr;
116   return job;
117 }
118 
// An entry in a group's idle socket list: the socket itself plus the time it
// became idle.
struct TransportClientSocketPool::IdleSocket {
  // An idle socket can't be used if it is disconnected or has been used
  // before and has received data unexpectedly (hence no longer idle).  The
  // unread data would be mistaken for the beginning of the next response if
  // we were to use the socket for a new request.
  //
  // Note that a socket that has never been used before (like a preconnected
  // socket) may be used even with unread data.  This may be, e.g., a SPDY
  // SETTINGS frame.
  //
  // If the socket is not usable, |net_log_reason_utf8| is set to a string
  // indicating why the socket is not usable.
  bool IsUsable(const char** net_log_reason_utf8) const;

  // The idle socket, owned by this entry.
  std::unique_ptr<StreamSocket> socket;
  // When the socket was added to the idle list; used to compute how long the
  // socket has been idle when it is handed back out (see
  // AssignIdleSocketToRequest()).
  base::TimeTicks start_time;
};
136 
// Public constructor. Delegates to the private constructor, filling in the
// default used-idle-socket timeout, a real ConnectJobFactory, the SSL client
// context taken from |common_connect_job_params|, and backup connect jobs
// enabled.
TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change)
    : TransportClientSocketPool(max_sockets,
                                max_sockets_per_group,
                                unused_idle_socket_timeout,
                                ClientSocketPool::used_idle_socket_timeout(),
                                proxy_chain,
                                is_for_websockets,
                                common_connect_job_params,
                                cleanup_on_ip_address_change,
                                std::make_unique<ConnectJobFactory>(),
                                common_connect_job_params->ssl_client_context,
                                /*connect_backup_jobs_enabled=*/true) {}
156 
TransportClientSocketPool::~TransportClientSocketPool() {
  // Clean up any idle sockets and pending connect jobs.  Assert that we have no
  // remaining active sockets or pending requests.  They should have all been
  // cleaned up prior to |this| being destroyed.
  FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
  DCHECK(group_map_.empty());
  DCHECK(pending_callback_map_.empty());
  DCHECK_EQ(0, connecting_socket_count_);
  DCHECK_EQ(0, handed_out_socket_count_);
  CHECK(higher_pools_.empty());

  // Unregister from the SSL context; only registered when a context was
  // supplied at construction.
  if (ssl_client_context_)
    ssl_client_context_->RemoveObserver(this);

  // Mirror of the registration guarded by |cleanup_on_ip_address_change_|.
  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::RemoveIPAddressObserver(this);
}
174 
// Test-only factory. Unlike the public constructor, this exposes the
// used-idle-socket timeout, the ConnectJobFactory, the SSLClientContext and
// the backup-jobs flag directly, and always enables cleanup on IP address
// change.
std::unique_ptr<TransportClientSocketPool>
TransportClientSocketPool::CreateForTesting(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled) {
  // WrapUnique because the delegated-to constructor is private.
  return base::WrapUnique<TransportClientSocketPool>(
      new TransportClientSocketPool(
          max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
          used_idle_socket_timeout, proxy_chain, is_for_websockets,
          common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
          std::move(connect_job_factory), ssl_client_context,
          connect_backup_jobs_enabled));
}
195 
// CallbackResultPair bundles a completion callback with the result it should
// be invoked with (stored in |pending_callback_map_|; see CancelRequest()).
TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
    : result(OK) {}

TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    CompletionOnceCallback callback_in,
    int result_in)
    : callback(std::move(callback_in)), result(result_in) {}

// Move-only: CompletionOnceCallback is not copyable.
TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair&
TransportClientSocketPool::CallbackResultPair::operator=(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;
212 
IsStalled() const213 bool TransportClientSocketPool::IsStalled() const {
214   // If fewer than |max_sockets_| are in use, then clearly |this| is not
215   // stalled.
216   if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
217     return false;
218   // So in order to be stalled, |this| must be using at least |max_sockets_| AND
219   // |this| must have a request that is actually stalled on the global socket
220   // limit.  To find such a request, look for a group that has more requests
221   // than jobs AND where the number of sockets is less than
222   // |max_sockets_per_group_|.  (If the number of sockets is equal to
223   // |max_sockets_per_group_|, then the request is stalled on the group limit,
224   // which does not count.)
225   for (const auto& it : group_map_) {
226     if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
227       return true;
228   }
229   return false;
230 }
231 
AddHigherLayeredPool(HigherLayeredPool * higher_pool)232 void TransportClientSocketPool::AddHigherLayeredPool(
233     HigherLayeredPool* higher_pool) {
234   CHECK(higher_pool);
235   CHECK(!base::Contains(higher_pools_, higher_pool));
236   higher_pools_.insert(higher_pool);
237 }
238 
RemoveHigherLayeredPool(HigherLayeredPool * higher_pool)239 void TransportClientSocketPool::RemoveHigherLayeredPool(
240     HigherLayeredPool* higher_pool) {
241   CHECK(higher_pool);
242   CHECK(base::Contains(higher_pools_, higher_pool));
243   higher_pools_.erase(higher_pool);
244 }
245 
// Requests a single socket for |group_id|. Returns OK if a socket was handed
// out synchronously, ERR_IO_PENDING if the request was queued, or a network
// error on synchronous failure. On ERR_IO_PENDING, |callback| is invoked
// later with the final result.
int TransportClientSocketPool::RequestSocket(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    RequestPriority priority,
    const SocketTag& socket_tag,
    RespectLimits respect_limits,
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    const ProxyAuthCallback& proxy_auth_callback,
    const NetLogWithSource& net_log) {
  CHECK(callback);
  CHECK(handle);

  NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);

  std::unique_ptr<Request> request = std::make_unique<Request>(
      handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
      respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);

  // Cleanup any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);

  int rv =
      RequestSocketInternal(group_id, *request,
                            /*preconnect_done_closure=*/base::OnceClosure());
  if (rv != ERR_IO_PENDING) {
    // Synchronous completion: tag the socket on success, close out the NetLog
    // event, and destroy the request (its callback is never run).
    if (rv == OK) {
      request->handle()->socket()->ApplySocketTag(request->socket_tag());
    }
    request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                                rv);
    CHECK(!request->handle()->is_initialized());
    request.reset();
  } else {
    // Asynchronous path: queue the request on its group.
    Group* group = GetOrCreateGroup(group_id);
    group->InsertUnboundRequest(std::move(request));
    // Have to do this asynchronously, as closing sockets in higher level pools
    // call back in to |this|, which will cause all sorts of fun and exciting
    // re-entrancy issues if the socket pool is doing something else at the
    // time.
    if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
      base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
          FROM_HERE,
          base::BindOnce(
              &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
              weak_factory_.GetWeakPtr()));
    }
  }
  return rv;
}
299 
// Preconnects up to |num_sockets| sockets for |group_id| (clamped to the
// per-group limit). Returns OK if nothing is left pending, or
// ERR_IO_PENDING with |callback| run (with OK) once all started connect jobs
// finish. Preconnect errors are currently not reported to the caller.
int TransportClientSocketPool::RequestSockets(
    const GroupId& group_id,
    scoped_refptr<SocketParams> params,
    const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
    int num_sockets,
    CompletionOnceCallback callback,
    const NetLogWithSource& net_log) {
  // TODO(eroman): Split out the host and port parameters.
  net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
                   [&] { return NetLogGroupIdParams(group_id); });

  // A handle-less request marks this as a preconnect inside
  // RequestSocketInternal().
  Request request(nullptr /* no handle */, CompletionOnceCallback(),
                  ProxyAuthCallback(), IDLE, SocketTag(),
                  RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
                  proxy_annotation_tag, net_log);

  // Cleanup any timed-out idle sockets.
  CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);

  if (num_sockets > max_sockets_per_group_) {
    num_sockets = max_sockets_per_group_;
  }

  request.net_log().BeginEventWithIntParams(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
      num_sockets);

  Group* group = GetOrCreateGroup(group_id);

  // RequestSocketsInternal() may delete the group.
  bool deleted_group = false;

  int rv = OK;

  // |callback| fires only after the barrier has been signalled |num_sockets|
  // times: once per pending connect job on completion, plus once (below) for
  // every socket slot that did not need a new job.
  base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
      num_sockets,
      base::BindOnce(
          [](CompletionOnceCallback callback) {
            // Post rather than run directly to avoid re-entrancy into the
            // caller from inside pool bookkeeping.
            base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
                FROM_HERE, base::BindOnce(std::move(callback), OK));
          },
          std::move(callback)));
  int pending_connect_job_count = 0;
  // Start jobs until the group has |num_sockets| active slots, bounded by
  // |num_sockets| iterations.
  for (int num_iterations_left = num_sockets;
       group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
       num_iterations_left--) {
    rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
    if (rv == ERR_IO_PENDING) {
      ++pending_connect_job_count;
    }
    if (rv < 0 && rv != ERR_IO_PENDING) {
      // We're encountering a synchronous error.  Give up.
      if (!base::Contains(group_map_, group_id))
        deleted_group = true;
      break;
    }
    if (!base::Contains(group_map_, group_id)) {
      // Unexpected.  The group should only be getting deleted on synchronous
      // error.
      NOTREACHED();
    }
  }

  if (!deleted_group && group->IsEmpty())
    RemoveGroup(group_id);

  if (rv == ERR_IO_PENDING)
    rv = OK;
  request.net_log().EndEventWithNetErrorCode(
      NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);

  // Currently we don't handle preconnect errors. So this method returns OK even
  // if failed to preconnect.
  // TODO(crbug.com/40843081): Consider support error handlings when needed.
  if (pending_connect_job_count == 0)
    return OK;
  // Signal the barrier for the slots that were satisfied without starting a
  // new connect job, so |callback| fires once the pending jobs complete.
  for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
    preconnect_done_closure.Run();
  }

  return ERR_IO_PENDING;
}
382 
// Core request path shared by RequestSocket() (|request| has a handle) and
// RequestSockets() preconnects (no handle). Tries, in order: reuse an idle
// socket, piggyback on an unassigned connect job, then start a new connect
// job subject to the per-group and global limits. Returns OK, ERR_IO_PENDING,
// or a synchronous connect error.
int TransportClientSocketPool::RequestSocketInternal(
    const GroupId& group_id,
    const Request& request,
    base::OnceClosure preconnect_done_closure) {
#if DCHECK_IS_ON()
  // Guard against re-entrant calls; this method mutates pool state.
  DCHECK(!request_in_process_);
  base::AutoReset<bool> auto_reset(&request_in_process_, true);
#endif  // DCHECK_IS_ON()

  ClientSocketHandle* const handle = request.handle();
  const bool preconnecting = !handle;
  DCHECK_EQ(preconnecting, !!preconnect_done_closure);

  Group* group = nullptr;
  auto group_it = group_map_.find(group_id);
  if (group_it != group_map_.end()) {
    group = group_it->second;

    if (!(request.flags() & NO_IDLE_SOCKETS)) {
      // Try to reuse a socket.
      if (AssignIdleSocketToRequest(request, group))
        return OK;
    }

    // If there are more ConnectJobs than pending requests, don't need to do
    // anything.  Can just wait for the extra job to connect, and then assign it
    // to the request.
    if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
      return ERR_IO_PENDING;

    // Can we make another active socket now?
    if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
        request.respect_limits() == RespectLimits::ENABLED) {
      // TODO(willchan): Consider whether or not we need to close a socket in a
      // higher layered group. I don't think this makes sense since we would
      // just reuse that socket then if we needed one and wouldn't make it down
      // to this layer.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  if (ReachedMaxSocketsLimit() &&
      request.respect_limits() == RespectLimits::ENABLED) {
    // NOTE(mmenke):  Wonder if we really need different code for each case
    // here.  Only reason for them now seems to be preconnects.
    if (idle_socket_count_ > 0) {
      // There's an idle socket in this pool. Either that's because there's
      // still one in this group, but we got here due to preconnecting
      // bypassing idle sockets, or because there's an idle socket in another
      // group.
      bool closed = CloseOneIdleSocketExceptInGroup(group);
      if (preconnecting && !closed)
        return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
    } else {
      // We could check if we really have a stalled group here, but it
      // requires a scan of all groups, so just flip a flag here, and do the
      // check later.
      request.net_log().AddEvent(
          NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
      return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
    }
  }

  // We couldn't find a socket to reuse, and there's space to allocate one,
  // so allocate and connect a new one.
  group = GetOrCreateGroup(group_id);
  std::unique_ptr<ConnectJob> connect_job(
      CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
                       request.proxy_annotation_tag(), request.priority(),
                       request.socket_tag(), group));
  connect_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
      });

  int rv = connect_job->Connect();
  if (rv == ERR_IO_PENDING) {
    if (preconnect_done_closure) {
      DCHECK(preconnecting);
      // For preconnects, signal the caller's barrier when the job finishes.
      connect_job->set_done_closure(std::move(preconnect_done_closure));
    }
    // If we didn't have any sockets in this group, set a timer for potentially
    // creating a new one.  If the SYN is lost, this backup socket may complete
    // before the slow socket, improving end user latency.
    if (connect_backup_jobs_enabled_ && group->IsEmpty())
      group->StartBackupJobTimer(group_id);
    group->AddJob(std::move(connect_job), preconnecting);
    connecting_socket_count_++;
    return rv;
  }

  // Synchronous completion (success or failure).
  LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
  if (preconnecting) {
    // Preconnects park a successfully connected socket in the idle list.
    if (rv == OK)
      AddIdleSocket(connect_job->PassSocket(), group);
  } else {
    DCHECK(handle);
    if (rv != OK)
      handle->SetAdditionalErrorState(connect_job.get());
    std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
    if (socket) {
      HandOutSocket(std::move(socket),
                    StreamSocketHandle::SocketReuseType::kUnused,
                    connect_job->connect_timing(), handle,
                    /*time_idle=*/base::TimeDelta(), group, request.net_log());
    }
  }
  // The group may have become empty (e.g. on synchronous failure).
  if (group->IsEmpty())
    RemoveGroup(group_id);

  return rv;
}
497 
// Tries to satisfy |request| from |group|'s idle socket list, preferring the
// most recently used previously-used socket, falling back to the oldest
// never-used one. Unusable sockets encountered along the way are closed.
// Returns true if a socket was handed out.
bool TransportClientSocketPool::AssignIdleSocketToRequest(
    const Request& request,
    Group* group) {
  std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
  auto idle_socket_it = idle_sockets->end();

  // Iterate through the idle sockets forwards (oldest to newest)
  //   * Delete any disconnected ones.
  //   * If we find a used idle socket, assign to |idle_socket|.  At the end,
  //   the |idle_socket_it| will be set to the newest used idle socket.
  for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
    // Check whether socket is usable. Note that it's unlikely that the socket
    // is not usable because this function is always invoked after a
    // reusability check, but in theory socket can be closed asynchronously.
    const char* net_log_reason_utf8;
    if (!it->IsUsable(&net_log_reason_utf8)) {
      it->socket->NetLog().AddEventWithStringParams(
          NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
          net_log_reason_utf8);
      DecrementIdleCount();
      it = idle_sockets->erase(it);
      continue;
    }

    if (it->socket->WasEverUsed()) {
      // We found one we can reuse!
      idle_socket_it = it;
    }

    ++it;
  }

  // If we haven't found an idle socket, that means there are no used idle
  // sockets.  Pick the oldest (first) idle socket (FIFO).

  if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
    idle_socket_it = idle_sockets->begin();

  if (idle_socket_it != idle_sockets->end()) {
    DecrementIdleCount();
    base::TimeDelta idle_time =
        base::TimeTicks::Now() - idle_socket_it->start_time;
    std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
    idle_sockets->erase(idle_socket_it);
    // TODO(davidben): If |idle_time| is under some low watermark, consider
    // treating as UNUSED rather than UNUSED_IDLE. This will avoid
    // HttpNetworkTransaction retrying on some errors.
    ClientSocketHandle::SocketReuseType reuse_type =
        socket->WasEverUsed()
            ? StreamSocketHandle::SocketReuseType::kReusedIdle
            : StreamSocketHandle::SocketReuseType::kUnusedIdle;

    HandOutSocket(std::move(socket), reuse_type,
                  LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
                  group, request.net_log());
    return true;
  }

  return false;
}
558 
// static
// Records on |request|'s NetLog that it has been bound to the ConnectJob
// identified by |connect_job_source|.
void TransportClientSocketPool::LogBoundConnectJobToRequest(
    const NetLogSource& connect_job_source,
    const Request& request) {
  request.net_log().AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
}
566 
SetPriority(const GroupId & group_id,ClientSocketHandle * handle,RequestPriority priority)567 void TransportClientSocketPool::SetPriority(const GroupId& group_id,
568                                             ClientSocketHandle* handle,
569                                             RequestPriority priority) {
570   auto group_it = group_map_.find(group_id);
571   if (group_it == group_map_.end()) {
572     DCHECK(base::Contains(pending_callback_map_, handle));
573     // The Request has already completed and been destroyed; nothing to
574     // reprioritize.
575     return;
576   }
577 
578   group_it->second->SetPriority(handle, priority);
579 }
580 
// Cancels the request associated with |handle|. Handles three states in
// order: (1) the request already completed and its callback is pending
// delivery, (2) the request is bound to a connect job, (3) the request is
// still unbound in the group's queue. |cancel_connect_job| additionally
// closes the socket / connect job where applicable.
void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
                                              ClientSocketHandle* handle,
                                              bool cancel_connect_job) {
  // Case (1): completion callback queued but not yet delivered.
  auto callback_it = pending_callback_map_.find(handle);
  if (callback_it != pending_callback_map_.end()) {
    int result = callback_it->second.result;
    pending_callback_map_.erase(callback_it);
    std::unique_ptr<StreamSocket> socket = handle->PassSocket();
    if (socket) {
      if (result != OK) {
        socket->Disconnect();
      } else if (cancel_connect_job) {
        // Close the socket if |cancel_connect_job| is true and there are no
        // other pending requests.
        Group* group = GetOrCreateGroup(group_id);
        if (group->unbound_request_count() == 0)
          socket->Disconnect();
      }
      // Return the (possibly disconnected) socket to the pool for accounting.
      ReleaseSocket(handle->group_id(), std::move(socket),
                    handle->group_generation());
    }
    return;
  }

  CHECK(base::Contains(group_map_, group_id));
  Group* group = GetOrCreateGroup(group_id);

  // Case (2): the request was bound to a connect job; removing it also
  // destroys that job.
  std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
  if (request) {
    --connecting_socket_count_;
    OnAvailableSocketSlot(group_id, group);
    CheckForStalledSocketGroups();
    return;
  }

  // Search |unbound_requests_| for matching handle.
  request = group->FindAndRemoveUnboundRequest(handle);
  if (request) {
    request->net_log().AddEvent(NetLogEventType::CANCELLED);
    request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);

    // Let the job run, unless |cancel_connect_job| is true, or we're at the
    // socket limit and there are no other requests waiting on the job.
    bool reached_limit = ReachedMaxSocketsLimit();
    if (group->jobs().size() > group->unbound_request_count() &&
        (cancel_connect_job || reached_limit)) {
      RemoveConnectJob(group->jobs().begin()->get(), group);
      if (group->IsEmpty())
        RemoveGroup(group->group_id());
      if (reached_limit)
        CheckForStalledSocketGroups();
    }
  }
}
635 
// Closes every idle socket in the pool, logging |net_log_reason_utf8| as the
// reason. Afterward no idle sockets remain.
void TransportClientSocketPool::CloseIdleSockets(
    const char* net_log_reason_utf8) {
  // force=true: close all idle sockets regardless of timeout.
  CleanupIdleSockets(true, net_log_reason_utf8);
  DCHECK_EQ(0, idle_socket_count_);
}
641 
// Closes every idle socket belonging to |group_id|'s group, logging
// |net_log_reason_utf8| as the reason. No-op if the pool has no idle sockets
// or the group does not exist.
void TransportClientSocketPool::CloseIdleSocketsInGroup(
    const GroupId& group_id,
    const char* net_log_reason_utf8) {
  if (idle_socket_count_ == 0)
    return;
  auto it = group_map_.find(group_id);
  if (it == group_map_.end())
    return;
  // force=true: close regardless of timeout.
  CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
                            net_log_reason_utf8);
  // Dropping its idle sockets may have emptied the group entirely.
  if (it->second->IsEmpty())
    RemoveGroup(it);
}
655 
// Returns the total number of idle sockets across all groups.
int TransportClientSocketPool::IdleSocketCount() const {
  return idle_socket_count_;
}
659 
IdleSocketCountInGroup(const GroupId & group_id) const660 size_t TransportClientSocketPool::IdleSocketCountInGroup(
661     const GroupId& group_id) const {
662   auto i = group_map_.find(group_id);
663   CHECK(i != group_map_.end());
664 
665   return i->second->idle_sockets().size();
666 }
667 
// Returns the load state of the request associated with |handle|:
// LOAD_STATE_CONNECTING if its completion callback is already queued, the
// bound connect job's state if one exists, or a "waiting" state describing
// which limit the request is stalled on.
LoadState TransportClientSocketPool::GetLoadState(
    const GroupId& group_id,
    const ClientSocketHandle* handle) const {
  if (base::Contains(pending_callback_map_, handle))
    return LOAD_STATE_CONNECTING;

  auto group_it = group_map_.find(group_id);
  if (group_it == group_map_.end()) {
    // TODO(mmenke):  This is actually reached in the wild, for unknown reasons.
    // Would be great to understand why, and if it's a bug, fix it.  If not,
    // should have a test for that case.
    NOTREACHED();
  }

  const Group& group = *group_it->second;
  ConnectJob* job = group.GetConnectJobForHandle(handle);
  if (job)
    return job->GetLoadState();

  // No job yet: report whether the request is stalled on the global socket
  // pool limit or just waiting for a socket to free up in this group.
  if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
    return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
  return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
}
691 
// Produces a debugging snapshot of the pool (counts, limits, and per-group
// detail) as a base::Value, e.g. for net-internals.
base::Value TransportClientSocketPool::GetInfoAsValue(
    const std::string& name,
    const std::string& type) const {
  // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
  auto dict = base::Value::Dict()
                  .Set("name", name)
                  .Set("type", type)
                  .Set("handed_out_socket_count", handed_out_socket_count_)
                  .Set("connecting_socket_count", connecting_socket_count_)
                  .Set("idle_socket_count", idle_socket_count_)
                  .Set("max_socket_count", max_sockets_)
                  .Set("max_sockets_per_group", max_sockets_per_group_);

  // With no groups there is nothing further to report.
  if (group_map_.empty())
    return base::Value(std::move(dict));

  base::Value::Dict all_groups_dict;
  for (const auto& entry : group_map_) {
    const Group* group = entry.second;

    // Idle sockets and connect jobs are reported by their NetLog source IDs.
    base::Value::List idle_socket_list;
    for (const auto& idle_socket : group->idle_sockets()) {
      int source_id = idle_socket.socket->NetLog().source().id;
      idle_socket_list.Append(source_id);
    }

    base::Value::List connect_jobs_list;
    for (const auto& job : group->jobs()) {
      int source_id = job->net_log().source().id;
      connect_jobs_list.Append(source_id);
    }

    auto group_dict =
        base::Value::Dict()
            .Set("pending_request_count",
                 static_cast<int>(group->unbound_request_count()))
            .Set("active_socket_count", group->active_socket_count())
            .Set("idle_sockets", std::move(idle_socket_list))
            .Set("connect_jobs", std::move(connect_jobs_list))
            .Set("is_stalled",
                 group->CanUseAdditionalSocketSlot(max_sockets_per_group_))
            .Set("backup_job_timer_is_running",
                 group->BackupJobTimerIsRunning());

    // Only meaningful when the group actually has queued requests.
    if (group->has_unbound_requests()) {
      group_dict.Set("top_pending_priority",
                     RequestPriorityToString(group->TopPendingPriority()));
    }

    all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
  }
  dict.Set("groups", std::move(all_groups_dict));
  return base::Value(std::move(dict));
}
746 
HasActiveSocket(const GroupId & group_id) const747 bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
748   return HasGroup(group_id);
749 }
750 
IsUsable(const char ** net_log_reason_utf8) const751 bool TransportClientSocketPool::IdleSocket::IsUsable(
752     const char** net_log_reason_utf8) const {
753   DCHECK(net_log_reason_utf8);
754   if (socket->WasEverUsed()) {
755     if (!socket->IsConnectedAndIdle()) {
756       if (!socket->IsConnected()) {
757         *net_log_reason_utf8 = kRemoteSideClosedConnection;
758       } else {
759         *net_log_reason_utf8 = kDataReceivedUnexpectedly;
760       }
761       return false;
762     }
763     return true;
764   }
765 
766   if (!socket->IsConnected()) {
767     *net_log_reason_utf8 = kRemoteSideClosedConnection;
768     return false;
769   }
770   return true;
771 }
772 
// Constructs the pool. Backup connect jobs are enabled only when both the
// caller requests them and the process-wide flag allows them. Registers as an
// IP-address-change and/or SSL-config observer when configured to do so.
TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change,
    std::unique_ptr<ConnectJobFactory> connect_job_factory,
    SSLClientContext* ssl_client_context,
    bool connect_backup_jobs_enabled)
    : ClientSocketPool(is_for_websockets,
                       common_connect_job_params,
                       std::move(connect_job_factory)),
      max_sockets_(max_sockets),
      max_sockets_per_group_(max_sockets_per_group),
      unused_idle_socket_timeout_(unused_idle_socket_timeout),
      used_idle_socket_timeout_(used_idle_socket_timeout),
      proxy_chain_(proxy_chain),
      cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
      connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
                                   g_connect_backup_jobs_enabled),
      ssl_client_context_(ssl_client_context) {
  // Per-group limit must be sane and must not exceed the global limit.
  DCHECK_LE(0, max_sockets_per_group);
  DCHECK_LE(max_sockets_per_group, max_sockets);

  if (cleanup_on_ip_address_change_)
    NetworkChangeNotifier::AddIPAddressObserver(this);

  // |ssl_client_context| may be null (e.g. pools that never carry TLS).
  if (ssl_client_context_)
    ssl_client_context_->AddObserver(this);
}
806 
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)807 void TransportClientSocketPool::OnSSLConfigChanged(
808     SSLClientContext::SSLConfigChangeType change_type) {
809   const char* message = nullptr;
810   // When the SSL config or cert verifier config changes, flush all idle
811   // sockets so they won't get re-used, and allow any active sockets to finish,
812   // but don't put them back in the socket pool.
813   switch (change_type) {
814     case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
815       message = kNetworkChanged;
816       break;
817     case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
818       message = kCertDatabaseChanged;
819       break;
820     case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
821       message = kCertVerifierChanged;
822       break;
823   };
824 
825   base::TimeTicks now = base::TimeTicks::Now();
826   for (auto it = group_map_.begin(); it != group_map_.end();) {
827     it = RefreshGroup(it, now, message);
828   }
829   CheckForStalledSocketGroups();
830 }
831 
832 // TODO(crbug.com/40181080): Get `server` as SchemeHostPort?
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)833 void TransportClientSocketPool::OnSSLConfigForServersChanged(
834     const base::flat_set<HostPortPair>& servers) {
835   // Current time value. Retrieving it once at the function start rather than
836   // inside the inner loop, since it shouldn't change by any meaningful amount.
837   //
838   // TODO(davidben): This value is not actually needed because
839   // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
840   // interfaces so the parameter is not necessary.
841   base::TimeTicks now = base::TimeTicks::Now();
842 
843   // If the proxy chain includes a server from `servers` and uses SSL settings
844   // (HTTPS or QUIC), refresh every group.
845   bool proxy_matches = false;
846   for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
847     if (proxy_server.is_secure_http_like() &&
848         servers.contains(proxy_server.host_port_pair())) {
849       proxy_matches = true;
850     }
851   }
852 
853   bool refreshed_any = false;
854   for (auto it = group_map_.begin(); it != group_map_.end();) {
855     if (proxy_matches ||
856         (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
857          servers.contains(
858              HostPortPair::FromSchemeHostPort(it->first.destination())))) {
859       refreshed_any = true;
860       // Note this call may destroy the group and invalidate |to_refresh|.
861       it = RefreshGroup(it, now, kSslConfigChanged);
862     } else {
863       ++it;
864     }
865   }
866 
867   if (refreshed_any) {
868     // Check to see if any group can use the freed up socket slots. It would be
869     // more efficient to give the slots to the refreshed groups, if the still
870     // exists and need them, but this should be rare enough that it doesn't
871     // matter. This will also make sure the slots are given to the group with
872     // the highest priority request without an assigned ConnectJob.
873     CheckForStalledSocketGroups();
874   }
875 }
876 
HasGroup(const GroupId & group_id) const877 bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
878   return base::Contains(group_map_, group_id);
879 }
880 
// Walks every group, closing idle sockets that are timed out or unusable
// (or all of them, when |force| is true), then drops groups left empty.
void TransportClientSocketPool::CleanupIdleSockets(
    bool force,
    const char* net_log_reason_utf8) {
  // Fast path: no idle sockets anywhere in the pool.
  if (idle_socket_count_ == 0)
    return;

  // Current time value. Retrieving it once at the function start rather than
  // inside the inner loop, since it shouldn't change by any meaningful amount.
  base::TimeTicks now = base::TimeTicks::Now();

  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);
    CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      // RemoveGroup() erases the entry and returns the next valid iterator.
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}
903 
CloseOneIdleSocket()904 bool TransportClientSocketPool::CloseOneIdleSocket() {
905   if (idle_socket_count_ == 0)
906     return false;
907   return CloseOneIdleSocketExceptInGroup(nullptr);
908 }
909 
CloseOneIdleConnectionInHigherLayeredPool()910 bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
911   // This pool doesn't have any idle sockets. It's possible that a pool at a
912   // higher layer is holding one of this sockets active, but it's actually idle.
913   // Query the higher layers.
914   for (HigherLayeredPool* higher_pool : higher_pools_) {
915     if (higher_pool->CloseOneIdleConnection())
916       return true;
917   }
918   return false;
919 }
920 
// Closes idle sockets in |group| that are forced out, timed out, or unusable.
// The net-log reason recorded per socket follows a precedence order: a
// usability failure beats a timeout, which beats the caller-supplied reason.
void TransportClientSocketPool::CleanupIdleSocketsInGroup(
    bool force,
    Group* group,
    const base::TimeTicks& now,
    const char* net_log_reason_utf8) {
  // If |force| is true, a reason must be provided.
  DCHECK(!force || net_log_reason_utf8);

  auto idle_socket_it = group->mutable_idle_sockets()->begin();
  while (idle_socket_it != group->idle_sockets().end()) {
    bool should_clean_up = force;
    const char* reason_for_closing_socket = net_log_reason_utf8;
    // Used sockets get the (typically shorter) used-idle timeout.
    base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
                                  ? used_idle_socket_timeout_
                                  : unused_idle_socket_timeout_;

    // Timeout errors take precedence over the reason for flushing sockets in
    // the group, if applicable.
    if (now - idle_socket_it->start_time >= timeout) {
      should_clean_up = true;
      reason_for_closing_socket = kIdleTimeLimitExpired;
    }

    // Usability errors take precedence over over other errors.
    if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
      should_clean_up = true;

    if (should_clean_up) {
      DCHECK(reason_for_closing_socket);
      idle_socket_it->socket->NetLog().AddEventWithStringParams(
          NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
          reason_for_closing_socket);
      // erase() destroys the IdleSocket (closing the socket) and yields the
      // next iterator.
      idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
      DecrementIdleCount();
    } else {
      // A kept socket must not have had a reason assigned.
      DCHECK(!reason_for_closing_socket);
      ++idle_socket_it;
    }
  }
}
961 
GetOrCreateGroup(const GroupId & group_id)962 TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
963     const GroupId& group_id) {
964   auto it = group_map_.find(group_id);
965   if (it != group_map_.end())
966     return it->second;
967   Group* group = new Group(group_id, this);
968   group_map_[group_id] = group;
969   return group;
970 }
971 
RemoveGroup(const GroupId & group_id)972 void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
973   auto it = group_map_.find(group_id);
974   CHECK(it != group_map_.end());
975 
976   RemoveGroup(it);
977 }
978 
979 TransportClientSocketPool::GroupMap::iterator
RemoveGroup(GroupMap::iterator it)980 TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
981   delete it->second;
982   return group_map_.erase(it);
983 }
984 
// static
// Returns the process-wide flag controlling backup connect jobs.
bool TransportClientSocketPool::connect_backup_jobs_enabled() {
  return g_connect_backup_jobs_enabled;
}
989 
990 // static
set_connect_backup_jobs_enabled(bool enabled)991 bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
992   bool old_value = g_connect_backup_jobs_enabled;
993   g_connect_backup_jobs_enabled = enabled;
994   return old_value;
995 }
996 
// Bumps the pool-wide idle-socket counter; callers also append the socket to
// a group's idle list.
void TransportClientSocketPool::IncrementIdleCount() {
  ++idle_socket_count_;
}
1000 
// Decrements the pool-wide idle-socket counter; callers also remove the
// socket from its group's idle list.
void TransportClientSocketPool::DecrementIdleCount() {
  --idle_socket_count_;
}
1004 
ReleaseSocket(const GroupId & group_id,std::unique_ptr<StreamSocket> socket,int64_t group_generation)1005 void TransportClientSocketPool::ReleaseSocket(
1006     const GroupId& group_id,
1007     std::unique_ptr<StreamSocket> socket,
1008     int64_t group_generation) {
1009   auto i = group_map_.find(group_id);
1010   CHECK(i != group_map_.end());
1011 
1012   Group* group = i->second;
1013   CHECK(group);
1014 
1015   CHECK_GT(handed_out_socket_count_, 0);
1016   handed_out_socket_count_--;
1017 
1018   CHECK_GT(group->active_socket_count(), 0);
1019   group->DecrementActiveSocketCount();
1020 
1021   bool can_resuse_socket = false;
1022   std::string_view not_reusable_reason;
1023   if (!socket->IsConnectedAndIdle()) {
1024     if (!socket->IsConnected()) {
1025       not_reusable_reason = kClosedConnectionReturnedToPool;
1026     } else {
1027       not_reusable_reason = kDataReceivedUnexpectedly;
1028     }
1029   } else if (group_generation != group->generation()) {
1030     not_reusable_reason = kSocketGenerationOutOfDate;
1031   } else {
1032     can_resuse_socket = true;
1033   }
1034 
1035   if (can_resuse_socket) {
1036     DCHECK(not_reusable_reason.empty());
1037 
1038     // Add it to the idle list.
1039     AddIdleSocket(std::move(socket), group);
1040     OnAvailableSocketSlot(group_id, group);
1041   } else {
1042     DCHECK(!not_reusable_reason.empty());
1043 
1044     socket->NetLog().AddEventWithStringParams(
1045         NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
1046         not_reusable_reason);
1047     if (group->IsEmpty())
1048       RemoveGroup(i);
1049     socket.reset();
1050   }
1051 
1052   CheckForStalledSocketGroups();
1053 }
1054 
// Hands freed socket slots to stalled groups (those with pending requests
// that are blocked on pool limits), highest-priority first, until no stalled
// group remains or the global limit prevents further progress.
void TransportClientSocketPool::CheckForStalledSocketGroups() {
  // Loop until there's nothing more to do.
  while (true) {
    // If we have idle sockets, see if we can give one to the top-stalled group.
    Group* top_group = nullptr;
    GroupId top_group_id;
    if (!FindTopStalledGroup(&top_group, &top_group_id))
      return;

    if (ReachedMaxSocketsLimit()) {
      if (idle_socket_count_ > 0) {
        // Free a slot by closing an idle socket somewhere in the pool.
        CloseOneIdleSocket();
      } else {
        // We can't activate more sockets since we're already at our global
        // limit.
        return;
      }
    }

    // Note that this may delete top_group.
    OnAvailableSocketSlot(top_group_id, top_group);
  }
}
1078 
1079 // Search for the highest priority pending request, amongst the groups that
1080 // are not at the |max_sockets_per_group_| limit. Note: for requests with
1081 // the same priority, the winner is based on group hash ordering (and not
1082 // insertion order).
FindTopStalledGroup(Group ** group,GroupId * group_id) const1083 bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
1084                                                     GroupId* group_id) const {
1085   CHECK(group);
1086   CHECK(group_id);
1087   Group* top_group = nullptr;
1088   const GroupId* top_group_id = nullptr;
1089   bool has_stalled_group = false;
1090   for (const auto& it : group_map_) {
1091     Group* curr_group = it.second;
1092     if (!curr_group->has_unbound_requests())
1093       continue;
1094     if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1095       has_stalled_group = true;
1096       bool has_higher_priority =
1097           !top_group ||
1098           curr_group->TopPendingPriority() > top_group->TopPendingPriority();
1099       if (has_higher_priority) {
1100         top_group = curr_group;
1101         top_group_id = &it.first;
1102       }
1103     }
1104   }
1105 
1106   if (top_group) {
1107     *group = top_group;
1108     *group_id = *top_group_id;
1109   } else {
1110     CHECK(!has_stalled_group);
1111   }
1112   return has_stalled_group;
1113 }
1114 
// NetworkChangeNotifier callback: the local IP address changed, so existing
// connections are no longer trustworthy — flush everything with
// ERR_NETWORK_CHANGED.
void TransportClientSocketPool::OnIPAddressChanged() {
  // Only registered as an observer when this flag is set (see constructor).
  DCHECK(cleanup_on_ip_address_change_);
  FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
}
1119 
// Fails all in-flight work with |error| and discards all idle sockets, then
// bumps every group's generation so sockets already handed out are not
// returned to the idle lists later.
void TransportClientSocketPool::FlushWithError(
    int error,
    const char* net_log_reason_utf8) {
  // Order matters: cancel jobs first, then close idle sockets, then fail the
  // queued requests, and finally invalidate outstanding sockets' generation.
  CancelAllConnectJobs();
  CloseIdleSockets(net_log_reason_utf8);
  CancelAllRequestsWithError(error);
  for (const auto& group : group_map_) {
    group.second->IncrementGeneration();
  }
}
1130 
// Removes a finished/cancelled unbound |job| from |group| and updates the
// pool-wide connecting-socket counter.
void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
                                                 Group* group) {
  CHECK_GT(connecting_socket_count_, 0);
  connecting_socket_count_--;

  DCHECK(group);
  // Destroys |job|.
  group->RemoveUnboundJob(job);
}
1139 
// Called when a socket slot frees up for |group|: either garbage-collects the
// now-empty group or lets its next pending request make progress.
void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
                                                      Group* group) {
  DCHECK(base::Contains(group_map_, group_id));
  if (group->IsEmpty()) {
    // Deletes |group|; callers must not touch it afterwards.
    RemoveGroup(group_id);
  } else if (group->has_unbound_requests()) {
    ProcessPendingRequest(group_id, group);
  }
}
1149 
// Attempts to service the group's highest-priority unbound request, either
// with an idle socket or by starting a new ConnectJob if a slot is available.
void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
                                                      Group* group) {
  const Request* next_request = group->GetNextUnboundRequest();
  DCHECK(next_request);

  // If the group has no idle sockets, and can't make use of an additional slot,
  // either because it's at the limit or because it's at the socket per group
  // limit, then there's nothing to do.
  if (group->idle_sockets().empty() &&
      !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
    return;
  }

  int rv =
      RequestSocketInternal(group_id, *next_request,
                            /*preconnect_done_closure=*/base::OnceClosure());
  // ERR_IO_PENDING means a ConnectJob was started and the request stays
  // queued; any other result completes the request now.
  if (rv != ERR_IO_PENDING) {
    std::unique_ptr<Request> request = group->PopNextUnboundRequest();
    DCHECK(request);
    // Popping the request may have emptied the group; |group| is invalid
    // after RemoveGroup().
    if (group->IsEmpty())
      RemoveGroup(group_id);

    request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                                rv);
    // Deliver the result asynchronously to avoid reentrancy into the caller.
    InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
                            request->socket_tag());
  }
}
1178 
// Transfers |socket| to |handle|, stamping it with reuse/idle/timing metadata,
// and updates the handed-out counters on both the pool and the group.
void TransportClientSocketPool::HandOutSocket(
    std::unique_ptr<StreamSocket> socket,
    ClientSocketHandle::SocketReuseType reuse_type,
    const LoadTimingInfo::ConnectTiming& connect_timing,
    ClientSocketHandle* handle,
    base::TimeDelta idle_time,
    Group* group,
    const NetLogWithSource& net_log) {
  DCHECK(socket);
  handle->SetSocket(std::move(socket));
  handle->set_reuse_type(reuse_type);
  handle->set_idle_time(idle_time);
  // Record the group generation so ReleaseSocket() can detect sockets that
  // outlived a flush.
  handle->set_group_generation(group->generation());
  handle->set_connect_timing(connect_timing);

  if (reuse_type == StreamSocketHandle::SocketReuseType::kReusedIdle) {
    net_log.AddEventWithIntParams(
        NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
        static_cast<int>(idle_time.InMilliseconds()));
  }

  net_log.AddEventReferencingSource(
      NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
      handle->socket()->NetLog().source());

  handed_out_socket_count_++;
  group->IncrementActiveSocketCount();
}
1207 
AddIdleSocket(std::unique_ptr<StreamSocket> socket,Group * group)1208 void TransportClientSocketPool::AddIdleSocket(
1209     std::unique_ptr<StreamSocket> socket,
1210     Group* group) {
1211   DCHECK(socket);
1212   IdleSocket idle_socket;
1213   idle_socket.socket = std::move(socket);
1214   idle_socket.start_time = base::TimeTicks::Now();
1215 
1216   group->mutable_idle_sockets()->push_back(std::move(idle_socket));
1217   IncrementIdleCount();
1218 }
1219 
// Cancels every unbound ConnectJob in every group, dropping groups that end
// up empty.
void TransportClientSocketPool::CancelAllConnectJobs() {
  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);
    // Keep the pool-wide counter in sync with the jobs about to be destroyed.
    connecting_socket_count_ -= group->jobs().size();
    group->RemoveAllUnboundJobs();

    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      // RemoveGroup() erases the entry and returns the next valid iterator.
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}
1235 
// Fails every queued (unbound) request in every group with |error|, marks
// bound requests to fail later, and drops groups left empty.
void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
  for (auto i = group_map_.begin(); i != group_map_.end();) {
    Group* group = i->second;
    CHECK(group);

    // Drain the unbound request queue, posting the error to each callback.
    while (true) {
      std::unique_ptr<Request> request = group->PopNextUnboundRequest();
      if (!request)
        break;
      InvokeUserCallbackLater(request->handle(), request->release_callback(),
                              error, request->socket_tag());
    }

    // Mark bound connect jobs as needing to fail. Can't fail them immediately
    // because they may have access to objects owned by the ConnectJob, and
    // could access them if a user callback invocation is queued. It would also
    // result in the consumer handling two messages at once, which in general
    // isn't safe for a lot of code.
    group->SetPendingErrorForAllBoundRequests(error);

    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      // RemoveGroup() erases the entry and returns the next valid iterator.
      i = RemoveGroup(i);
    } else {
      ++i;
    }
  }
}
1264 
ReachedMaxSocketsLimit() const1265 bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
1266   // Each connecting socket will eventually connect and be handed out.
1267   int total =
1268       handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
1269   // There can be more sockets than the limit since some requests can ignore
1270   // the limit
1271   if (total < max_sockets_)
1272     return false;
1273   return true;
1274 }
1275 
// Closes the oldest idle socket of the first group (in map order) that has
// one, skipping |exception_group|. Returns whether a socket was closed.
bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
    const Group* exception_group) {
  // Callers must have verified the pool has at least one idle socket.
  CHECK_GT(idle_socket_count_, 0);

  for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
    Group* group = i->second;
    CHECK(group);
    if (exception_group == group)
      continue;
    std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();

    if (!idle_sockets->empty()) {
      // The front is the socket that has been idle the longest.
      idle_sockets->pop_front();
      DecrementIdleCount();
      // RemoveGroup() invalidates |i|, but we return immediately afterwards.
      if (group->IsEmpty())
        RemoveGroup(i);

      return true;
    }
  }

  return false;
}
1299 
// Handles completion of a ConnectJob in |group| with |result|. The job may
// already be bound to a specific request (proxy-auth path) or may be matched
// to the group's next unbound request here. Whatever happens, freed socket
// slots are re-advertised and stalled groups re-checked before returning.
void TransportClientSocketPool::OnConnectJobComplete(Group* group,
                                                     int result,
                                                     ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);
  DCHECK(result != OK || job->socket() != nullptr);

  // Check if the ConnectJob is already bound to a Request. If so, result is
  // returned to that specific request.
  std::optional<Group::BoundRequest> bound_request =
      group->FindAndRemoveBoundRequestForConnectJob(job);
  Request* request = nullptr;
  std::unique_ptr<Request> owned_request;
  if (bound_request) {
    // Bound jobs are not tracked by RemoveConnectJob(), so decrement the
    // counter here.
    --connecting_socket_count_;

    // If the socket pools were previously flushed with an error, return that
    // error to the bound request and discard the socket.
    if (bound_request->pending_error != OK) {
      InvokeUserCallbackLater(bound_request->request->handle(),
                              bound_request->request->release_callback(),
                              bound_request->pending_error,
                              bound_request->request->socket_tag());
      bound_request->request->net_log().EndEventWithNetErrorCode(
          NetLogEventType::SOCKET_POOL, bound_request->pending_error);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    // If the ConnectJob is from a previous generation, add the request back to
    // the group, and kick off another request. The socket will be discarded.
    if (bound_request->generation != group->generation()) {
      group->InsertUnboundRequest(std::move(bound_request->request));
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    request = bound_request->request.get();
  } else {
    // In this case, RemoveConnectJob(job, _) must be called before exiting this
    // method. Otherwise, |job| will be leaked.
    owned_request = group->PopNextUnboundRequest();
    request = owned_request.get();

    if (!request) {
      // No request is waiting: park a successful socket on the idle list and
      // clean up the job either way.
      if (result == OK)
        AddIdleSocket(job->PassSocket(), group);
      RemoveConnectJob(job, group);
      OnAvailableSocketSlot(group->group_id(), group);
      CheckForStalledSocketGroups();
      return;
    }

    LogBoundConnectJobToRequest(job->net_log().source(), *request);
  }

  // The case where there's no request is handled above.
  DCHECK(request);

  if (result != OK)
    request->handle()->SetAdditionalErrorState(job);
  // A socket may be present even on failure (e.g. for error reporting).
  if (job->socket()) {
    HandOutSocket(job->PassSocket(),
                  StreamSocketHandle::SocketReuseType::kUnused,
                  job->connect_timing(), request->handle(), base::TimeDelta(),
                  group, request->net_log());
  }
  request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
                                              result);
  InvokeUserCallbackLater(request->handle(), request->release_callback(),
                          result, request->socket_tag());
  if (!bound_request)
    RemoveConnectJob(job, group);
  // If no socket was handed out, there's a new socket slot available.
  if (!request->handle()->socket()) {
    OnAvailableSocketSlot(group->group_id(), group);
    CheckForStalledSocketGroups();
  }
}
1382 
// Handles a ConnectJob hitting a proxy-auth challenge: binds the job to a
// request so the challenge can be surfaced to that request's auth callback.
void TransportClientSocketPool::OnNeedsProxyAuth(
    Group* group,
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  DCHECK(group_map_.find(group->group_id()) != group_map_.end());
  DCHECK_EQ(group, group_map_[group->group_id()]);

  const Request* request = group->BindRequestToConnectJob(job);
  // If can't bind the ConnectJob to a request, treat this as a ConnectJob
  // failure.
  if (!request) {
    OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
    return;
  }

  request->proxy_auth_callback().Run(response, auth_controller,
                                     std::move(restart_with_auth_callback));
}
1403 
// Queues |callback| with |rv| for |handle|, to be delivered asynchronously by
// InvokeUserCallback(). Posting (rather than calling synchronously) avoids
// reentrancy into pool callers.
void TransportClientSocketPool::InvokeUserCallbackLater(
    ClientSocketHandle* handle,
    CompletionOnceCallback callback,
    int rv,
    const SocketTag& socket_tag) {
  // At most one pending result per handle.
  CHECK(!base::Contains(pending_callback_map_, handle));
  pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
  if (rv == OK) {
    handle->socket()->ApplySocketTag(socket_tag);
  }
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
                                weak_factory_.GetWeakPtr(),
                                // This is safe as `handle` is checked against a
                                // map to verify it's alive before dereference.
                                // This code path must only be reachable by
                                // `handle`s that have had Init called.
                                base::UnsafeDangling(handle)));
}
1423 
// Delivers a previously queued result to the user's callback. |handle| may be
// dangling; it is only dereferenced after the map lookup confirms it is still
// tracked (see InvokeUserCallbackLater()).
void TransportClientSocketPool::InvokeUserCallback(
    MayBeDangling<ClientSocketHandle> handle) {
  auto it = pending_callback_map_.find(handle);

  // Exit if the request has already been cancelled.
  if (it == pending_callback_map_.end())
    return;

  CHECK(!handle->is_initialized());
  // Erase the map entry before running the callback, since the callback may
  // re-enter the pool with the same handle.
  CompletionOnceCallback callback = std::move(it->second.callback);
  int result = it->second.result;
  pending_callback_map_.erase(it);
  std::move(callback).Run(result);
}
1438 
TryToCloseSocketsInLayeredPools()1439 void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
1440   while (IsStalled()) {
1441     // Closing a socket will result in calling back into |this| to use the freed
1442     // socket slot, so nothing else is needed.
1443     if (!CloseOneIdleConnectionInHigherLayeredPool())
1444       return;
1445   }
1446 }
1447 
// Flushes one group: force-closes its idle sockets, cancels its unbound
// ConnectJobs, and bumps its generation so outstanding sockets can't be
// returned for reuse. Erases the group if that leaves it empty. Returns the
// iterator to continue iterating with (the group may have been destroyed).
TransportClientSocketPool::GroupMap::iterator
TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
                                        const base::TimeTicks& now,
                                        const char* net_log_reason_utf8) {
  Group* group = it->second;
  CHECK(group);
  CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);

  // Keep the pool-wide counter in sync with the jobs about to be destroyed.
  connecting_socket_count_ -= group->jobs().size();
  group->RemoveAllUnboundJobs();

  // Otherwise, prevent reuse of existing sockets.
  group->IncrementGeneration();

  // Delete group if no longer needed.
  if (group->IsEmpty()) {
    return RemoveGroup(it);
  }
  return ++it;
}
1468 
// A Group holds all per-GroupId state: unbound and bound requests, in-flight
// ConnectJobs, and the backup-connect-job timer.
TransportClientSocketPool::Group::Group(
    const GroupId& group_id,
    TransportClientSocketPool* client_socket_pool)
    : group_id_(group_id),
      client_socket_pool_(client_socket_pool),
      // The request queue maintains one bucket per priority level.
      unbound_requests_(NUM_PRIORITIES) {}
1475 
TransportClientSocketPool::Group::~Group() {
  // By destruction time every job and request must already have been handed
  // off or cancelled; anything left behind indicates a bookkeeping bug.
  DCHECK_EQ(0u, never_assigned_job_count());
  DCHECK_EQ(0u, unassigned_job_count());
  DCHECK(unbound_requests_.empty());
  DCHECK(jobs_.empty());
  DCHECK(bound_requests_.empty());
}
1483 
// ConnectJob::Delegate override: forwards job completion to the owning pool,
// which performs socket hand-off and request bookkeeping.
void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
                                                            ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  client_socket_pool_->OnConnectJobComplete(this, result, job);
}
1489 
// ConnectJob::Delegate override: forwards a proxy auth challenge to the
// owning pool so it can bind the job to a request able to handle auth.
void TransportClientSocketPool::Group::OnNeedsProxyAuth(
    const HttpResponseInfo& response,
    HttpAuthController* auth_controller,
    base::OnceClosure restart_with_auth_callback,
    ConnectJob* job) {
  client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
                                        std::move(restart_with_auth_callback),
                                        job);
}
1499 
// Arms the one-shot timer that creates a backup ConnectJob if the primary
// job hasn't connected within the pool's retry interval. No-op if the timer
// is already armed.
void TransportClientSocketPool::Group::StartBackupJobTimer(
    const GroupId& group_id) {
  // Only allow one timer to run at a time.
  if (BackupJobTimerIsRunning())
    return;

  // Unretained here is okay because |backup_job_timer_| is
  // automatically cancelled when it's destroyed.
  backup_job_timer_.Start(FROM_HERE,
                          client_socket_pool_->ConnectRetryInterval(),
                          base::BindOnce(&Group::OnBackupJobTimerFired,
                                         base::Unretained(this), group_id));
}
1513 
// Returns true if the backup-connect-job timer is currently armed.
bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
  return backup_job_timer_.IsRunning();
}
1517 
TryToUseNeverAssignedConnectJob()1518 bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
1519   SanityCheck();
1520 
1521   if (never_assigned_job_count_ == 0)
1522     return false;
1523   --never_assigned_job_count_;
1524   return true;
1525 }
1526 
// Takes ownership of |job|. Preconnect jobs count as "never assigned" until
// a request claims one. The new job is immediately offered to the
// highest-priority request lacking a job, if any.
void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
                                              bool is_preconnect) {
  SanityCheck();

  if (is_preconnect)
    ++never_assigned_job_count_;
  jobs_.push_back(std::move(job));
  TryToAssignUnassignedJob(jobs_.back().get());

  SanityCheck();
}
1538 
// Removes |job| from this group and transfers its ownership to the caller.
// |job| must be in |jobs_|. If it was assigned to a request, the request is
// given a replacement job when one can be found. Stops the backup timer once
// the group has no jobs left.
std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
    ConnectJob* job) {
  SanityCheck();

  // Check that |job| is in the list.
  auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
  CHECK(it != jobs_.end(), base::NotFatalUntil::M130);

  // Check if |job| is in the unassigned jobs list. If so, remove it.
  auto it2 = base::ranges::find(unassigned_jobs_, job);
  if (it2 != unassigned_jobs_.end()) {
    unassigned_jobs_.erase(it2);
  } else {
    // Otherwise, |job| must be assigned to some Request. Unassign it, then
    // try to replace it with another job if possible (either by taking an
    // unassigned job or stealing from another request, if any requests after it
    // have a job).
    RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
    DCHECK(!request_with_job.is_null());
    request_with_job.value()->ReleaseJob();
    TryToAssignJobToRequest(request_with_job);
  }
  std::unique_ptr<ConnectJob> owned_job = std::move(*it);
  jobs_.erase(it);

  // |job| may have been a never-assigned preconnect job; clamp the
  // never-assigned count to the number of jobs that remain.
  size_t job_count = jobs_.size();
  if (job_count < never_assigned_job_count_)
    never_assigned_job_count_ = job_count;

  // If we've got no more jobs for this group, then we no longer need a
  // backup job either.
  if (jobs_.empty()) {
    DCHECK(unassigned_jobs_.empty());
    backup_job_timer_.Stop();
  }

  SanityCheck();
  return owned_job;
}
1578 
// Fires when the primary ConnectJob has run for a full retry interval.
// Starts a second ("backup") job for the highest-priority unbound request,
// unless the primary has already established a TCP connection, is still in
// DNS resolution, or socket limits forbid a new connection.
void TransportClientSocketPool::Group::OnBackupJobTimerFired(
    const GroupId& group_id) {
  // If there are no more jobs pending, there is no work to do.
  // If we've done our cleanups correctly, this should not happen.
  if (jobs_.empty()) {
    NOTREACHED();
  }

  // If the old job has already established a connection, don't start a backup
  // job. Backup jobs are only for issues establishing the initial TCP
  // connection - the timeout they used is tuned for that, and tests expect that
  // behavior.
  //
  // TODO(crbug.com/41440018): Replace both this and the
  // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
  // LOAD_STATE_RESOLVING_HOST callback to start the timer (And invoke the
  // OnHostResolved callback of any pending requests), and the
  // HasEstablishedConnection() callback to stop the timer. That should result
  // in a more robust, testable API.
  if ((*jobs_.begin())->HasEstablishedConnection())
    return;

  // If our old job is waiting on DNS, or if we can't create any sockets
  // right now due to limits, just reset the timer.
  if (client_socket_pool_->ReachedMaxSocketsLimit() ||
      !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
      (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
    StartBackupJobTimer(group_id);
    return;
  }

  // No request left to serve; the backup job would be pointless.
  if (unbound_requests_.empty())
    return;

  // Build the backup job from the highest-priority unbound request's
  // parameters.
  Request* request = unbound_requests_.FirstMax().value().get();
  std::unique_ptr<ConnectJob> owned_backup_job =
      client_socket_pool_->CreateConnectJob(
          group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
          request->proxy_annotation_tag(), request->priority(),
          request->socket_tag(), this);
  owned_backup_job->net_log().AddEvent(
      NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
        return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
      });
  ConnectJob* backup_job = owned_backup_job.get();
  AddJob(std::move(owned_backup_job), false /* is_preconnect */);
  client_socket_pool_->connecting_socket_count_++;
  int rv = backup_job->Connect();
  // Synchronous completion is delivered through the same path as async
  // completion; note this may reenter the pool and mutate group state.
  if (rv != ERR_IO_PENDING) {
    client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
  }
}
1631 
// Debug-only validation of group invariants: requests holding jobs form a
// prefix of the priority queue, no job is assigned to two requests, each
// request's priority matches its job's, and |unassigned_jobs_| is a
// duplicate-free subset of |jobs_| disjoint from |bound_requests_|.
void TransportClientSocketPool::Group::SanityCheck() const {
#if DCHECK_IS_ON()
  DCHECK_LE(never_assigned_job_count(), jobs_.size());
  DCHECK_LE(unassigned_job_count(), jobs_.size());

  // Check that |unassigned_jobs_| is empty iff there are at least as many
  // requests as jobs.
  DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());

  size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();

  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  for (size_t i = 0; i < unbound_requests_.size();
       ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    DCHECK(!pointer.is_null());
    DCHECK(pointer.value());
    // Check that the first |num_assigned_jobs| requests have valid job
    // assignments.
    if (i < num_assigned_jobs) {
      // The request has a job.
      ConnectJob* job = pointer.value()->job();
      DCHECK(job);
      // The request's job is not in |unassigned_jobs_|
      DCHECK(!base::Contains(unassigned_jobs_, job));
      // The request's job is in |jobs_|
      DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
      // The same job is not assigned to any other request with a job.
      RequestQueue::Pointer pointer2 =
          unbound_requests_.GetNextTowardsLastMin(pointer);
      for (size_t j = i + 1; j < num_assigned_jobs;
           ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
        DCHECK(!pointer2.is_null());
        ConnectJob* job2 = pointer2.value()->job();
        DCHECK(job2);
        DCHECK_NE(job, job2);
      }
      DCHECK_EQ(pointer.value()->priority(), job->priority());
    } else {
      // Check that any subsequent requests do not have a job.
      DCHECK(!pointer.value()->job());
    }
  }

  for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
    // Check that all unassigned jobs are in |jobs_|
    ConnectJob* job = *it;
    DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
    // Check that there are no duplicated entries in |unassigned_jobs_|
    for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
      DCHECK_NE(job, *it2);
    }

    // Check that no |unassigned_jobs_| are in |bound_requests_|.
    DCHECK(!base::Contains(bound_requests_, job,
                           [](const BoundRequest& bound_request) {
                             return bound_request.connect_job.get();
                           }));
  }
#endif
}
1692 
// Cancels every not-yet-bound ConnectJob in the group: releases request->job
// assignments, clears the unassigned list, deletes the jobs, and stops the
// backup timer. Jobs already bound to a request are unaffected.
void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
  SanityCheck();

  // Remove jobs from any requests that have them. Requests with jobs always
  // form a prefix of the queue, so stop at the first job-less request.
  if (!unbound_requests_.empty()) {
    for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
         !pointer.is_null() && pointer.value()->job();
         pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
      pointer.value()->ReleaseJob();
    }
  }
  unassigned_jobs_.clear();
  never_assigned_job_count_ = 0;

  // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
  // removal from `TransportClientSocketPool::group_map_`, so if this check
  // fails, `this` has been deleted, likely through some reentrancy issue.
  CHECK(client_socket_pool_->HasGroup(group_id_));

  // Delete active jobs.
  jobs_.clear();
  // Stop backup job timer.
  backup_job_timer_.Stop();

  SanityCheck();
}
1719 
ConnectJobCount() const1720 size_t TransportClientSocketPool::Group::ConnectJobCount() const {
1721   return bound_requests_.size() + jobs_.size();
1722 }
1723 
GetConnectJobForHandle(const ClientSocketHandle * handle) const1724 ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
1725     const ClientSocketHandle* handle) const {
1726   // Search through bound requests for |handle|.
1727   for (const auto& bound_pair : bound_requests_) {
1728     if (handle == bound_pair.request->handle())
1729       return bound_pair.connect_job.get();
1730   }
1731 
1732   // Search through the unbound requests that have corresponding jobs for a
1733   // request with |handle|.
1734   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1735        !pointer.is_null() && pointer.value()->job();
1736        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1737     if (pointer.value()->handle() == handle)
1738       return pointer.value()->job();
1739   }
1740 
1741   return nullptr;
1742 }
1743 
// Queues |request| by priority — RespectLimits::DISABLED requests jump ahead
// of everything, including other MAXIMUM_PRIORITY requests — and then tries
// to pair the new entry with a ConnectJob.
void TransportClientSocketPool::Group::InsertUnboundRequest(
    std::unique_ptr<Request> request) {
  SanityCheck();

  // Should not have a job because it is not already in |unbound_requests_|
  DCHECK(!request->job());
  // This value must be cached before we release |request|.
  RequestPriority priority = request->priority();

  RequestQueue::Pointer new_position;
  if (request->respect_limits() == RespectLimits::DISABLED) {
    // Put requests with RespectLimits::DISABLED (which should have
    // priority == MAXIMUM_PRIORITY) ahead of other requests with
    // MAXIMUM_PRIORITY.
    DCHECK_EQ(priority, MAXIMUM_PRIORITY);
    new_position =
        unbound_requests_.InsertAtFront(std::move(request), priority);
  } else {
    new_position = unbound_requests_.Insert(std::move(request), priority);
  }
  DCHECK(!unbound_requests_.empty());

  TryToAssignJobToRequest(new_position);

  SanityCheck();
}
1770 
1771 const TransportClientSocketPool::Request*
GetNextUnboundRequest() const1772 TransportClientSocketPool::Group::GetNextUnboundRequest() const {
1773   return unbound_requests_.empty() ? nullptr
1774                                    : unbound_requests_.FirstMax().value().get();
1775 }
1776 
1777 std::unique_ptr<TransportClientSocketPool::Request>
PopNextUnboundRequest()1778 TransportClientSocketPool::Group::PopNextUnboundRequest() {
1779   if (unbound_requests_.empty())
1780     return nullptr;
1781   return RemoveUnboundRequest(unbound_requests_.FirstMax());
1782 }
1783 
1784 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveUnboundRequest(ClientSocketHandle * handle)1785 TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
1786     ClientSocketHandle* handle) {
1787   for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1788        !pointer.is_null();
1789        pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1790     if (pointer.value()->handle() == handle) {
1791       DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
1792                 pointer.value()->priority());
1793       std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1794       return request;
1795     }
1796   }
1797   return nullptr;
1798 }
1799 
SetPendingErrorForAllBoundRequests(int pending_error)1800 void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
1801     int pending_error) {
1802   for (auto& bound_request : bound_requests_) {
1803     // Earlier errors take precedence.
1804     if (bound_request.pending_error == OK)
1805       bound_request.pending_error = pending_error;
1806   }
1807 }
1808 
// Returns the request bound to |connect_job|, binding the highest-priority
// unbound request first if needed. Returns nullptr when no request can take
// the job (none queued, or the top request cannot handle proxy auth).
// Binding moves both the job and the request into |bound_requests_|, fixing
// their association for the rest of the job's lifetime.
const TransportClientSocketPool::Request*
TransportClientSocketPool::Group::BindRequestToConnectJob(
    ConnectJob* connect_job) {
  // Check if |job| is already bound to a Request.
  for (const auto& bound_pair : bound_requests_) {
    if (bound_pair.connect_job.get() == connect_job)
      return bound_pair.request.get();
  }

  // If not, try to bind it to a Request.
  const Request* request = GetNextUnboundRequest();
  // If there are no pending requests, or the highest priority request has no
  // callback to handle auth challenges, return nullptr.
  if (!request || request->proxy_auth_callback().is_null())
    return nullptr;

  // Otherwise, bind the ConnectJob to the Request.
  std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
  DCHECK_EQ(owned_request.get(), request);
  std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
  LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
  bound_requests_.emplace_back(BoundRequest(
      std::move(owned_connect_job), std::move(owned_request), generation()));
  return request;
}
1834 
1835 std::optional<TransportClientSocketPool::Group::BoundRequest>
FindAndRemoveBoundRequestForConnectJob(ConnectJob * connect_job)1836 TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
1837     ConnectJob* connect_job) {
1838   for (auto bound_pair = bound_requests_.begin();
1839        bound_pair != bound_requests_.end(); ++bound_pair) {
1840     if (bound_pair->connect_job.get() != connect_job)
1841       continue;
1842     BoundRequest ret = std::move(*bound_pair);
1843     bound_requests_.erase(bound_pair);
1844     return std::move(ret);
1845   }
1846   return std::nullopt;
1847 }
1848 
1849 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveBoundRequest(ClientSocketHandle * client_socket_handle)1850 TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
1851     ClientSocketHandle* client_socket_handle) {
1852   for (auto bound_pair = bound_requests_.begin();
1853        bound_pair != bound_requests_.end(); ++bound_pair) {
1854     if (bound_pair->request->handle() != client_socket_handle)
1855       continue;
1856     std::unique_ptr<Request> request = std::move(bound_pair->request);
1857     bound_requests_.erase(bound_pair);
1858     return request;
1859   }
1860   return nullptr;
1861 }
1862 
// Reprioritizes the unbound request for |handle| by removing and
// re-inserting it; job assignments are re-derived on insertion. |handle|
// must belong to a queued unbound request.
void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
                                                   RequestPriority priority) {
  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->handle() == handle) {
      // No change needed; avoid the remove/insert churn.
      if (pointer.value()->priority() == priority)
        return;

      std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);

      // Requests that ignore limits must be created and remain at the highest
      // priority, and should not be reprioritized.
      DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);

      request->set_priority(priority);
      InsertUnboundRequest(std::move(request));
      return;
    }
  }

  // This function must be called with a valid ClientSocketHandle.
  NOTREACHED();
}
1887 
// Test-only: returns whether the request for |handle| currently has a
// ConnectJob (bound or assigned). |handle| must belong to some request in
// this group; otherwise this crashes.
bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
    const ClientSocketHandle* handle) const {
  SanityCheck();
  if (GetConnectJobForHandle(handle))
    return true;

  // There's no corresponding ConnectJob. Verify that the handle is at least
  // owned by a request.
  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  for (size_t i = 0; i < unbound_requests_.size(); ++i) {
    if (pointer.value()->handle() == handle)
      return false;
    pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
  }
  NOTREACHED();
}
1904 
TransportClientSocketPool::Group::BoundRequest::BoundRequest()
    : pending_error(OK) {}

// Takes ownership of both the job and the request. |generation| records the
// group generation at bind time, so sockets from since-refreshed groups can
// be recognized.
TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    std::unique_ptr<ConnectJob> connect_job,
    std::unique_ptr<Request> request,
    int64_t generation)
    : connect_job(std::move(connect_job)),
      request(std::move(request)),
      generation(generation),
      pending_error(OK) {}

// Move-only (the unique_ptr members make copies ill-formed).
TransportClientSocketPool::Group::BoundRequest::BoundRequest(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest&
TransportClientSocketPool::Group::BoundRequest::operator=(
    BoundRequest&& other) = default;

TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;
1925 
// Erases the request at |pointer| from the queue and returns ownership of
// it. Any job it held is returned to the unassigned pool (and may be handed
// straight to another request). Stops the backup timer when the queue
// empties.
std::unique_ptr<TransportClientSocketPool::Request>
TransportClientSocketPool::Group::RemoveUnboundRequest(
    const RequestQueue::Pointer& pointer) {
  SanityCheck();

  std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
  if (request->job()) {
    TryToAssignUnassignedJob(request->ReleaseJob());
  }
  // If there are no more unbound requests, kill the backup timer.
  if (unbound_requests_.empty())
    backup_job_timer_.Stop();

  SanityCheck();
  return request;
}
1942 
// Returns a Pointer to the unbound request that |job| is assigned to, or a
// null Pointer if |job| is unassigned. Requests with jobs form a prefix of
// the queue, so iteration stops at the first request without one.
TransportClientSocketPool::RequestQueue::Pointer
TransportClientSocketPool::Group::FindUnboundRequestWithJob(
    const ConnectJob* job) const {
  SanityCheck();

  for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
       !pointer.is_null() && pointer.value()->job();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    if (pointer.value()->job() == job)
      return pointer;
  }
  // If a request with the job was not found, it must be in |unassigned_jobs_|.
  DCHECK(base::Contains(unassigned_jobs_, job));
  return RequestQueue::Pointer();
}
1958 
// Returns the highest-priority request with no assigned job, or a null
// Pointer if every queued request has one. Requests with jobs always form a
// prefix of the queue, so this is the first request past that prefix.
TransportClientSocketPool::RequestQueue::Pointer
TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
  RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
  size_t i = 0;
  for (; !pointer.is_null() && pointer.value()->job();
       pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
    ++i;
  }
  // The number of requests with jobs must equal the number of assigned jobs.
  DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
  DCHECK(pointer.is_null() || !pointer.value()->job());
  return pointer;
}
1971 
// Adds |job| (already owned by |jobs_|) to the unassigned pool, then hands
// it to the first job-less request, if any. The push_back must precede the
// lookup: GetFirstRequestWithoutJob() DCHECKs that the assigned-job count
// matches |jobs_| minus |unassigned_jobs_|.
void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
    ConnectJob* job) {
  unassigned_jobs_.push_back(job);
  RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
  if (!first_request_without_job.is_null()) {
    first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
    unassigned_jobs_.pop_back();
  }
}
1981 
// Tries to find a job for the (currently job-less) request at
// |request_pointer|: first an unassigned job, otherwise the job stolen from
// the lowest-priority job-holding request that follows it in the queue.
void TransportClientSocketPool::Group::TryToAssignJobToRequest(
    TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
  DCHECK(!request_pointer.value()->job());
  if (!unassigned_jobs_.empty()) {
    request_pointer.value()->AssignJob(unassigned_jobs_.front());
    unassigned_jobs_.pop_front();
    return;
  }

  // If the next request in the queue does not have a job, then there are no
  // requests with a job after |request_pointer| from which we can steal.
  RequestQueue::Pointer next_request =
      unbound_requests_.GetNextTowardsLastMin(request_pointer);
  if (next_request.is_null() || !next_request.value()->job())
    return;

  // Walk down the queue to find the last request with a job.
  RequestQueue::Pointer cur = next_request;
  RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
  while (!next.is_null() && next.value()->job()) {
    cur = next;
    next = unbound_requests_.GetNextTowardsLastMin(next);
  }
  // Steal the job from the last request with a job.
  TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
}
2008 
TransferJobBetweenRequests(TransportClientSocketPool::Request * source,TransportClientSocketPool::Request * dest)2009 void TransportClientSocketPool::Group::TransferJobBetweenRequests(
2010     TransportClientSocketPool::Request* source,
2011     TransportClientSocketPool::Request* dest) {
2012   DCHECK(!dest->job());
2013   DCHECK(source->job());
2014   dest->AssignJob(source->ReleaseJob());
2015 }
2016 
2017 }  // namespace net
2018