1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/transport_client_socket_pool.h"
6
7 #include <string_view>
8 #include <utility>
9
10 #include "base/auto_reset.h"
11 #include "base/barrier_closure.h"
12 #include "base/check_op.h"
13 #include "base/compiler_specific.h"
14 #include "base/containers/contains.h"
15 #include "base/format_macros.h"
16 #include "base/functional/bind.h"
17 #include "base/functional/callback_helpers.h"
18 #include "base/location.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/not_fatal_until.h"
22 #include "base/notreached.h"
23 #include "base/ranges/algorithm.h"
24 #include "base/strings/string_util.h"
25 #include "base/task/single_thread_task_runner.h"
26 #include "base/time/time.h"
27 #include "base/values.h"
28 #include "net/base/host_port_pair.h"
29 #include "net/base/net_errors.h"
30 #include "net/base/proxy_chain.h"
31 #include "net/base/proxy_server.h"
32 #include "net/log/net_log.h"
33 #include "net/log/net_log_event_type.h"
34 #include "net/log/net_log_source.h"
35 #include "net/socket/connect_job_factory.h"
36 #include "net/traffic_annotation/network_traffic_annotation.h"
37 #include "url/gurl.h"
38
39 namespace net {
40
41 namespace {
42
43 // Indicate whether or not we should establish a new transport layer connection
44 // after a certain timeout has passed without receiving an ACK.
45 bool g_connect_backup_jobs_enabled = true;
46
NetLogCreateConnectJobParams(bool backup_job,const ClientSocketPool::GroupId * group_id)47 base::Value::Dict NetLogCreateConnectJobParams(
48 bool backup_job,
49 const ClientSocketPool::GroupId* group_id) {
50 return base::Value::Dict()
51 .Set("backup_job", backup_job)
52 .Set("group_id", group_id->ToString());
53 }
54
55 } // namespace
56
57 const char TransportClientSocketPool::kCertDatabaseChanged[] =
58 "Cert database changed";
59 const char TransportClientSocketPool::kCertVerifierChanged[] =
60 "Cert verifier changed";
61 const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
62 "Connection was closed when it was returned to the pool";
63 const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
64 "Data received unexpectedly";
65 const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
66 "Idle time limit expired";
67 const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
68 const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
69 "Remote side closed connection";
70 const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
71 "Socket generation out of date";
72 const char TransportClientSocketPool::kSocketPoolDestroyed[] =
73 "Socket pool destroyed";
74 const char TransportClientSocketPool::kSslConfigChanged[] =
75 "SSL configuration changed";
76
Request(ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,Flags flags,scoped_refptr<SocketParams> socket_params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,const NetLogWithSource & net_log)77 TransportClientSocketPool::Request::Request(
78 ClientSocketHandle* handle,
79 CompletionOnceCallback callback,
80 const ProxyAuthCallback& proxy_auth_callback,
81 RequestPriority priority,
82 const SocketTag& socket_tag,
83 RespectLimits respect_limits,
84 Flags flags,
85 scoped_refptr<SocketParams> socket_params,
86 const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
87 const NetLogWithSource& net_log)
88 : handle_(handle),
89 callback_(std::move(callback)),
90 proxy_auth_callback_(proxy_auth_callback),
91 priority_(priority),
92 respect_limits_(respect_limits),
93 flags_(flags),
94 socket_params_(std::move(socket_params)),
95 proxy_annotation_tag_(proxy_annotation_tag),
96 net_log_(net_log),
97 socket_tag_(socket_tag) {
98 if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
99 DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
100 }
101
102 TransportClientSocketPool::Request::~Request() = default;
103
AssignJob(ConnectJob * job)104 void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
105 DCHECK(job);
106 DCHECK(!job_);
107 job_ = job;
108 if (job_->priority() != priority_)
109 job_->ChangePriority(priority_);
110 }
111
ReleaseJob()112 ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
113 DCHECK(job_);
114 ConnectJob* job = job_;
115 job_ = nullptr;
116 return job;
117 }
118
119 struct TransportClientSocketPool::IdleSocket {
120 // An idle socket can't be used if it is disconnected or has been used
121 // before and has received data unexpectedly (hence no longer idle). The
122 // unread data would be mistaken for the beginning of the next response if
123 // we were to use the socket for a new request.
124 //
125 // Note that a socket that has never been used before (like a preconnected
126 // socket) may be used even with unread data. This may be, e.g., a SPDY
127 // SETTINGS frame.
128 //
129 // If the socket is not usable, |net_log_reason_utf8| is set to a string
130 // indicating why the socket is not usable.
131 bool IsUsable(const char** net_log_reason_utf8) const;
132
133 std::unique_ptr<StreamSocket> socket;
134 base::TimeTicks start_time;
135 };
136
TransportClientSocketPool(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,bool cleanup_on_ip_address_change)137 TransportClientSocketPool::TransportClientSocketPool(
138 int max_sockets,
139 int max_sockets_per_group,
140 base::TimeDelta unused_idle_socket_timeout,
141 const ProxyChain& proxy_chain,
142 bool is_for_websockets,
143 const CommonConnectJobParams* common_connect_job_params,
144 bool cleanup_on_ip_address_change)
145 : TransportClientSocketPool(max_sockets,
146 max_sockets_per_group,
147 unused_idle_socket_timeout,
148 ClientSocketPool::used_idle_socket_timeout(),
149 proxy_chain,
150 is_for_websockets,
151 common_connect_job_params,
152 cleanup_on_ip_address_change,
153 std::make_unique<ConnectJobFactory>(),
154 common_connect_job_params->ssl_client_context,
155 /*connect_backup_jobs_enabled=*/true) {}
156
~TransportClientSocketPool()157 TransportClientSocketPool::~TransportClientSocketPool() {
158 // Clean up any idle sockets and pending connect jobs. Assert that we have no
159 // remaining active sockets or pending requests. They should have all been
160 // cleaned up prior to |this| being destroyed.
161 FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
162 DCHECK(group_map_.empty());
163 DCHECK(pending_callback_map_.empty());
164 DCHECK_EQ(0, connecting_socket_count_);
165 DCHECK_EQ(0, handed_out_socket_count_);
166 CHECK(higher_pools_.empty());
167
168 if (ssl_client_context_)
169 ssl_client_context_->RemoveObserver(this);
170
171 if (cleanup_on_ip_address_change_)
172 NetworkChangeNotifier::RemoveIPAddressObserver(this);
173 }
174
175 std::unique_ptr<TransportClientSocketPool>
CreateForTesting(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)176 TransportClientSocketPool::CreateForTesting(
177 int max_sockets,
178 int max_sockets_per_group,
179 base::TimeDelta unused_idle_socket_timeout,
180 base::TimeDelta used_idle_socket_timeout,
181 const ProxyChain& proxy_chain,
182 bool is_for_websockets,
183 const CommonConnectJobParams* common_connect_job_params,
184 std::unique_ptr<ConnectJobFactory> connect_job_factory,
185 SSLClientContext* ssl_client_context,
186 bool connect_backup_jobs_enabled) {
187 return base::WrapUnique<TransportClientSocketPool>(
188 new TransportClientSocketPool(
189 max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
190 used_idle_socket_timeout, proxy_chain, is_for_websockets,
191 common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
192 std::move(connect_job_factory), ssl_client_context,
193 connect_backup_jobs_enabled));
194 }
195
CallbackResultPair()196 TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
197 : result(OK) {}
198
CallbackResultPair(CompletionOnceCallback callback_in,int result_in)199 TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
200 CompletionOnceCallback callback_in,
201 int result_in)
202 : callback(std::move(callback_in)), result(result_in) {}
203
204 TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
205 TransportClientSocketPool::CallbackResultPair&& other) = default;
206
207 TransportClientSocketPool::CallbackResultPair&
208 TransportClientSocketPool::CallbackResultPair::operator=(
209 TransportClientSocketPool::CallbackResultPair&& other) = default;
210
211 TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;
212
IsStalled() const213 bool TransportClientSocketPool::IsStalled() const {
214 // If fewer than |max_sockets_| are in use, then clearly |this| is not
215 // stalled.
216 if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
217 return false;
218 // So in order to be stalled, |this| must be using at least |max_sockets_| AND
219 // |this| must have a request that is actually stalled on the global socket
220 // limit. To find such a request, look for a group that has more requests
221 // than jobs AND where the number of sockets is less than
222 // |max_sockets_per_group_|. (If the number of sockets is equal to
223 // |max_sockets_per_group_|, then the request is stalled on the group limit,
224 // which does not count.)
225 for (const auto& it : group_map_) {
226 if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
227 return true;
228 }
229 return false;
230 }
231
AddHigherLayeredPool(HigherLayeredPool * higher_pool)232 void TransportClientSocketPool::AddHigherLayeredPool(
233 HigherLayeredPool* higher_pool) {
234 CHECK(higher_pool);
235 CHECK(!base::Contains(higher_pools_, higher_pool));
236 higher_pools_.insert(higher_pool);
237 }
238
RemoveHigherLayeredPool(HigherLayeredPool * higher_pool)239 void TransportClientSocketPool::RemoveHigherLayeredPool(
240 HigherLayeredPool* higher_pool) {
241 CHECK(higher_pool);
242 CHECK(base::Contains(higher_pools_, higher_pool));
243 higher_pools_.erase(higher_pool);
244 }
245
RequestSocket(const GroupId & group_id,scoped_refptr<SocketParams> params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,const NetLogWithSource & net_log)246 int TransportClientSocketPool::RequestSocket(
247 const GroupId& group_id,
248 scoped_refptr<SocketParams> params,
249 const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
250 RequestPriority priority,
251 const SocketTag& socket_tag,
252 RespectLimits respect_limits,
253 ClientSocketHandle* handle,
254 CompletionOnceCallback callback,
255 const ProxyAuthCallback& proxy_auth_callback,
256 const NetLogWithSource& net_log) {
257 CHECK(callback);
258 CHECK(handle);
259
260 NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);
261
262 std::unique_ptr<Request> request = std::make_unique<Request>(
263 handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
264 respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);
265
266 // Cleanup any timed-out idle sockets.
267 CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
268
269 request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);
270
271 int rv =
272 RequestSocketInternal(group_id, *request,
273 /*preconnect_done_closure=*/base::OnceClosure());
274 if (rv != ERR_IO_PENDING) {
275 if (rv == OK) {
276 request->handle()->socket()->ApplySocketTag(request->socket_tag());
277 }
278 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
279 rv);
280 CHECK(!request->handle()->is_initialized());
281 request.reset();
282 } else {
283 Group* group = GetOrCreateGroup(group_id);
284 group->InsertUnboundRequest(std::move(request));
285 // Have to do this asynchronously, as closing sockets in higher level pools
286 // call back in to |this|, which will cause all sorts of fun and exciting
287 // re-entrancy issues if the socket pool is doing something else at the
288 // time.
289 if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
290 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
291 FROM_HERE,
292 base::BindOnce(
293 &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
294 weak_factory_.GetWeakPtr()));
295 }
296 }
297 return rv;
298 }
299
RequestSockets(const GroupId & group_id,scoped_refptr<SocketParams> params,const std::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,int num_sockets,CompletionOnceCallback callback,const NetLogWithSource & net_log)300 int TransportClientSocketPool::RequestSockets(
301 const GroupId& group_id,
302 scoped_refptr<SocketParams> params,
303 const std::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
304 int num_sockets,
305 CompletionOnceCallback callback,
306 const NetLogWithSource& net_log) {
307 // TODO(eroman): Split out the host and port parameters.
308 net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
309 [&] { return NetLogGroupIdParams(group_id); });
310
311 Request request(nullptr /* no handle */, CompletionOnceCallback(),
312 ProxyAuthCallback(), IDLE, SocketTag(),
313 RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
314 proxy_annotation_tag, net_log);
315
316 // Cleanup any timed-out idle sockets.
317 CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
318
319 if (num_sockets > max_sockets_per_group_) {
320 num_sockets = max_sockets_per_group_;
321 }
322
323 request.net_log().BeginEventWithIntParams(
324 NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
325 num_sockets);
326
327 Group* group = GetOrCreateGroup(group_id);
328
329 // RequestSocketsInternal() may delete the group.
330 bool deleted_group = false;
331
332 int rv = OK;
333
334 base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
335 num_sockets,
336 base::BindOnce(
337 [](CompletionOnceCallback callback) {
338 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
339 FROM_HERE, base::BindOnce(std::move(callback), OK));
340 },
341 std::move(callback)));
342 int pending_connect_job_count = 0;
343 for (int num_iterations_left = num_sockets;
344 group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
345 num_iterations_left--) {
346 rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
347 if (rv == ERR_IO_PENDING) {
348 ++pending_connect_job_count;
349 }
350 if (rv < 0 && rv != ERR_IO_PENDING) {
351 // We're encountering a synchronous error. Give up.
352 if (!base::Contains(group_map_, group_id))
353 deleted_group = true;
354 break;
355 }
356 if (!base::Contains(group_map_, group_id)) {
357 // Unexpected. The group should only be getting deleted on synchronous
358 // error.
359 NOTREACHED();
360 }
361 }
362
363 if (!deleted_group && group->IsEmpty())
364 RemoveGroup(group_id);
365
366 if (rv == ERR_IO_PENDING)
367 rv = OK;
368 request.net_log().EndEventWithNetErrorCode(
369 NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);
370
371 // Currently we don't handle preconnect errors. So this method returns OK even
372 // if failed to preconnect.
373 // TODO(crbug.com/40843081): Consider support error handlings when needed.
374 if (pending_connect_job_count == 0)
375 return OK;
376 for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
377 preconnect_done_closure.Run();
378 }
379
380 return ERR_IO_PENDING;
381 }
382
RequestSocketInternal(const GroupId & group_id,const Request & request,base::OnceClosure preconnect_done_closure)383 int TransportClientSocketPool::RequestSocketInternal(
384 const GroupId& group_id,
385 const Request& request,
386 base::OnceClosure preconnect_done_closure) {
387 #if DCHECK_IS_ON()
388 DCHECK(!request_in_process_);
389 base::AutoReset<bool> auto_reset(&request_in_process_, true);
390 #endif // DCHECK_IS_ON()
391
392 ClientSocketHandle* const handle = request.handle();
393 const bool preconnecting = !handle;
394 DCHECK_EQ(preconnecting, !!preconnect_done_closure);
395
396 Group* group = nullptr;
397 auto group_it = group_map_.find(group_id);
398 if (group_it != group_map_.end()) {
399 group = group_it->second;
400
401 if (!(request.flags() & NO_IDLE_SOCKETS)) {
402 // Try to reuse a socket.
403 if (AssignIdleSocketToRequest(request, group))
404 return OK;
405 }
406
407 // If there are more ConnectJobs than pending requests, don't need to do
408 // anything. Can just wait for the extra job to connect, and then assign it
409 // to the request.
410 if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
411 return ERR_IO_PENDING;
412
413 // Can we make another active socket now?
414 if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
415 request.respect_limits() == RespectLimits::ENABLED) {
416 // TODO(willchan): Consider whether or not we need to close a socket in a
417 // higher layered group. I don't think this makes sense since we would
418 // just reuse that socket then if we needed one and wouldn't make it down
419 // to this layer.
420 request.net_log().AddEvent(
421 NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
422 return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
423 }
424 }
425
426 if (ReachedMaxSocketsLimit() &&
427 request.respect_limits() == RespectLimits::ENABLED) {
428 // NOTE(mmenke): Wonder if we really need different code for each case
429 // here. Only reason for them now seems to be preconnects.
430 if (idle_socket_count_ > 0) {
431 // There's an idle socket in this pool. Either that's because there's
432 // still one in this group, but we got here due to preconnecting
433 // bypassing idle sockets, or because there's an idle socket in another
434 // group.
435 bool closed = CloseOneIdleSocketExceptInGroup(group);
436 if (preconnecting && !closed)
437 return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
438 } else {
439 // We could check if we really have a stalled group here, but it
440 // requires a scan of all groups, so just flip a flag here, and do the
441 // check later.
442 request.net_log().AddEvent(
443 NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
444 return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
445 }
446 }
447
448 // We couldn't find a socket to reuse, and there's space to allocate one,
449 // so allocate and connect a new one.
450 group = GetOrCreateGroup(group_id);
451 std::unique_ptr<ConnectJob> connect_job(
452 CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
453 request.proxy_annotation_tag(), request.priority(),
454 request.socket_tag(), group));
455 connect_job->net_log().AddEvent(
456 NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
457 return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
458 });
459
460 int rv = connect_job->Connect();
461 if (rv == ERR_IO_PENDING) {
462 if (preconnect_done_closure) {
463 DCHECK(preconnecting);
464 connect_job->set_done_closure(std::move(preconnect_done_closure));
465 }
466 // If we didn't have any sockets in this group, set a timer for potentially
467 // creating a new one. If the SYN is lost, this backup socket may complete
468 // before the slow socket, improving end user latency.
469 if (connect_backup_jobs_enabled_ && group->IsEmpty())
470 group->StartBackupJobTimer(group_id);
471 group->AddJob(std::move(connect_job), preconnecting);
472 connecting_socket_count_++;
473 return rv;
474 }
475
476 LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
477 if (preconnecting) {
478 if (rv == OK)
479 AddIdleSocket(connect_job->PassSocket(), group);
480 } else {
481 DCHECK(handle);
482 if (rv != OK)
483 handle->SetAdditionalErrorState(connect_job.get());
484 std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
485 if (socket) {
486 HandOutSocket(std::move(socket),
487 StreamSocketHandle::SocketReuseType::kUnused,
488 connect_job->connect_timing(), handle,
489 /*time_idle=*/base::TimeDelta(), group, request.net_log());
490 }
491 }
492 if (group->IsEmpty())
493 RemoveGroup(group_id);
494
495 return rv;
496 }
497
AssignIdleSocketToRequest(const Request & request,Group * group)498 bool TransportClientSocketPool::AssignIdleSocketToRequest(
499 const Request& request,
500 Group* group) {
501 std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
502 auto idle_socket_it = idle_sockets->end();
503
504 // Iterate through the idle sockets forwards (oldest to newest)
505 // * Delete any disconnected ones.
506 // * If we find a used idle socket, assign to |idle_socket|. At the end,
507 // the |idle_socket_it| will be set to the newest used idle socket.
508 for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
509 // Check whether socket is usable. Note that it's unlikely that the socket
510 // is not usable because this function is always invoked after a
511 // reusability check, but in theory socket can be closed asynchronously.
512 const char* net_log_reason_utf8;
513 if (!it->IsUsable(&net_log_reason_utf8)) {
514 it->socket->NetLog().AddEventWithStringParams(
515 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
516 net_log_reason_utf8);
517 DecrementIdleCount();
518 it = idle_sockets->erase(it);
519 continue;
520 }
521
522 if (it->socket->WasEverUsed()) {
523 // We found one we can reuse!
524 idle_socket_it = it;
525 }
526
527 ++it;
528 }
529
530 // If we haven't found an idle socket, that means there are no used idle
531 // sockets. Pick the oldest (first) idle socket (FIFO).
532
533 if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
534 idle_socket_it = idle_sockets->begin();
535
536 if (idle_socket_it != idle_sockets->end()) {
537 DecrementIdleCount();
538 base::TimeDelta idle_time =
539 base::TimeTicks::Now() - idle_socket_it->start_time;
540 std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
541 idle_sockets->erase(idle_socket_it);
542 // TODO(davidben): If |idle_time| is under some low watermark, consider
543 // treating as UNUSED rather than UNUSED_IDLE. This will avoid
544 // HttpNetworkTransaction retrying on some errors.
545 ClientSocketHandle::SocketReuseType reuse_type =
546 socket->WasEverUsed()
547 ? StreamSocketHandle::SocketReuseType::kReusedIdle
548 : StreamSocketHandle::SocketReuseType::kUnusedIdle;
549
550 HandOutSocket(std::move(socket), reuse_type,
551 LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
552 group, request.net_log());
553 return true;
554 }
555
556 return false;
557 }
558
559 // static
LogBoundConnectJobToRequest(const NetLogSource & connect_job_source,const Request & request)560 void TransportClientSocketPool::LogBoundConnectJobToRequest(
561 const NetLogSource& connect_job_source,
562 const Request& request) {
563 request.net_log().AddEventReferencingSource(
564 NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
565 }
566
SetPriority(const GroupId & group_id,ClientSocketHandle * handle,RequestPriority priority)567 void TransportClientSocketPool::SetPriority(const GroupId& group_id,
568 ClientSocketHandle* handle,
569 RequestPriority priority) {
570 auto group_it = group_map_.find(group_id);
571 if (group_it == group_map_.end()) {
572 DCHECK(base::Contains(pending_callback_map_, handle));
573 // The Request has already completed and been destroyed; nothing to
574 // reprioritize.
575 return;
576 }
577
578 group_it->second->SetPriority(handle, priority);
579 }
580
CancelRequest(const GroupId & group_id,ClientSocketHandle * handle,bool cancel_connect_job)581 void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
582 ClientSocketHandle* handle,
583 bool cancel_connect_job) {
584 auto callback_it = pending_callback_map_.find(handle);
585 if (callback_it != pending_callback_map_.end()) {
586 int result = callback_it->second.result;
587 pending_callback_map_.erase(callback_it);
588 std::unique_ptr<StreamSocket> socket = handle->PassSocket();
589 if (socket) {
590 if (result != OK) {
591 socket->Disconnect();
592 } else if (cancel_connect_job) {
593 // Close the socket if |cancel_connect_job| is true and there are no
594 // other pending requests.
595 Group* group = GetOrCreateGroup(group_id);
596 if (group->unbound_request_count() == 0)
597 socket->Disconnect();
598 }
599 ReleaseSocket(handle->group_id(), std::move(socket),
600 handle->group_generation());
601 }
602 return;
603 }
604
605 CHECK(base::Contains(group_map_, group_id));
606 Group* group = GetOrCreateGroup(group_id);
607
608 std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
609 if (request) {
610 --connecting_socket_count_;
611 OnAvailableSocketSlot(group_id, group);
612 CheckForStalledSocketGroups();
613 return;
614 }
615
616 // Search |unbound_requests_| for matching handle.
617 request = group->FindAndRemoveUnboundRequest(handle);
618 if (request) {
619 request->net_log().AddEvent(NetLogEventType::CANCELLED);
620 request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);
621
622 // Let the job run, unless |cancel_connect_job| is true, or we're at the
623 // socket limit and there are no other requests waiting on the job.
624 bool reached_limit = ReachedMaxSocketsLimit();
625 if (group->jobs().size() > group->unbound_request_count() &&
626 (cancel_connect_job || reached_limit)) {
627 RemoveConnectJob(group->jobs().begin()->get(), group);
628 if (group->IsEmpty())
629 RemoveGroup(group->group_id());
630 if (reached_limit)
631 CheckForStalledSocketGroups();
632 }
633 }
634 }
635
CloseIdleSockets(const char * net_log_reason_utf8)636 void TransportClientSocketPool::CloseIdleSockets(
637 const char* net_log_reason_utf8) {
638 CleanupIdleSockets(true, net_log_reason_utf8);
639 DCHECK_EQ(0, idle_socket_count_);
640 }
641
CloseIdleSocketsInGroup(const GroupId & group_id,const char * net_log_reason_utf8)642 void TransportClientSocketPool::CloseIdleSocketsInGroup(
643 const GroupId& group_id,
644 const char* net_log_reason_utf8) {
645 if (idle_socket_count_ == 0)
646 return;
647 auto it = group_map_.find(group_id);
648 if (it == group_map_.end())
649 return;
650 CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
651 net_log_reason_utf8);
652 if (it->second->IsEmpty())
653 RemoveGroup(it);
654 }
655
IdleSocketCount() const656 int TransportClientSocketPool::IdleSocketCount() const {
657 return idle_socket_count_;
658 }
659
IdleSocketCountInGroup(const GroupId & group_id) const660 size_t TransportClientSocketPool::IdleSocketCountInGroup(
661 const GroupId& group_id) const {
662 auto i = group_map_.find(group_id);
663 CHECK(i != group_map_.end());
664
665 return i->second->idle_sockets().size();
666 }
667
GetLoadState(const GroupId & group_id,const ClientSocketHandle * handle) const668 LoadState TransportClientSocketPool::GetLoadState(
669 const GroupId& group_id,
670 const ClientSocketHandle* handle) const {
671 if (base::Contains(pending_callback_map_, handle))
672 return LOAD_STATE_CONNECTING;
673
674 auto group_it = group_map_.find(group_id);
675 if (group_it == group_map_.end()) {
676 // TODO(mmenke): This is actually reached in the wild, for unknown reasons.
677 // Would be great to understand why, and if it's a bug, fix it. If not,
678 // should have a test for that case.
679 NOTREACHED();
680 }
681
682 const Group& group = *group_it->second;
683 ConnectJob* job = group.GetConnectJobForHandle(handle);
684 if (job)
685 return job->GetLoadState();
686
687 if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
688 return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
689 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
690 }
691
GetInfoAsValue(const std::string & name,const std::string & type) const692 base::Value TransportClientSocketPool::GetInfoAsValue(
693 const std::string& name,
694 const std::string& type) const {
695 // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
696 auto dict = base::Value::Dict()
697 .Set("name", name)
698 .Set("type", type)
699 .Set("handed_out_socket_count", handed_out_socket_count_)
700 .Set("connecting_socket_count", connecting_socket_count_)
701 .Set("idle_socket_count", idle_socket_count_)
702 .Set("max_socket_count", max_sockets_)
703 .Set("max_sockets_per_group", max_sockets_per_group_);
704
705 if (group_map_.empty())
706 return base::Value(std::move(dict));
707
708 base::Value::Dict all_groups_dict;
709 for (const auto& entry : group_map_) {
710 const Group* group = entry.second;
711
712 base::Value::List idle_socket_list;
713 for (const auto& idle_socket : group->idle_sockets()) {
714 int source_id = idle_socket.socket->NetLog().source().id;
715 idle_socket_list.Append(source_id);
716 }
717
718 base::Value::List connect_jobs_list;
719 for (const auto& job : group->jobs()) {
720 int source_id = job->net_log().source().id;
721 connect_jobs_list.Append(source_id);
722 }
723
724 auto group_dict =
725 base::Value::Dict()
726 .Set("pending_request_count",
727 static_cast<int>(group->unbound_request_count()))
728 .Set("active_socket_count", group->active_socket_count())
729 .Set("idle_sockets", std::move(idle_socket_list))
730 .Set("connect_jobs", std::move(connect_jobs_list))
731 .Set("is_stalled",
732 group->CanUseAdditionalSocketSlot(max_sockets_per_group_))
733 .Set("backup_job_timer_is_running",
734 group->BackupJobTimerIsRunning());
735
736 if (group->has_unbound_requests()) {
737 group_dict.Set("top_pending_priority",
738 RequestPriorityToString(group->TopPendingPriority()));
739 }
740
741 all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
742 }
743 dict.Set("groups", std::move(all_groups_dict));
744 return base::Value(std::move(dict));
745 }
746
HasActiveSocket(const GroupId & group_id) const747 bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
748 return HasGroup(group_id);
749 }
750
IsUsable(const char ** net_log_reason_utf8) const751 bool TransportClientSocketPool::IdleSocket::IsUsable(
752 const char** net_log_reason_utf8) const {
753 DCHECK(net_log_reason_utf8);
754 if (socket->WasEverUsed()) {
755 if (!socket->IsConnectedAndIdle()) {
756 if (!socket->IsConnected()) {
757 *net_log_reason_utf8 = kRemoteSideClosedConnection;
758 } else {
759 *net_log_reason_utf8 = kDataReceivedUnexpectedly;
760 }
761 return false;
762 }
763 return true;
764 }
765
766 if (!socket->IsConnected()) {
767 *net_log_reason_utf8 = kRemoteSideClosedConnection;
768 return false;
769 }
770 return true;
771 }
772
TransportClientSocketPool(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,bool cleanup_on_ip_address_change,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)773 TransportClientSocketPool::TransportClientSocketPool(
774 int max_sockets,
775 int max_sockets_per_group,
776 base::TimeDelta unused_idle_socket_timeout,
777 base::TimeDelta used_idle_socket_timeout,
778 const ProxyChain& proxy_chain,
779 bool is_for_websockets,
780 const CommonConnectJobParams* common_connect_job_params,
781 bool cleanup_on_ip_address_change,
782 std::unique_ptr<ConnectJobFactory> connect_job_factory,
783 SSLClientContext* ssl_client_context,
784 bool connect_backup_jobs_enabled)
785 : ClientSocketPool(is_for_websockets,
786 common_connect_job_params,
787 std::move(connect_job_factory)),
788 max_sockets_(max_sockets),
789 max_sockets_per_group_(max_sockets_per_group),
790 unused_idle_socket_timeout_(unused_idle_socket_timeout),
791 used_idle_socket_timeout_(used_idle_socket_timeout),
792 proxy_chain_(proxy_chain),
793 cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
794 connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
795 g_connect_backup_jobs_enabled),
796 ssl_client_context_(ssl_client_context) {
797 DCHECK_LE(0, max_sockets_per_group);
798 DCHECK_LE(max_sockets_per_group, max_sockets);
799
800 if (cleanup_on_ip_address_change_)
801 NetworkChangeNotifier::AddIPAddressObserver(this);
802
803 if (ssl_client_context_)
804 ssl_client_context_->AddObserver(this);
805 }
806
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)807 void TransportClientSocketPool::OnSSLConfigChanged(
808 SSLClientContext::SSLConfigChangeType change_type) {
809 const char* message = nullptr;
810 // When the SSL config or cert verifier config changes, flush all idle
811 // sockets so they won't get re-used, and allow any active sockets to finish,
812 // but don't put them back in the socket pool.
813 switch (change_type) {
814 case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
815 message = kNetworkChanged;
816 break;
817 case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
818 message = kCertDatabaseChanged;
819 break;
820 case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
821 message = kCertVerifierChanged;
822 break;
823 };
824
825 base::TimeTicks now = base::TimeTicks::Now();
826 for (auto it = group_map_.begin(); it != group_map_.end();) {
827 it = RefreshGroup(it, now, message);
828 }
829 CheckForStalledSocketGroups();
830 }
831
832 // TODO(crbug.com/40181080): Get `server` as SchemeHostPort?
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)833 void TransportClientSocketPool::OnSSLConfigForServersChanged(
834 const base::flat_set<HostPortPair>& servers) {
835 // Current time value. Retrieving it once at the function start rather than
836 // inside the inner loop, since it shouldn't change by any meaningful amount.
837 //
838 // TODO(davidben): This value is not actually needed because
839 // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
840 // interfaces so the parameter is not necessary.
841 base::TimeTicks now = base::TimeTicks::Now();
842
843 // If the proxy chain includes a server from `servers` and uses SSL settings
844 // (HTTPS or QUIC), refresh every group.
845 bool proxy_matches = false;
846 for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
847 if (proxy_server.is_secure_http_like() &&
848 servers.contains(proxy_server.host_port_pair())) {
849 proxy_matches = true;
850 }
851 }
852
853 bool refreshed_any = false;
854 for (auto it = group_map_.begin(); it != group_map_.end();) {
855 if (proxy_matches ||
856 (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
857 servers.contains(
858 HostPortPair::FromSchemeHostPort(it->first.destination())))) {
859 refreshed_any = true;
860 // Note this call may destroy the group and invalidate |to_refresh|.
861 it = RefreshGroup(it, now, kSslConfigChanged);
862 } else {
863 ++it;
864 }
865 }
866
867 if (refreshed_any) {
868 // Check to see if any group can use the freed up socket slots. It would be
869 // more efficient to give the slots to the refreshed groups, if the still
870 // exists and need them, but this should be rare enough that it doesn't
871 // matter. This will also make sure the slots are given to the group with
872 // the highest priority request without an assigned ConnectJob.
873 CheckForStalledSocketGroups();
874 }
875 }
876
HasGroup(const GroupId & group_id) const877 bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
878 return base::Contains(group_map_, group_id);
879 }
880
CleanupIdleSockets(bool force,const char * net_log_reason_utf8)881 void TransportClientSocketPool::CleanupIdleSockets(
882 bool force,
883 const char* net_log_reason_utf8) {
884 if (idle_socket_count_ == 0)
885 return;
886
887 // Current time value. Retrieving it once at the function start rather than
888 // inside the inner loop, since it shouldn't change by any meaningful amount.
889 base::TimeTicks now = base::TimeTicks::Now();
890
891 for (auto i = group_map_.begin(); i != group_map_.end();) {
892 Group* group = i->second;
893 CHECK(group);
894 CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
895 // Delete group if no longer needed.
896 if (group->IsEmpty()) {
897 i = RemoveGroup(i);
898 } else {
899 ++i;
900 }
901 }
902 }
903
CloseOneIdleSocket()904 bool TransportClientSocketPool::CloseOneIdleSocket() {
905 if (idle_socket_count_ == 0)
906 return false;
907 return CloseOneIdleSocketExceptInGroup(nullptr);
908 }
909
CloseOneIdleConnectionInHigherLayeredPool()910 bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
911 // This pool doesn't have any idle sockets. It's possible that a pool at a
912 // higher layer is holding one of this sockets active, but it's actually idle.
913 // Query the higher layers.
914 for (HigherLayeredPool* higher_pool : higher_pools_) {
915 if (higher_pool->CloseOneIdleConnection())
916 return true;
917 }
918 return false;
919 }
920
CleanupIdleSocketsInGroup(bool force,Group * group,const base::TimeTicks & now,const char * net_log_reason_utf8)921 void TransportClientSocketPool::CleanupIdleSocketsInGroup(
922 bool force,
923 Group* group,
924 const base::TimeTicks& now,
925 const char* net_log_reason_utf8) {
926 // If |force| is true, a reason must be provided.
927 DCHECK(!force || net_log_reason_utf8);
928
929 auto idle_socket_it = group->mutable_idle_sockets()->begin();
930 while (idle_socket_it != group->idle_sockets().end()) {
931 bool should_clean_up = force;
932 const char* reason_for_closing_socket = net_log_reason_utf8;
933 base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
934 ? used_idle_socket_timeout_
935 : unused_idle_socket_timeout_;
936
937 // Timeout errors take precedence over the reason for flushing sockets in
938 // the group, if applicable.
939 if (now - idle_socket_it->start_time >= timeout) {
940 should_clean_up = true;
941 reason_for_closing_socket = kIdleTimeLimitExpired;
942 }
943
944 // Usability errors take precedence over over other errors.
945 if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
946 should_clean_up = true;
947
948 if (should_clean_up) {
949 DCHECK(reason_for_closing_socket);
950 idle_socket_it->socket->NetLog().AddEventWithStringParams(
951 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
952 reason_for_closing_socket);
953 idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
954 DecrementIdleCount();
955 } else {
956 DCHECK(!reason_for_closing_socket);
957 ++idle_socket_it;
958 }
959 }
960 }
961
GetOrCreateGroup(const GroupId & group_id)962 TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
963 const GroupId& group_id) {
964 auto it = group_map_.find(group_id);
965 if (it != group_map_.end())
966 return it->second;
967 Group* group = new Group(group_id, this);
968 group_map_[group_id] = group;
969 return group;
970 }
971
RemoveGroup(const GroupId & group_id)972 void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
973 auto it = group_map_.find(group_id);
974 CHECK(it != group_map_.end());
975
976 RemoveGroup(it);
977 }
978
979 TransportClientSocketPool::GroupMap::iterator
RemoveGroup(GroupMap::iterator it)980 TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
981 delete it->second;
982 return group_map_.erase(it);
983 }
984
985 // static
connect_backup_jobs_enabled()986 bool TransportClientSocketPool::connect_backup_jobs_enabled() {
987 return g_connect_backup_jobs_enabled;
988 }
989
990 // static
set_connect_backup_jobs_enabled(bool enabled)991 bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
992 bool old_value = g_connect_backup_jobs_enabled;
993 g_connect_backup_jobs_enabled = enabled;
994 return old_value;
995 }
996
IncrementIdleCount()997 void TransportClientSocketPool::IncrementIdleCount() {
998 ++idle_socket_count_;
999 }
1000
DecrementIdleCount()1001 void TransportClientSocketPool::DecrementIdleCount() {
1002 --idle_socket_count_;
1003 }
1004
ReleaseSocket(const GroupId & group_id,std::unique_ptr<StreamSocket> socket,int64_t group_generation)1005 void TransportClientSocketPool::ReleaseSocket(
1006 const GroupId& group_id,
1007 std::unique_ptr<StreamSocket> socket,
1008 int64_t group_generation) {
1009 auto i = group_map_.find(group_id);
1010 CHECK(i != group_map_.end());
1011
1012 Group* group = i->second;
1013 CHECK(group);
1014
1015 CHECK_GT(handed_out_socket_count_, 0);
1016 handed_out_socket_count_--;
1017
1018 CHECK_GT(group->active_socket_count(), 0);
1019 group->DecrementActiveSocketCount();
1020
1021 bool can_resuse_socket = false;
1022 std::string_view not_reusable_reason;
1023 if (!socket->IsConnectedAndIdle()) {
1024 if (!socket->IsConnected()) {
1025 not_reusable_reason = kClosedConnectionReturnedToPool;
1026 } else {
1027 not_reusable_reason = kDataReceivedUnexpectedly;
1028 }
1029 } else if (group_generation != group->generation()) {
1030 not_reusable_reason = kSocketGenerationOutOfDate;
1031 } else {
1032 can_resuse_socket = true;
1033 }
1034
1035 if (can_resuse_socket) {
1036 DCHECK(not_reusable_reason.empty());
1037
1038 // Add it to the idle list.
1039 AddIdleSocket(std::move(socket), group);
1040 OnAvailableSocketSlot(group_id, group);
1041 } else {
1042 DCHECK(!not_reusable_reason.empty());
1043
1044 socket->NetLog().AddEventWithStringParams(
1045 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
1046 not_reusable_reason);
1047 if (group->IsEmpty())
1048 RemoveGroup(i);
1049 socket.reset();
1050 }
1051
1052 CheckForStalledSocketGroups();
1053 }
1054
CheckForStalledSocketGroups()1055 void TransportClientSocketPool::CheckForStalledSocketGroups() {
1056 // Loop until there's nothing more to do.
1057 while (true) {
1058 // If we have idle sockets, see if we can give one to the top-stalled group.
1059 Group* top_group = nullptr;
1060 GroupId top_group_id;
1061 if (!FindTopStalledGroup(&top_group, &top_group_id))
1062 return;
1063
1064 if (ReachedMaxSocketsLimit()) {
1065 if (idle_socket_count_ > 0) {
1066 CloseOneIdleSocket();
1067 } else {
1068 // We can't activate more sockets since we're already at our global
1069 // limit.
1070 return;
1071 }
1072 }
1073
1074 // Note that this may delete top_group.
1075 OnAvailableSocketSlot(top_group_id, top_group);
1076 }
1077 }
1078
1079 // Search for the highest priority pending request, amongst the groups that
1080 // are not at the |max_sockets_per_group_| limit. Note: for requests with
1081 // the same priority, the winner is based on group hash ordering (and not
1082 // insertion order).
FindTopStalledGroup(Group ** group,GroupId * group_id) const1083 bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
1084 GroupId* group_id) const {
1085 CHECK(group);
1086 CHECK(group_id);
1087 Group* top_group = nullptr;
1088 const GroupId* top_group_id = nullptr;
1089 bool has_stalled_group = false;
1090 for (const auto& it : group_map_) {
1091 Group* curr_group = it.second;
1092 if (!curr_group->has_unbound_requests())
1093 continue;
1094 if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1095 has_stalled_group = true;
1096 bool has_higher_priority =
1097 !top_group ||
1098 curr_group->TopPendingPriority() > top_group->TopPendingPriority();
1099 if (has_higher_priority) {
1100 top_group = curr_group;
1101 top_group_id = &it.first;
1102 }
1103 }
1104 }
1105
1106 if (top_group) {
1107 *group = top_group;
1108 *group_id = *top_group_id;
1109 } else {
1110 CHECK(!has_stalled_group);
1111 }
1112 return has_stalled_group;
1113 }
1114
OnIPAddressChanged()1115 void TransportClientSocketPool::OnIPAddressChanged() {
1116 DCHECK(cleanup_on_ip_address_change_);
1117 FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
1118 }
1119
FlushWithError(int error,const char * net_log_reason_utf8)1120 void TransportClientSocketPool::FlushWithError(
1121 int error,
1122 const char* net_log_reason_utf8) {
1123 CancelAllConnectJobs();
1124 CloseIdleSockets(net_log_reason_utf8);
1125 CancelAllRequestsWithError(error);
1126 for (const auto& group : group_map_) {
1127 group.second->IncrementGeneration();
1128 }
1129 }
1130
RemoveConnectJob(ConnectJob * job,Group * group)1131 void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
1132 Group* group) {
1133 CHECK_GT(connecting_socket_count_, 0);
1134 connecting_socket_count_--;
1135
1136 DCHECK(group);
1137 group->RemoveUnboundJob(job);
1138 }
1139
OnAvailableSocketSlot(const GroupId & group_id,Group * group)1140 void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
1141 Group* group) {
1142 DCHECK(base::Contains(group_map_, group_id));
1143 if (group->IsEmpty()) {
1144 RemoveGroup(group_id);
1145 } else if (group->has_unbound_requests()) {
1146 ProcessPendingRequest(group_id, group);
1147 }
1148 }
1149
ProcessPendingRequest(const GroupId & group_id,Group * group)1150 void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
1151 Group* group) {
1152 const Request* next_request = group->GetNextUnboundRequest();
1153 DCHECK(next_request);
1154
1155 // If the group has no idle sockets, and can't make use of an additional slot,
1156 // either because it's at the limit or because it's at the socket per group
1157 // limit, then there's nothing to do.
1158 if (group->idle_sockets().empty() &&
1159 !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1160 return;
1161 }
1162
1163 int rv =
1164 RequestSocketInternal(group_id, *next_request,
1165 /*preconnect_done_closure=*/base::OnceClosure());
1166 if (rv != ERR_IO_PENDING) {
1167 std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1168 DCHECK(request);
1169 if (group->IsEmpty())
1170 RemoveGroup(group_id);
1171
1172 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1173 rv);
1174 InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
1175 request->socket_tag());
1176 }
1177 }
1178
HandOutSocket(std::unique_ptr<StreamSocket> socket,ClientSocketHandle::SocketReuseType reuse_type,const LoadTimingInfo::ConnectTiming & connect_timing,ClientSocketHandle * handle,base::TimeDelta idle_time,Group * group,const NetLogWithSource & net_log)1179 void TransportClientSocketPool::HandOutSocket(
1180 std::unique_ptr<StreamSocket> socket,
1181 ClientSocketHandle::SocketReuseType reuse_type,
1182 const LoadTimingInfo::ConnectTiming& connect_timing,
1183 ClientSocketHandle* handle,
1184 base::TimeDelta idle_time,
1185 Group* group,
1186 const NetLogWithSource& net_log) {
1187 DCHECK(socket);
1188 handle->SetSocket(std::move(socket));
1189 handle->set_reuse_type(reuse_type);
1190 handle->set_idle_time(idle_time);
1191 handle->set_group_generation(group->generation());
1192 handle->set_connect_timing(connect_timing);
1193
1194 if (reuse_type == StreamSocketHandle::SocketReuseType::kReusedIdle) {
1195 net_log.AddEventWithIntParams(
1196 NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
1197 static_cast<int>(idle_time.InMilliseconds()));
1198 }
1199
1200 net_log.AddEventReferencingSource(
1201 NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
1202 handle->socket()->NetLog().source());
1203
1204 handed_out_socket_count_++;
1205 group->IncrementActiveSocketCount();
1206 }
1207
AddIdleSocket(std::unique_ptr<StreamSocket> socket,Group * group)1208 void TransportClientSocketPool::AddIdleSocket(
1209 std::unique_ptr<StreamSocket> socket,
1210 Group* group) {
1211 DCHECK(socket);
1212 IdleSocket idle_socket;
1213 idle_socket.socket = std::move(socket);
1214 idle_socket.start_time = base::TimeTicks::Now();
1215
1216 group->mutable_idle_sockets()->push_back(std::move(idle_socket));
1217 IncrementIdleCount();
1218 }
1219
CancelAllConnectJobs()1220 void TransportClientSocketPool::CancelAllConnectJobs() {
1221 for (auto i = group_map_.begin(); i != group_map_.end();) {
1222 Group* group = i->second;
1223 CHECK(group);
1224 connecting_socket_count_ -= group->jobs().size();
1225 group->RemoveAllUnboundJobs();
1226
1227 // Delete group if no longer needed.
1228 if (group->IsEmpty()) {
1229 i = RemoveGroup(i);
1230 } else {
1231 ++i;
1232 }
1233 }
1234 }
1235
CancelAllRequestsWithError(int error)1236 void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
1237 for (auto i = group_map_.begin(); i != group_map_.end();) {
1238 Group* group = i->second;
1239 CHECK(group);
1240
1241 while (true) {
1242 std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1243 if (!request)
1244 break;
1245 InvokeUserCallbackLater(request->handle(), request->release_callback(),
1246 error, request->socket_tag());
1247 }
1248
1249 // Mark bound connect jobs as needing to fail. Can't fail them immediately
1250 // because they may have access to objects owned by the ConnectJob, and
1251 // could access them if a user callback invocation is queued. It would also
1252 // result in the consumer handling two messages at once, which in general
1253 // isn't safe for a lot of code.
1254 group->SetPendingErrorForAllBoundRequests(error);
1255
1256 // Delete group if no longer needed.
1257 if (group->IsEmpty()) {
1258 i = RemoveGroup(i);
1259 } else {
1260 ++i;
1261 }
1262 }
1263 }
1264
ReachedMaxSocketsLimit() const1265 bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
1266 // Each connecting socket will eventually connect and be handed out.
1267 int total =
1268 handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
1269 // There can be more sockets than the limit since some requests can ignore
1270 // the limit
1271 if (total < max_sockets_)
1272 return false;
1273 return true;
1274 }
1275
CloseOneIdleSocketExceptInGroup(const Group * exception_group)1276 bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
1277 const Group* exception_group) {
1278 CHECK_GT(idle_socket_count_, 0);
1279
1280 for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
1281 Group* group = i->second;
1282 CHECK(group);
1283 if (exception_group == group)
1284 continue;
1285 std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
1286
1287 if (!idle_sockets->empty()) {
1288 idle_sockets->pop_front();
1289 DecrementIdleCount();
1290 if (group->IsEmpty())
1291 RemoveGroup(i);
1292
1293 return true;
1294 }
1295 }
1296
1297 return false;
1298 }
1299
OnConnectJobComplete(Group * group,int result,ConnectJob * job)1300 void TransportClientSocketPool::OnConnectJobComplete(Group* group,
1301 int result,
1302 ConnectJob* job) {
1303 DCHECK_NE(ERR_IO_PENDING, result);
1304 DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1305 DCHECK_EQ(group, group_map_[group->group_id()]);
1306 DCHECK(result != OK || job->socket() != nullptr);
1307
1308 // Check if the ConnectJob is already bound to a Request. If so, result is
1309 // returned to that specific request.
1310 std::optional<Group::BoundRequest> bound_request =
1311 group->FindAndRemoveBoundRequestForConnectJob(job);
1312 Request* request = nullptr;
1313 std::unique_ptr<Request> owned_request;
1314 if (bound_request) {
1315 --connecting_socket_count_;
1316
1317 // If the socket pools were previously flushed with an error, return that
1318 // error to the bound request and discard the socket.
1319 if (bound_request->pending_error != OK) {
1320 InvokeUserCallbackLater(bound_request->request->handle(),
1321 bound_request->request->release_callback(),
1322 bound_request->pending_error,
1323 bound_request->request->socket_tag());
1324 bound_request->request->net_log().EndEventWithNetErrorCode(
1325 NetLogEventType::SOCKET_POOL, bound_request->pending_error);
1326 OnAvailableSocketSlot(group->group_id(), group);
1327 CheckForStalledSocketGroups();
1328 return;
1329 }
1330
1331 // If the ConnectJob is from a previous generation, add the request back to
1332 // the group, and kick off another request. The socket will be discarded.
1333 if (bound_request->generation != group->generation()) {
1334 group->InsertUnboundRequest(std::move(bound_request->request));
1335 OnAvailableSocketSlot(group->group_id(), group);
1336 CheckForStalledSocketGroups();
1337 return;
1338 }
1339
1340 request = bound_request->request.get();
1341 } else {
1342 // In this case, RemoveConnectJob(job, _) must be called before exiting this
1343 // method. Otherwise, |job| will be leaked.
1344 owned_request = group->PopNextUnboundRequest();
1345 request = owned_request.get();
1346
1347 if (!request) {
1348 if (result == OK)
1349 AddIdleSocket(job->PassSocket(), group);
1350 RemoveConnectJob(job, group);
1351 OnAvailableSocketSlot(group->group_id(), group);
1352 CheckForStalledSocketGroups();
1353 return;
1354 }
1355
1356 LogBoundConnectJobToRequest(job->net_log().source(), *request);
1357 }
1358
1359 // The case where there's no request is handled above.
1360 DCHECK(request);
1361
1362 if (result != OK)
1363 request->handle()->SetAdditionalErrorState(job);
1364 if (job->socket()) {
1365 HandOutSocket(job->PassSocket(),
1366 StreamSocketHandle::SocketReuseType::kUnused,
1367 job->connect_timing(), request->handle(), base::TimeDelta(),
1368 group, request->net_log());
1369 }
1370 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1371 result);
1372 InvokeUserCallbackLater(request->handle(), request->release_callback(),
1373 result, request->socket_tag());
1374 if (!bound_request)
1375 RemoveConnectJob(job, group);
1376 // If no socket was handed out, there's a new socket slot available.
1377 if (!request->handle()->socket()) {
1378 OnAvailableSocketSlot(group->group_id(), group);
1379 CheckForStalledSocketGroups();
1380 }
1381 }
1382
OnNeedsProxyAuth(Group * group,const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1383 void TransportClientSocketPool::OnNeedsProxyAuth(
1384 Group* group,
1385 const HttpResponseInfo& response,
1386 HttpAuthController* auth_controller,
1387 base::OnceClosure restart_with_auth_callback,
1388 ConnectJob* job) {
1389 DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1390 DCHECK_EQ(group, group_map_[group->group_id()]);
1391
1392 const Request* request = group->BindRequestToConnectJob(job);
1393 // If can't bind the ConnectJob to a request, treat this as a ConnectJob
1394 // failure.
1395 if (!request) {
1396 OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
1397 return;
1398 }
1399
1400 request->proxy_auth_callback().Run(response, auth_controller,
1401 std::move(restart_with_auth_callback));
1402 }
1403
InvokeUserCallbackLater(ClientSocketHandle * handle,CompletionOnceCallback callback,int rv,const SocketTag & socket_tag)1404 void TransportClientSocketPool::InvokeUserCallbackLater(
1405 ClientSocketHandle* handle,
1406 CompletionOnceCallback callback,
1407 int rv,
1408 const SocketTag& socket_tag) {
1409 CHECK(!base::Contains(pending_callback_map_, handle));
1410 pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
1411 if (rv == OK) {
1412 handle->socket()->ApplySocketTag(socket_tag);
1413 }
1414 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
1415 FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
1416 weak_factory_.GetWeakPtr(),
1417 // This is safe as `handle` is checked against a
1418 // map to verify it's alive before dereference.
1419 // This code path must only be reachable by
1420 // `handle`s that have had Init called.
1421 base::UnsafeDangling(handle)));
1422 }
1423
InvokeUserCallback(MayBeDangling<ClientSocketHandle> handle)1424 void TransportClientSocketPool::InvokeUserCallback(
1425 MayBeDangling<ClientSocketHandle> handle) {
1426 auto it = pending_callback_map_.find(handle);
1427
1428 // Exit if the request has already been cancelled.
1429 if (it == pending_callback_map_.end())
1430 return;
1431
1432 CHECK(!handle->is_initialized());
1433 CompletionOnceCallback callback = std::move(it->second.callback);
1434 int result = it->second.result;
1435 pending_callback_map_.erase(it);
1436 std::move(callback).Run(result);
1437 }
1438
TryToCloseSocketsInLayeredPools()1439 void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
1440 while (IsStalled()) {
1441 // Closing a socket will result in calling back into |this| to use the freed
1442 // socket slot, so nothing else is needed.
1443 if (!CloseOneIdleConnectionInHigherLayeredPool())
1444 return;
1445 }
1446 }
1447
1448 TransportClientSocketPool::GroupMap::iterator
RefreshGroup(GroupMap::iterator it,const base::TimeTicks & now,const char * net_log_reason_utf8)1449 TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
1450 const base::TimeTicks& now,
1451 const char* net_log_reason_utf8) {
1452 Group* group = it->second;
1453 CHECK(group);
1454 CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);
1455
1456 connecting_socket_count_ -= group->jobs().size();
1457 group->RemoveAllUnboundJobs();
1458
1459 // Otherwise, prevent reuse of existing sockets.
1460 group->IncrementGeneration();
1461
1462 // Delete group if no longer needed.
1463 if (group->IsEmpty()) {
1464 return RemoveGroup(it);
1465 }
1466 return ++it;
1467 }
1468
Group(const GroupId & group_id,TransportClientSocketPool * client_socket_pool)1469 TransportClientSocketPool::Group::Group(
1470 const GroupId& group_id,
1471 TransportClientSocketPool* client_socket_pool)
1472 : group_id_(group_id),
1473 client_socket_pool_(client_socket_pool),
1474 unbound_requests_(NUM_PRIORITIES) {}
1475
~Group()1476 TransportClientSocketPool::Group::~Group() {
1477 DCHECK_EQ(0u, never_assigned_job_count());
1478 DCHECK_EQ(0u, unassigned_job_count());
1479 DCHECK(unbound_requests_.empty());
1480 DCHECK(jobs_.empty());
1481 DCHECK(bound_requests_.empty());
1482 }
1483
OnConnectJobComplete(int result,ConnectJob * job)1484 void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
1485 ConnectJob* job) {
1486 DCHECK_NE(ERR_IO_PENDING, result);
1487 client_socket_pool_->OnConnectJobComplete(this, result, job);
1488 }
1489
OnNeedsProxyAuth(const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1490 void TransportClientSocketPool::Group::OnNeedsProxyAuth(
1491 const HttpResponseInfo& response,
1492 HttpAuthController* auth_controller,
1493 base::OnceClosure restart_with_auth_callback,
1494 ConnectJob* job) {
1495 client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
1496 std::move(restart_with_auth_callback),
1497 job);
1498 }
1499
StartBackupJobTimer(const GroupId & group_id)1500 void TransportClientSocketPool::Group::StartBackupJobTimer(
1501 const GroupId& group_id) {
1502 // Only allow one timer to run at a time.
1503 if (BackupJobTimerIsRunning())
1504 return;
1505
1506 // Unretained here is okay because |backup_job_timer_| is
1507 // automatically cancelled when it's destroyed.
1508 backup_job_timer_.Start(FROM_HERE,
1509 client_socket_pool_->ConnectRetryInterval(),
1510 base::BindOnce(&Group::OnBackupJobTimerFired,
1511 base::Unretained(this), group_id));
1512 }
1513
BackupJobTimerIsRunning() const1514 bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
1515 return backup_job_timer_.IsRunning();
1516 }
1517
TryToUseNeverAssignedConnectJob()1518 bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
1519 SanityCheck();
1520
1521 if (never_assigned_job_count_ == 0)
1522 return false;
1523 --never_assigned_job_count_;
1524 return true;
1525 }
1526
AddJob(std::unique_ptr<ConnectJob> job,bool is_preconnect)1527 void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
1528 bool is_preconnect) {
1529 SanityCheck();
1530
1531 if (is_preconnect)
1532 ++never_assigned_job_count_;
1533 jobs_.push_back(std::move(job));
1534 TryToAssignUnassignedJob(jobs_.back().get());
1535
1536 SanityCheck();
1537 }
1538
RemoveUnboundJob(ConnectJob * job)1539 std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
1540 ConnectJob* job) {
1541 SanityCheck();
1542
1543 // Check that |job| is in the list.
1544 auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
1545 CHECK(it != jobs_.end(), base::NotFatalUntil::M130);
1546
1547 // Check if |job| is in the unassigned jobs list. If so, remove it.
1548 auto it2 = base::ranges::find(unassigned_jobs_, job);
1549 if (it2 != unassigned_jobs_.end()) {
1550 unassigned_jobs_.erase(it2);
1551 } else {
1552 // Otherwise, |job| must be assigned to some Request. Unassign it, then
1553 // try to replace it with another job if possible (either by taking an
1554 // unassigned job or stealing from another request, if any requests after it
1555 // have a job).
1556 RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
1557 DCHECK(!request_with_job.is_null());
1558 request_with_job.value()->ReleaseJob();
1559 TryToAssignJobToRequest(request_with_job);
1560 }
1561 std::unique_ptr<ConnectJob> owned_job = std::move(*it);
1562 jobs_.erase(it);
1563
1564 size_t job_count = jobs_.size();
1565 if (job_count < never_assigned_job_count_)
1566 never_assigned_job_count_ = job_count;
1567
1568 // If we've got no more jobs for this group, then we no longer need a
1569 // backup job either.
1570 if (jobs_.empty()) {
1571 DCHECK(unassigned_jobs_.empty());
1572 backup_job_timer_.Stop();
1573 }
1574
1575 SanityCheck();
1576 return owned_job;
1577 }
1578
OnBackupJobTimerFired(const GroupId & group_id)1579 void TransportClientSocketPool::Group::OnBackupJobTimerFired(
1580 const GroupId& group_id) {
1581 // If there are no more jobs pending, there is no work to do.
1582 // If we've done our cleanups correctly, this should not happen.
1583 if (jobs_.empty()) {
1584 NOTREACHED();
1585 }
1586
1587 // If the old job has already established a connection, don't start a backup
1588 // job. Backup jobs are only for issues establishing the initial TCP
1589 // connection - the timeout they used is tuned for that, and tests expect that
1590 // behavior.
1591 //
1592 // TODO(crbug.com/41440018): Replace both this and the
1593 // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
1594 // LOAD_STATE_RESOLVING_HOST callback to start the timer (And invoke the
1595 // OnHostResolved callback of any pending requests), and the
1596 // HasEstablishedConnection() callback to stop the timer. That should result
1597 // in a more robust, testable API.
1598 if ((*jobs_.begin())->HasEstablishedConnection())
1599 return;
1600
1601 // If our old job is waiting on DNS, or if we can't create any sockets
1602 // right now due to limits, just reset the timer.
1603 if (client_socket_pool_->ReachedMaxSocketsLimit() ||
1604 !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
1605 (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
1606 StartBackupJobTimer(group_id);
1607 return;
1608 }
1609
1610 if (unbound_requests_.empty())
1611 return;
1612
1613 Request* request = unbound_requests_.FirstMax().value().get();
1614 std::unique_ptr<ConnectJob> owned_backup_job =
1615 client_socket_pool_->CreateConnectJob(
1616 group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
1617 request->proxy_annotation_tag(), request->priority(),
1618 request->socket_tag(), this);
1619 owned_backup_job->net_log().AddEvent(
1620 NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
1621 return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
1622 });
1623 ConnectJob* backup_job = owned_backup_job.get();
1624 AddJob(std::move(owned_backup_job), false);
1625 client_socket_pool_->connecting_socket_count_++;
1626 int rv = backup_job->Connect();
1627 if (rv != ERR_IO_PENDING) {
1628 client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
1629 }
1630 }
1631
SanityCheck() const1632 void TransportClientSocketPool::Group::SanityCheck() const {
1633 #if DCHECK_IS_ON()
1634 DCHECK_LE(never_assigned_job_count(), jobs_.size());
1635 DCHECK_LE(unassigned_job_count(), jobs_.size());
1636
1637 // Check that |unassigned_jobs_| is empty iff there are at least as many
1638 // requests as jobs.
1639 DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());
1640
1641 size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();
1642
1643 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1644 for (size_t i = 0; i < unbound_requests_.size();
1645 ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1646 DCHECK(!pointer.is_null());
1647 DCHECK(pointer.value());
1648 // Check that the first |num_assigned_jobs| requests have valid job
1649 // assignments.
1650 if (i < num_assigned_jobs) {
1651 // The request has a job.
1652 ConnectJob* job = pointer.value()->job();
1653 DCHECK(job);
1654 // The request's job is not in |unassigned_jobs_|
1655 DCHECK(!base::Contains(unassigned_jobs_, job));
1656 // The request's job is in |jobs_|
1657 DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1658 // The same job is not assigned to any other request with a job.
1659 RequestQueue::Pointer pointer2 =
1660 unbound_requests_.GetNextTowardsLastMin(pointer);
1661 for (size_t j = i + 1; j < num_assigned_jobs;
1662 ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
1663 DCHECK(!pointer2.is_null());
1664 ConnectJob* job2 = pointer2.value()->job();
1665 DCHECK(job2);
1666 DCHECK_NE(job, job2);
1667 }
1668 DCHECK_EQ(pointer.value()->priority(), job->priority());
1669 } else {
1670 // Check that any subsequent requests do not have a job.
1671 DCHECK(!pointer.value()->job());
1672 }
1673 }
1674
1675 for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
1676 // Check that all unassigned jobs are in |jobs_|
1677 ConnectJob* job = *it;
1678 DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1679 // Check that there are no duplicated entries in |unassigned_jobs_|
1680 for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
1681 DCHECK_NE(job, *it2);
1682 }
1683
1684 // Check that no |unassigned_jobs_| are in |bound_requests_|.
1685 DCHECK(!base::Contains(bound_requests_, job,
1686 [](const BoundRequest& bound_request) {
1687 return bound_request.connect_job.get();
1688 }));
1689 }
1690 #endif
1691 }
1692
RemoveAllUnboundJobs()1693 void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
1694 SanityCheck();
1695
1696 // Remove jobs from any requests that have them.
1697 if (!unbound_requests_.empty()) {
1698 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1699 !pointer.is_null() && pointer.value()->job();
1700 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1701 pointer.value()->ReleaseJob();
1702 }
1703 }
1704 unassigned_jobs_.clear();
1705 never_assigned_job_count_ = 0;
1706
1707 // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
1708 // removal from `TransportClientSocketPool::group_map_`, so if this check
1709 // fails, `this` has been deleted, likely through some reentrancy issue.
1710 CHECK(client_socket_pool_->HasGroup(group_id_));
1711
1712 // Delete active jobs.
1713 jobs_.clear();
1714 // Stop backup job timer.
1715 backup_job_timer_.Stop();
1716
1717 SanityCheck();
1718 }
1719
ConnectJobCount() const1720 size_t TransportClientSocketPool::Group::ConnectJobCount() const {
1721 return bound_requests_.size() + jobs_.size();
1722 }
1723
GetConnectJobForHandle(const ClientSocketHandle * handle) const1724 ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
1725 const ClientSocketHandle* handle) const {
1726 // Search through bound requests for |handle|.
1727 for (const auto& bound_pair : bound_requests_) {
1728 if (handle == bound_pair.request->handle())
1729 return bound_pair.connect_job.get();
1730 }
1731
1732 // Search through the unbound requests that have corresponding jobs for a
1733 // request with |handle|.
1734 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1735 !pointer.is_null() && pointer.value()->job();
1736 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1737 if (pointer.value()->handle() == handle)
1738 return pointer.value()->job();
1739 }
1740
1741 return nullptr;
1742 }
1743
InsertUnboundRequest(std::unique_ptr<Request> request)1744 void TransportClientSocketPool::Group::InsertUnboundRequest(
1745 std::unique_ptr<Request> request) {
1746 SanityCheck();
1747
1748 // Should not have a job because it is not already in |unbound_requests_|
1749 DCHECK(!request->job());
1750 // This value must be cached before we release |request|.
1751 RequestPriority priority = request->priority();
1752
1753 RequestQueue::Pointer new_position;
1754 if (request->respect_limits() == RespectLimits::DISABLED) {
1755 // Put requests with RespectLimits::DISABLED (which should have
1756 // priority == MAXIMUM_PRIORITY) ahead of other requests with
1757 // MAXIMUM_PRIORITY.
1758 DCHECK_EQ(priority, MAXIMUM_PRIORITY);
1759 new_position =
1760 unbound_requests_.InsertAtFront(std::move(request), priority);
1761 } else {
1762 new_position = unbound_requests_.Insert(std::move(request), priority);
1763 }
1764 DCHECK(!unbound_requests_.empty());
1765
1766 TryToAssignJobToRequest(new_position);
1767
1768 SanityCheck();
1769 }
1770
1771 const TransportClientSocketPool::Request*
GetNextUnboundRequest() const1772 TransportClientSocketPool::Group::GetNextUnboundRequest() const {
1773 return unbound_requests_.empty() ? nullptr
1774 : unbound_requests_.FirstMax().value().get();
1775 }
1776
1777 std::unique_ptr<TransportClientSocketPool::Request>
PopNextUnboundRequest()1778 TransportClientSocketPool::Group::PopNextUnboundRequest() {
1779 if (unbound_requests_.empty())
1780 return nullptr;
1781 return RemoveUnboundRequest(unbound_requests_.FirstMax());
1782 }
1783
1784 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveUnboundRequest(ClientSocketHandle * handle)1785 TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
1786 ClientSocketHandle* handle) {
1787 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1788 !pointer.is_null();
1789 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1790 if (pointer.value()->handle() == handle) {
1791 DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
1792 pointer.value()->priority());
1793 std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1794 return request;
1795 }
1796 }
1797 return nullptr;
1798 }
1799
SetPendingErrorForAllBoundRequests(int pending_error)1800 void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
1801 int pending_error) {
1802 for (auto& bound_request : bound_requests_) {
1803 // Earlier errors take precedence.
1804 if (bound_request.pending_error == OK)
1805 bound_request.pending_error = pending_error;
1806 }
1807 }
1808
1809 const TransportClientSocketPool::Request*
BindRequestToConnectJob(ConnectJob * connect_job)1810 TransportClientSocketPool::Group::BindRequestToConnectJob(
1811 ConnectJob* connect_job) {
1812 // Check if |job| is already bound to a Request.
1813 for (const auto& bound_pair : bound_requests_) {
1814 if (bound_pair.connect_job.get() == connect_job)
1815 return bound_pair.request.get();
1816 }
1817
1818 // If not, try to bind it to a Request.
1819 const Request* request = GetNextUnboundRequest();
1820 // If there are no pending requests, or the highest priority request has no
1821 // callback to handle auth challenges, return nullptr.
1822 if (!request || request->proxy_auth_callback().is_null())
1823 return nullptr;
1824
1825 // Otherwise, bind the ConnectJob to the Request.
1826 std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
1827 DCHECK_EQ(owned_request.get(), request);
1828 std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
1829 LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
1830 bound_requests_.emplace_back(BoundRequest(
1831 std::move(owned_connect_job), std::move(owned_request), generation()));
1832 return request;
1833 }
1834
1835 std::optional<TransportClientSocketPool::Group::BoundRequest>
FindAndRemoveBoundRequestForConnectJob(ConnectJob * connect_job)1836 TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
1837 ConnectJob* connect_job) {
1838 for (auto bound_pair = bound_requests_.begin();
1839 bound_pair != bound_requests_.end(); ++bound_pair) {
1840 if (bound_pair->connect_job.get() != connect_job)
1841 continue;
1842 BoundRequest ret = std::move(*bound_pair);
1843 bound_requests_.erase(bound_pair);
1844 return std::move(ret);
1845 }
1846 return std::nullopt;
1847 }
1848
1849 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveBoundRequest(ClientSocketHandle * client_socket_handle)1850 TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
1851 ClientSocketHandle* client_socket_handle) {
1852 for (auto bound_pair = bound_requests_.begin();
1853 bound_pair != bound_requests_.end(); ++bound_pair) {
1854 if (bound_pair->request->handle() != client_socket_handle)
1855 continue;
1856 std::unique_ptr<Request> request = std::move(bound_pair->request);
1857 bound_requests_.erase(bound_pair);
1858 return request;
1859 }
1860 return nullptr;
1861 }
1862
SetPriority(ClientSocketHandle * handle,RequestPriority priority)1863 void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
1864 RequestPriority priority) {
1865 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1866 !pointer.is_null();
1867 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1868 if (pointer.value()->handle() == handle) {
1869 if (pointer.value()->priority() == priority)
1870 return;
1871
1872 std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1873
1874 // Requests that ignore limits much be created and remain at the highest
1875 // priority, and should not be reprioritized.
1876 DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);
1877
1878 request->set_priority(priority);
1879 InsertUnboundRequest(std::move(request));
1880 return;
1881 }
1882 }
1883
1884 // This function must be called with a valid ClientSocketHandle.
1885 NOTREACHED();
1886 }
1887
RequestWithHandleHasJobForTesting(const ClientSocketHandle * handle) const1888 bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
1889 const ClientSocketHandle* handle) const {
1890 SanityCheck();
1891 if (GetConnectJobForHandle(handle))
1892 return true;
1893
1894 // There's no corresponding ConnectJob. Verify that the handle is at least
1895 // owned by a request.
1896 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1897 for (size_t i = 0; i < unbound_requests_.size(); ++i) {
1898 if (pointer.value()->handle() == handle)
1899 return false;
1900 pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
1901 }
1902 NOTREACHED();
1903 }
1904
BoundRequest()1905 TransportClientSocketPool::Group::BoundRequest::BoundRequest()
1906 : pending_error(OK) {}
1907
BoundRequest(std::unique_ptr<ConnectJob> connect_job,std::unique_ptr<Request> request,int64_t generation)1908 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1909 std::unique_ptr<ConnectJob> connect_job,
1910 std::unique_ptr<Request> request,
1911 int64_t generation)
1912 : connect_job(std::move(connect_job)),
1913 request(std::move(request)),
1914 generation(generation),
1915 pending_error(OK) {}
1916
1917 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1918 BoundRequest&& other) = default;
1919
1920 TransportClientSocketPool::Group::BoundRequest&
1921 TransportClientSocketPool::Group::BoundRequest::operator=(
1922 BoundRequest&& other) = default;
1923
1924 TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;
1925
1926 std::unique_ptr<TransportClientSocketPool::Request>
RemoveUnboundRequest(const RequestQueue::Pointer & pointer)1927 TransportClientSocketPool::Group::RemoveUnboundRequest(
1928 const RequestQueue::Pointer& pointer) {
1929 SanityCheck();
1930
1931 std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
1932 if (request->job()) {
1933 TryToAssignUnassignedJob(request->ReleaseJob());
1934 }
1935 // If there are no more unbound requests, kill the backup timer.
1936 if (unbound_requests_.empty())
1937 backup_job_timer_.Stop();
1938
1939 SanityCheck();
1940 return request;
1941 }
1942
1943 TransportClientSocketPool::RequestQueue::Pointer
FindUnboundRequestWithJob(const ConnectJob * job) const1944 TransportClientSocketPool::Group::FindUnboundRequestWithJob(
1945 const ConnectJob* job) const {
1946 SanityCheck();
1947
1948 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1949 !pointer.is_null() && pointer.value()->job();
1950 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1951 if (pointer.value()->job() == job)
1952 return pointer;
1953 }
1954 // If a request with the job was not found, it must be in |unassigned_jobs_|.
1955 DCHECK(base::Contains(unassigned_jobs_, job));
1956 return RequestQueue::Pointer();
1957 }
1958
1959 TransportClientSocketPool::RequestQueue::Pointer
GetFirstRequestWithoutJob() const1960 TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
1961 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1962 size_t i = 0;
1963 for (; !pointer.is_null() && pointer.value()->job();
1964 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1965 ++i;
1966 }
1967 DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
1968 DCHECK(pointer.is_null() || !pointer.value()->job());
1969 return pointer;
1970 }
1971
TryToAssignUnassignedJob(ConnectJob * job)1972 void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
1973 ConnectJob* job) {
1974 unassigned_jobs_.push_back(job);
1975 RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
1976 if (!first_request_without_job.is_null()) {
1977 first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
1978 unassigned_jobs_.pop_back();
1979 }
1980 }
1981
TryToAssignJobToRequest(TransportClientSocketPool::RequestQueue::Pointer request_pointer)1982 void TransportClientSocketPool::Group::TryToAssignJobToRequest(
1983 TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
1984 DCHECK(!request_pointer.value()->job());
1985 if (!unassigned_jobs_.empty()) {
1986 request_pointer.value()->AssignJob(unassigned_jobs_.front());
1987 unassigned_jobs_.pop_front();
1988 return;
1989 }
1990
1991 // If the next request in the queue does not have a job, then there are no
1992 // requests with a job after |request_pointer| from which we can steal.
1993 RequestQueue::Pointer next_request =
1994 unbound_requests_.GetNextTowardsLastMin(request_pointer);
1995 if (next_request.is_null() || !next_request.value()->job())
1996 return;
1997
1998 // Walk down the queue to find the last request with a job.
1999 RequestQueue::Pointer cur = next_request;
2000 RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
2001 while (!next.is_null() && next.value()->job()) {
2002 cur = next;
2003 next = unbound_requests_.GetNextTowardsLastMin(next);
2004 }
2005 // Steal the job from the last request with a job.
2006 TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
2007 }
2008
TransferJobBetweenRequests(TransportClientSocketPool::Request * source,TransportClientSocketPool::Request * dest)2009 void TransportClientSocketPool::Group::TransferJobBetweenRequests(
2010 TransportClientSocketPool::Request* source,
2011 TransportClientSocketPool::Request* dest) {
2012 DCHECK(!dest->job());
2013 DCHECK(source->job());
2014 dest->AssignJob(source->ReleaseJob());
2015 }
2016
2017 } // namespace net
2018