1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/transport_client_socket_pool.h"
6
7 #include <utility>
8
9 #include "base/auto_reset.h"
10 #include "base/barrier_closure.h"
11 #include "base/check_op.h"
12 #include "base/compiler_specific.h"
13 #include "base/containers/contains.h"
14 #include "base/format_macros.h"
15 #include "base/functional/bind.h"
16 #include "base/functional/callback_helpers.h"
17 #include "base/location.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/notreached.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/strings/string_util.h"
23 #include "base/task/single_thread_task_runner.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "net/base/host_port_pair.h"
27 #include "net/base/net_errors.h"
28 #include "net/base/proxy_chain.h"
29 #include "net/base/proxy_server.h"
30 #include "net/log/net_log.h"
31 #include "net/log/net_log_event_type.h"
32 #include "net/log/net_log_source.h"
33 #include "net/socket/connect_job_factory.h"
34 #include "net/traffic_annotation/network_traffic_annotation.h"
35 #include "url/gurl.h"
36
37 namespace net {
38
39 namespace {
40
// Process-wide switch for backup connect jobs: whether or not we should
// establish a new transport layer connection after a certain timeout has
// passed without receiving an ACK. The pool constructor ANDs this flag with
// its own |connect_backup_jobs_enabled| argument, so a false value here
// disables backup jobs for any pool constructed while it is false.
bool g_connect_backup_jobs_enabled = true;
44
NetLogCreateConnectJobParams(bool backup_job,const ClientSocketPool::GroupId * group_id)45 base::Value::Dict NetLogCreateConnectJobParams(
46 bool backup_job,
47 const ClientSocketPool::GroupId* group_id) {
48 base::Value::Dict dict;
49 dict.Set("backup_job", backup_job);
50 dict.Set("group_id", group_id->ToString());
51 return dict;
52 }
53
54 } // namespace
55
// Human-readable reason strings. Within this file they are handed to
// FlushWithError() / RefreshGroup(), or reported as the "reason" parameter of
// SOCKET_POOL_CLOSING_SOCKET NetLog events, to explain why a socket or group
// is being torn down.
const char TransportClientSocketPool::kCertDatabaseChanged[] =
    "Cert database changed";
const char TransportClientSocketPool::kCertVerifierChanged[] =
    "Cert verifier changed";
const char TransportClientSocketPool::kClosedConnectionReturnedToPool[] =
    "Connection was closed when it was returned to the pool";
const char TransportClientSocketPool::kDataReceivedUnexpectedly[] =
    "Data received unexpectedly";
const char TransportClientSocketPool::kIdleTimeLimitExpired[] =
    "Idle time limit expired";
const char TransportClientSocketPool::kNetworkChanged[] = "Network changed";
const char TransportClientSocketPool::kRemoteSideClosedConnection[] =
    "Remote side closed connection";
const char TransportClientSocketPool::kSocketGenerationOutOfDate[] =
    "Socket generation out of date";
const char TransportClientSocketPool::kSocketPoolDestroyed[] =
    "Socket pool destroyed";
const char TransportClientSocketPool::kSslConfigChanged[] =
    "SSL configuration changed";
75
Request(ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,Flags flags,scoped_refptr<SocketParams> socket_params,const absl::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,const NetLogWithSource & net_log)76 TransportClientSocketPool::Request::Request(
77 ClientSocketHandle* handle,
78 CompletionOnceCallback callback,
79 const ProxyAuthCallback& proxy_auth_callback,
80 RequestPriority priority,
81 const SocketTag& socket_tag,
82 RespectLimits respect_limits,
83 Flags flags,
84 scoped_refptr<SocketParams> socket_params,
85 const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
86 const NetLogWithSource& net_log)
87 : handle_(handle),
88 callback_(std::move(callback)),
89 proxy_auth_callback_(proxy_auth_callback),
90 priority_(priority),
91 respect_limits_(respect_limits),
92 flags_(flags),
93 socket_params_(std::move(socket_params)),
94 proxy_annotation_tag_(proxy_annotation_tag),
95 net_log_(net_log),
96 socket_tag_(socket_tag) {
97 if (respect_limits_ == ClientSocketPool::RespectLimits::DISABLED)
98 DCHECK_EQ(priority_, MAXIMUM_PRIORITY);
99 }
100
101 TransportClientSocketPool::Request::~Request() = default;
102
AssignJob(ConnectJob * job)103 void TransportClientSocketPool::Request::AssignJob(ConnectJob* job) {
104 DCHECK(job);
105 DCHECK(!job_);
106 job_ = job;
107 if (job_->priority() != priority_)
108 job_->ChangePriority(priority_);
109 }
110
ReleaseJob()111 ConnectJob* TransportClientSocketPool::Request::ReleaseJob() {
112 DCHECK(job_);
113 ConnectJob* job = job_;
114 job_ = nullptr;
115 return job;
116 }
117
// A socket sitting idle in a group, together with its bookkeeping timestamp.
struct TransportClientSocketPool::IdleSocket {
  // An idle socket can't be used if it is disconnected or has been used
  // before and has received data unexpectedly (hence no longer idle).  The
  // unread data would be mistaken for the beginning of the next response if
  // we were to use the socket for a new request.
  //
  // Note that a socket that has never been used before (like a preconnected
  // socket) may be used even with unread data.  This may be, e.g., a SPDY
  // SETTINGS frame.
  //
  // If the socket is not usable, |net_log_reason_utf8| is set to a string
  // indicating why the socket is not usable. |net_log_reason_utf8| itself
  // must be non-null.
  bool IsUsable(const char** net_log_reason_utf8) const;

  // The idle socket itself.
  std::unique_ptr<StreamSocket> socket;
  // Reference point for idle time: when the socket is handed out again, its
  // idle time is computed as base::TimeTicks::Now() - |start_time|.
  base::TimeTicks start_time;
};
135
// Convenience constructor. Delegates to the full constructor, supplying:
//  * ClientSocketPool::used_idle_socket_timeout() as the used-idle timeout,
//  * a default-constructed ConnectJobFactory,
//  * the SSLClientContext taken from |common_connect_job_params|, and
//  * connect_backup_jobs_enabled = true.
// |common_connect_job_params| is dereferenced here, so it must be non-null.
TransportClientSocketPool::TransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    const ProxyChain& proxy_chain,
    bool is_for_websockets,
    const CommonConnectJobParams* common_connect_job_params,
    bool cleanup_on_ip_address_change)
    : TransportClientSocketPool(max_sockets,
                                max_sockets_per_group,
                                unused_idle_socket_timeout,
                                ClientSocketPool::used_idle_socket_timeout(),
                                proxy_chain,
                                is_for_websockets,
                                common_connect_job_params,
                                cleanup_on_ip_address_change,
                                std::make_unique<ConnectJobFactory>(),
                                common_connect_job_params->ssl_client_context,
                                /*connect_backup_jobs_enabled=*/true) {}
155
~TransportClientSocketPool()156 TransportClientSocketPool::~TransportClientSocketPool() {
157 // Clean up any idle sockets and pending connect jobs. Assert that we have no
158 // remaining active sockets or pending requests. They should have all been
159 // cleaned up prior to |this| being destroyed.
160 FlushWithError(ERR_ABORTED, kSocketPoolDestroyed);
161 DCHECK(group_map_.empty());
162 DCHECK(pending_callback_map_.empty());
163 DCHECK_EQ(0, connecting_socket_count_);
164 DCHECK_EQ(0, handed_out_socket_count_);
165 CHECK(higher_pools_.empty());
166
167 if (ssl_client_context_)
168 ssl_client_context_->RemoveObserver(this);
169
170 if (cleanup_on_ip_address_change_)
171 NetworkChangeNotifier::RemoveIPAddressObserver(this);
172 }
173
174 std::unique_ptr<TransportClientSocketPool>
CreateForTesting(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)175 TransportClientSocketPool::CreateForTesting(
176 int max_sockets,
177 int max_sockets_per_group,
178 base::TimeDelta unused_idle_socket_timeout,
179 base::TimeDelta used_idle_socket_timeout,
180 const ProxyChain& proxy_chain,
181 bool is_for_websockets,
182 const CommonConnectJobParams* common_connect_job_params,
183 std::unique_ptr<ConnectJobFactory> connect_job_factory,
184 SSLClientContext* ssl_client_context,
185 bool connect_backup_jobs_enabled) {
186 return base::WrapUnique<TransportClientSocketPool>(
187 new TransportClientSocketPool(
188 max_sockets, max_sockets_per_group, unused_idle_socket_timeout,
189 used_idle_socket_timeout, proxy_chain, is_for_websockets,
190 common_connect_job_params, /*cleanup_on_ip_address_change=*/true,
191 std::move(connect_job_factory), ssl_client_context,
192 connect_backup_jobs_enabled));
193 }
194
// Default state: no callback, result OK.
TransportClientSocketPool::CallbackResultPair::CallbackResultPair()
    : result(OK) {}

// Takes ownership of |callback_in| and pairs it with |result_in|.
TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    CompletionOnceCallback callback_in,
    int result_in)
    : callback(std::move(callback_in)), result(result_in) {}

// Move construction and move assignment use the compiler-generated
// member-wise implementations.
TransportClientSocketPool::CallbackResultPair::CallbackResultPair(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair&
TransportClientSocketPool::CallbackResultPair::operator=(
    TransportClientSocketPool::CallbackResultPair&& other) = default;

TransportClientSocketPool::CallbackResultPair::~CallbackResultPair() = default;
211
IsStalled() const212 bool TransportClientSocketPool::IsStalled() const {
213 // If fewer than |max_sockets_| are in use, then clearly |this| is not
214 // stalled.
215 if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
216 return false;
217 // So in order to be stalled, |this| must be using at least |max_sockets_| AND
218 // |this| must have a request that is actually stalled on the global socket
219 // limit. To find such a request, look for a group that has more requests
220 // than jobs AND where the number of sockets is less than
221 // |max_sockets_per_group_|. (If the number of sockets is equal to
222 // |max_sockets_per_group_|, then the request is stalled on the group limit,
223 // which does not count.)
224 for (const auto& it : group_map_) {
225 if (it.second->CanUseAdditionalSocketSlot(max_sockets_per_group_))
226 return true;
227 }
228 return false;
229 }
230
AddHigherLayeredPool(HigherLayeredPool * higher_pool)231 void TransportClientSocketPool::AddHigherLayeredPool(
232 HigherLayeredPool* higher_pool) {
233 CHECK(higher_pool);
234 CHECK(!base::Contains(higher_pools_, higher_pool));
235 higher_pools_.insert(higher_pool);
236 }
237
RemoveHigherLayeredPool(HigherLayeredPool * higher_pool)238 void TransportClientSocketPool::RemoveHigherLayeredPool(
239 HigherLayeredPool* higher_pool) {
240 CHECK(higher_pool);
241 CHECK(base::Contains(higher_pools_, higher_pool));
242 higher_pools_.erase(higher_pool);
243 }
244
RequestSocket(const GroupId & group_id,scoped_refptr<SocketParams> params,const absl::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,RequestPriority priority,const SocketTag & socket_tag,RespectLimits respect_limits,ClientSocketHandle * handle,CompletionOnceCallback callback,const ProxyAuthCallback & proxy_auth_callback,const NetLogWithSource & net_log)245 int TransportClientSocketPool::RequestSocket(
246 const GroupId& group_id,
247 scoped_refptr<SocketParams> params,
248 const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
249 RequestPriority priority,
250 const SocketTag& socket_tag,
251 RespectLimits respect_limits,
252 ClientSocketHandle* handle,
253 CompletionOnceCallback callback,
254 const ProxyAuthCallback& proxy_auth_callback,
255 const NetLogWithSource& net_log) {
256 CHECK(callback);
257 CHECK(handle);
258
259 NetLogTcpClientSocketPoolRequestedSocket(net_log, group_id);
260
261 std::unique_ptr<Request> request = std::make_unique<Request>(
262 handle, std::move(callback), proxy_auth_callback, priority, socket_tag,
263 respect_limits, NORMAL, std::move(params), proxy_annotation_tag, net_log);
264
265 // Cleanup any timed-out idle sockets.
266 CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
267
268 request->net_log().BeginEvent(NetLogEventType::SOCKET_POOL);
269
270 int rv =
271 RequestSocketInternal(group_id, *request,
272 /*preconnect_done_closure=*/base::OnceClosure());
273 if (rv != ERR_IO_PENDING) {
274 if (rv == OK) {
275 request->handle()->socket()->ApplySocketTag(request->socket_tag());
276 }
277 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
278 rv);
279 CHECK(!request->handle()->is_initialized());
280 request.reset();
281 } else {
282 Group* group = GetOrCreateGroup(group_id);
283 group->InsertUnboundRequest(std::move(request));
284 // Have to do this asynchronously, as closing sockets in higher level pools
285 // call back in to |this|, which will cause all sorts of fun and exciting
286 // re-entrancy issues if the socket pool is doing something else at the
287 // time.
288 if (group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
289 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
290 FROM_HERE,
291 base::BindOnce(
292 &TransportClientSocketPool::TryToCloseSocketsInLayeredPools,
293 weak_factory_.GetWeakPtr()));
294 }
295 }
296 return rv;
297 }
298
RequestSockets(const GroupId & group_id,scoped_refptr<SocketParams> params,const absl::optional<NetworkTrafficAnnotationTag> & proxy_annotation_tag,int num_sockets,CompletionOnceCallback callback,const NetLogWithSource & net_log)299 int TransportClientSocketPool::RequestSockets(
300 const GroupId& group_id,
301 scoped_refptr<SocketParams> params,
302 const absl::optional<NetworkTrafficAnnotationTag>& proxy_annotation_tag,
303 int num_sockets,
304 CompletionOnceCallback callback,
305 const NetLogWithSource& net_log) {
306 // TODO(eroman): Split out the host and port parameters.
307 net_log.AddEvent(NetLogEventType::TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKETS,
308 [&] { return NetLogGroupIdParams(group_id); });
309
310 Request request(nullptr /* no handle */, CompletionOnceCallback(),
311 ProxyAuthCallback(), IDLE, SocketTag(),
312 RespectLimits::ENABLED, NO_IDLE_SOCKETS, std::move(params),
313 proxy_annotation_tag, net_log);
314
315 // Cleanup any timed-out idle sockets.
316 CleanupIdleSockets(false, nullptr /* net_log_reason_utf8 */);
317
318 if (num_sockets > max_sockets_per_group_) {
319 num_sockets = max_sockets_per_group_;
320 }
321
322 request.net_log().BeginEventWithIntParams(
323 NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, "num_sockets",
324 num_sockets);
325
326 Group* group = GetOrCreateGroup(group_id);
327
328 // RequestSocketsInternal() may delete the group.
329 bool deleted_group = false;
330
331 int rv = OK;
332
333 base::RepeatingClosure preconnect_done_closure = base::BarrierClosure(
334 num_sockets,
335 base::BindOnce(
336 [](CompletionOnceCallback callback) {
337 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
338 FROM_HERE, base::BindOnce(std::move(callback), OK));
339 },
340 std::move(callback)));
341 int pending_connect_job_count = 0;
342 for (int num_iterations_left = num_sockets;
343 group->NumActiveSocketSlots() < num_sockets && num_iterations_left > 0;
344 num_iterations_left--) {
345 rv = RequestSocketInternal(group_id, request, preconnect_done_closure);
346 if (rv == ERR_IO_PENDING) {
347 ++pending_connect_job_count;
348 }
349 if (rv < 0 && rv != ERR_IO_PENDING) {
350 // We're encountering a synchronous error. Give up.
351 if (!base::Contains(group_map_, group_id))
352 deleted_group = true;
353 break;
354 }
355 if (!base::Contains(group_map_, group_id)) {
356 // Unexpected. The group should only be getting deleted on synchronous
357 // error.
358 NOTREACHED();
359 deleted_group = true;
360 break;
361 }
362 }
363
364 if (!deleted_group && group->IsEmpty())
365 RemoveGroup(group_id);
366
367 if (rv == ERR_IO_PENDING)
368 rv = OK;
369 request.net_log().EndEventWithNetErrorCode(
370 NetLogEventType::SOCKET_POOL_CONNECTING_N_SOCKETS, rv);
371
372 // Currently we don't handle preconnect errors. So this method returns OK even
373 // if failed to preconnect.
374 // TODO(crbug.com/1330235): Consider support error handlings when needed.
375 if (pending_connect_job_count == 0)
376 return OK;
377 for (int i = 0; i < num_sockets - pending_connect_job_count; ++i) {
378 preconnect_done_closure.Run();
379 }
380
381 return ERR_IO_PENDING;
382 }
383
RequestSocketInternal(const GroupId & group_id,const Request & request,base::OnceClosure preconnect_done_closure)384 int TransportClientSocketPool::RequestSocketInternal(
385 const GroupId& group_id,
386 const Request& request,
387 base::OnceClosure preconnect_done_closure) {
388 #if DCHECK_IS_ON()
389 DCHECK(!request_in_process_);
390 base::AutoReset<bool> auto_reset(&request_in_process_, true);
391 #endif // DCHECK_IS_ON()
392
393 ClientSocketHandle* const handle = request.handle();
394 const bool preconnecting = !handle;
395 DCHECK_EQ(preconnecting, !!preconnect_done_closure);
396
397 Group* group = nullptr;
398 auto group_it = group_map_.find(group_id);
399 if (group_it != group_map_.end()) {
400 group = group_it->second;
401
402 if (!(request.flags() & NO_IDLE_SOCKETS)) {
403 // Try to reuse a socket.
404 if (AssignIdleSocketToRequest(request, group))
405 return OK;
406 }
407
408 // If there are more ConnectJobs than pending requests, don't need to do
409 // anything. Can just wait for the extra job to connect, and then assign it
410 // to the request.
411 if (!preconnecting && group->TryToUseNeverAssignedConnectJob())
412 return ERR_IO_PENDING;
413
414 // Can we make another active socket now?
415 if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
416 request.respect_limits() == RespectLimits::ENABLED) {
417 // TODO(willchan): Consider whether or not we need to close a socket in a
418 // higher layered group. I don't think this makes sense since we would
419 // just reuse that socket then if we needed one and wouldn't make it down
420 // to this layer.
421 request.net_log().AddEvent(
422 NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
423 return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
424 }
425 }
426
427 if (ReachedMaxSocketsLimit() &&
428 request.respect_limits() == RespectLimits::ENABLED) {
429 // NOTE(mmenke): Wonder if we really need different code for each case
430 // here. Only reason for them now seems to be preconnects.
431 if (idle_socket_count_ > 0) {
432 // There's an idle socket in this pool. Either that's because there's
433 // still one in this group, but we got here due to preconnecting
434 // bypassing idle sockets, or because there's an idle socket in another
435 // group.
436 bool closed = CloseOneIdleSocketExceptInGroup(group);
437 if (preconnecting && !closed)
438 return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
439 } else {
440 // We could check if we really have a stalled group here, but it
441 // requires a scan of all groups, so just flip a flag here, and do the
442 // check later.
443 request.net_log().AddEvent(
444 NetLogEventType::SOCKET_POOL_STALLED_MAX_SOCKETS);
445 return preconnecting ? ERR_PRECONNECT_MAX_SOCKET_LIMIT : ERR_IO_PENDING;
446 }
447 }
448
449 // We couldn't find a socket to reuse, and there's space to allocate one,
450 // so allocate and connect a new one.
451 group = GetOrCreateGroup(group_id);
452 std::unique_ptr<ConnectJob> connect_job(
453 CreateConnectJob(group_id, request.socket_params(), proxy_chain_,
454 request.proxy_annotation_tag(), request.priority(),
455 request.socket_tag(), group));
456 connect_job->net_log().AddEvent(
457 NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
458 return NetLogCreateConnectJobParams(false /* backup_job */, &group_id);
459 });
460
461 int rv = connect_job->Connect();
462 if (rv == ERR_IO_PENDING) {
463 if (preconnect_done_closure) {
464 DCHECK(preconnecting);
465 connect_job->set_done_closure(std::move(preconnect_done_closure));
466 }
467 // If we didn't have any sockets in this group, set a timer for potentially
468 // creating a new one. If the SYN is lost, this backup socket may complete
469 // before the slow socket, improving end user latency.
470 if (connect_backup_jobs_enabled_ && group->IsEmpty())
471 group->StartBackupJobTimer(group_id);
472 group->AddJob(std::move(connect_job), preconnecting);
473 connecting_socket_count_++;
474 return rv;
475 }
476
477 LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
478 if (preconnecting) {
479 if (rv == OK)
480 AddIdleSocket(connect_job->PassSocket(), group);
481 } else {
482 DCHECK(handle);
483 if (rv != OK)
484 handle->SetAdditionalErrorState(connect_job.get());
485 std::unique_ptr<StreamSocket> socket = connect_job->PassSocket();
486 if (socket) {
487 HandOutSocket(std::move(socket), ClientSocketHandle::UNUSED,
488 connect_job->connect_timing(), handle,
489 base::TimeDelta() /* idle_time */, group,
490 request.net_log());
491 }
492 }
493 if (group->IsEmpty())
494 RemoveGroup(group_id);
495
496 return rv;
497 }
498
AssignIdleSocketToRequest(const Request & request,Group * group)499 bool TransportClientSocketPool::AssignIdleSocketToRequest(
500 const Request& request,
501 Group* group) {
502 std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
503 auto idle_socket_it = idle_sockets->end();
504
505 // Iterate through the idle sockets forwards (oldest to newest)
506 // * Delete any disconnected ones.
507 // * If we find a used idle socket, assign to |idle_socket|. At the end,
508 // the |idle_socket_it| will be set to the newest used idle socket.
509 for (auto it = idle_sockets->begin(); it != idle_sockets->end();) {
510 // Check whether socket is usable. Note that it's unlikely that the socket
511 // is not usable because this function is always invoked after a
512 // reusability check, but in theory socket can be closed asynchronously.
513 const char* net_log_reason_utf8;
514 if (!it->IsUsable(&net_log_reason_utf8)) {
515 it->socket->NetLog().AddEventWithStringParams(
516 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
517 net_log_reason_utf8);
518 DecrementIdleCount();
519 it = idle_sockets->erase(it);
520 continue;
521 }
522
523 if (it->socket->WasEverUsed()) {
524 // We found one we can reuse!
525 idle_socket_it = it;
526 }
527
528 ++it;
529 }
530
531 // If we haven't found an idle socket, that means there are no used idle
532 // sockets. Pick the oldest (first) idle socket (FIFO).
533
534 if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
535 idle_socket_it = idle_sockets->begin();
536
537 if (idle_socket_it != idle_sockets->end()) {
538 DecrementIdleCount();
539 base::TimeDelta idle_time =
540 base::TimeTicks::Now() - idle_socket_it->start_time;
541 std::unique_ptr<StreamSocket> socket = std::move(idle_socket_it->socket);
542 idle_sockets->erase(idle_socket_it);
543 // TODO(davidben): If |idle_time| is under some low watermark, consider
544 // treating as UNUSED rather than UNUSED_IDLE. This will avoid
545 // HttpNetworkTransaction retrying on some errors.
546 ClientSocketHandle::SocketReuseType reuse_type =
547 socket->WasEverUsed() ? ClientSocketHandle::REUSED_IDLE
548 : ClientSocketHandle::UNUSED_IDLE;
549
550 HandOutSocket(std::move(socket), reuse_type,
551 LoadTimingInfo::ConnectTiming(), request.handle(), idle_time,
552 group, request.net_log());
553 return true;
554 }
555
556 return false;
557 }
558
559 // static
LogBoundConnectJobToRequest(const NetLogSource & connect_job_source,const Request & request)560 void TransportClientSocketPool::LogBoundConnectJobToRequest(
561 const NetLogSource& connect_job_source,
562 const Request& request) {
563 request.net_log().AddEventReferencingSource(
564 NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, connect_job_source);
565 }
566
SetPriority(const GroupId & group_id,ClientSocketHandle * handle,RequestPriority priority)567 void TransportClientSocketPool::SetPriority(const GroupId& group_id,
568 ClientSocketHandle* handle,
569 RequestPriority priority) {
570 auto group_it = group_map_.find(group_id);
571 if (group_it == group_map_.end()) {
572 DCHECK(base::Contains(pending_callback_map_, handle));
573 // The Request has already completed and been destroyed; nothing to
574 // reprioritize.
575 return;
576 }
577
578 group_it->second->SetPriority(handle, priority);
579 }
580
CancelRequest(const GroupId & group_id,ClientSocketHandle * handle,bool cancel_connect_job)581 void TransportClientSocketPool::CancelRequest(const GroupId& group_id,
582 ClientSocketHandle* handle,
583 bool cancel_connect_job) {
584 auto callback_it = pending_callback_map_.find(handle);
585 if (callback_it != pending_callback_map_.end()) {
586 int result = callback_it->second.result;
587 pending_callback_map_.erase(callback_it);
588 std::unique_ptr<StreamSocket> socket = handle->PassSocket();
589 if (socket) {
590 if (result != OK) {
591 socket->Disconnect();
592 } else if (cancel_connect_job) {
593 // Close the socket if |cancel_connect_job| is true and there are no
594 // other pending requests.
595 Group* group = GetOrCreateGroup(group_id);
596 if (group->unbound_request_count() == 0)
597 socket->Disconnect();
598 }
599 ReleaseSocket(handle->group_id(), std::move(socket),
600 handle->group_generation());
601 }
602 return;
603 }
604
605 CHECK(base::Contains(group_map_, group_id));
606 Group* group = GetOrCreateGroup(group_id);
607
608 std::unique_ptr<Request> request = group->FindAndRemoveBoundRequest(handle);
609 if (request) {
610 --connecting_socket_count_;
611 OnAvailableSocketSlot(group_id, group);
612 CheckForStalledSocketGroups();
613 return;
614 }
615
616 // Search |unbound_requests_| for matching handle.
617 request = group->FindAndRemoveUnboundRequest(handle);
618 if (request) {
619 request->net_log().AddEvent(NetLogEventType::CANCELLED);
620 request->net_log().EndEvent(NetLogEventType::SOCKET_POOL);
621
622 // Let the job run, unless |cancel_connect_job| is true, or we're at the
623 // socket limit and there are no other requests waiting on the job.
624 bool reached_limit = ReachedMaxSocketsLimit();
625 if (group->jobs().size() > group->unbound_request_count() &&
626 (cancel_connect_job || reached_limit)) {
627 RemoveConnectJob(group->jobs().begin()->get(), group);
628 if (group->IsEmpty())
629 RemoveGroup(group->group_id());
630 if (reached_limit)
631 CheckForStalledSocketGroups();
632 }
633 }
634 }
635
CloseIdleSockets(const char * net_log_reason_utf8)636 void TransportClientSocketPool::CloseIdleSockets(
637 const char* net_log_reason_utf8) {
638 CleanupIdleSockets(true, net_log_reason_utf8);
639 DCHECK_EQ(0, idle_socket_count_);
640 }
641
CloseIdleSocketsInGroup(const GroupId & group_id,const char * net_log_reason_utf8)642 void TransportClientSocketPool::CloseIdleSocketsInGroup(
643 const GroupId& group_id,
644 const char* net_log_reason_utf8) {
645 if (idle_socket_count_ == 0)
646 return;
647 auto it = group_map_.find(group_id);
648 if (it == group_map_.end())
649 return;
650 CleanupIdleSocketsInGroup(true, it->second, base::TimeTicks::Now(),
651 net_log_reason_utf8);
652 if (it->second->IsEmpty())
653 RemoveGroup(it);
654 }
655
// Returns the pool-wide idle socket counter, |idle_socket_count_|.
int TransportClientSocketPool::IdleSocketCount() const {
  return idle_socket_count_;
}
659
IdleSocketCountInGroup(const GroupId & group_id) const660 size_t TransportClientSocketPool::IdleSocketCountInGroup(
661 const GroupId& group_id) const {
662 auto i = group_map_.find(group_id);
663 CHECK(i != group_map_.end());
664
665 return i->second->idle_sockets().size();
666 }
667
GetLoadState(const GroupId & group_id,const ClientSocketHandle * handle) const668 LoadState TransportClientSocketPool::GetLoadState(
669 const GroupId& group_id,
670 const ClientSocketHandle* handle) const {
671 if (base::Contains(pending_callback_map_, handle))
672 return LOAD_STATE_CONNECTING;
673
674 auto group_it = group_map_.find(group_id);
675 if (group_it == group_map_.end()) {
676 // TODO(mmenke): This is actually reached in the wild, for unknown reasons.
677 // Would be great to understand why, and if it's a bug, fix it. If not,
678 // should have a test for that case.
679 NOTREACHED();
680 return LOAD_STATE_IDLE;
681 }
682
683 const Group& group = *group_it->second;
684 ConnectJob* job = group.GetConnectJobForHandle(handle);
685 if (job)
686 return job->GetLoadState();
687
688 if (group.CanUseAdditionalSocketSlot(max_sockets_per_group_))
689 return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
690 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
691 }
692
GetInfoAsValue(const std::string & name,const std::string & type) const693 base::Value TransportClientSocketPool::GetInfoAsValue(
694 const std::string& name,
695 const std::string& type) const {
696 // TODO(mmenke): This currently doesn't return bound Requests or ConnectJobs.
697 base::Value::Dict dict;
698 dict.Set("name", name);
699 dict.Set("type", type);
700 dict.Set("handed_out_socket_count", handed_out_socket_count_);
701 dict.Set("connecting_socket_count", connecting_socket_count_);
702 dict.Set("idle_socket_count", idle_socket_count_);
703 dict.Set("max_socket_count", max_sockets_);
704 dict.Set("max_sockets_per_group", max_sockets_per_group_);
705
706 if (group_map_.empty())
707 return base::Value(std::move(dict));
708
709 base::Value::Dict all_groups_dict;
710 for (const auto& entry : group_map_) {
711 const Group* group = entry.second;
712 base::Value::Dict group_dict;
713
714 group_dict.Set("pending_request_count",
715 static_cast<int>(group->unbound_request_count()));
716 if (group->has_unbound_requests()) {
717 group_dict.Set("top_pending_priority",
718 RequestPriorityToString(group->TopPendingPriority()));
719 }
720
721 group_dict.Set("active_socket_count", group->active_socket_count());
722
723 base::Value::List idle_socket_list;
724 for (const auto& idle_socket : group->idle_sockets()) {
725 int source_id = idle_socket.socket->NetLog().source().id;
726 idle_socket_list.Append(source_id);
727 }
728 group_dict.Set("idle_sockets", std::move(idle_socket_list));
729
730 base::Value::List connect_jobs_list;
731 for (const auto& job : group->jobs()) {
732 int source_id = job->net_log().source().id;
733 connect_jobs_list.Append(source_id);
734 }
735 group_dict.Set("connect_jobs", std::move(connect_jobs_list));
736
737 group_dict.Set("is_stalled",
738 group->CanUseAdditionalSocketSlot(max_sockets_per_group_));
739 group_dict.Set("backup_job_timer_is_running",
740 group->BackupJobTimerIsRunning());
741
742 all_groups_dict.Set(entry.first.ToString(), std::move(group_dict));
743 }
744 dict.Set("groups", std::move(all_groups_dict));
745 return base::Value(std::move(dict));
746 }
747
// Answers by forwarding to HasGroup(): the result is whatever HasGroup()
// reports for |group_id|.
bool TransportClientSocketPool::HasActiveSocket(const GroupId& group_id) const {
  return HasGroup(group_id);
}
751
IsUsable(const char ** net_log_reason_utf8) const752 bool TransportClientSocketPool::IdleSocket::IsUsable(
753 const char** net_log_reason_utf8) const {
754 DCHECK(net_log_reason_utf8);
755 if (socket->WasEverUsed()) {
756 if (!socket->IsConnectedAndIdle()) {
757 if (!socket->IsConnected()) {
758 *net_log_reason_utf8 = kRemoteSideClosedConnection;
759 } else {
760 *net_log_reason_utf8 = kDataReceivedUnexpectedly;
761 }
762 return false;
763 }
764 return true;
765 }
766
767 if (!socket->IsConnected()) {
768 *net_log_reason_utf8 = kRemoteSideClosedConnection;
769 return false;
770 }
771 return true;
772 }
773
TransportClientSocketPool(int max_sockets,int max_sockets_per_group,base::TimeDelta unused_idle_socket_timeout,base::TimeDelta used_idle_socket_timeout,const ProxyChain & proxy_chain,bool is_for_websockets,const CommonConnectJobParams * common_connect_job_params,bool cleanup_on_ip_address_change,std::unique_ptr<ConnectJobFactory> connect_job_factory,SSLClientContext * ssl_client_context,bool connect_backup_jobs_enabled)774 TransportClientSocketPool::TransportClientSocketPool(
775 int max_sockets,
776 int max_sockets_per_group,
777 base::TimeDelta unused_idle_socket_timeout,
778 base::TimeDelta used_idle_socket_timeout,
779 const ProxyChain& proxy_chain,
780 bool is_for_websockets,
781 const CommonConnectJobParams* common_connect_job_params,
782 bool cleanup_on_ip_address_change,
783 std::unique_ptr<ConnectJobFactory> connect_job_factory,
784 SSLClientContext* ssl_client_context,
785 bool connect_backup_jobs_enabled)
786 : ClientSocketPool(is_for_websockets,
787 common_connect_job_params,
788 std::move(connect_job_factory)),
789 max_sockets_(max_sockets),
790 max_sockets_per_group_(max_sockets_per_group),
791 unused_idle_socket_timeout_(unused_idle_socket_timeout),
792 used_idle_socket_timeout_(used_idle_socket_timeout),
793 proxy_chain_(proxy_chain),
794 cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
795 connect_backup_jobs_enabled_(connect_backup_jobs_enabled &&
796 g_connect_backup_jobs_enabled),
797 ssl_client_context_(ssl_client_context) {
798 DCHECK_LE(0, max_sockets_per_group);
799 DCHECK_LE(max_sockets_per_group, max_sockets);
800
801 if (cleanup_on_ip_address_change_)
802 NetworkChangeNotifier::AddIPAddressObserver(this);
803
804 if (ssl_client_context_)
805 ssl_client_context_->AddObserver(this);
806 }
807
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)808 void TransportClientSocketPool::OnSSLConfigChanged(
809 SSLClientContext::SSLConfigChangeType change_type) {
810 const char* message = nullptr;
811 // When the SSL config or cert verifier config changes, flush all idle
812 // sockets so they won't get re-used, and allow any active sockets to finish,
813 // but don't put them back in the socket pool.
814 switch (change_type) {
815 case SSLClientContext::SSLConfigChangeType::kSSLConfigChanged:
816 message = kNetworkChanged;
817 break;
818 case SSLClientContext::SSLConfigChangeType::kCertDatabaseChanged:
819 message = kCertDatabaseChanged;
820 break;
821 case SSLClientContext::SSLConfigChangeType::kCertVerifierChanged:
822 message = kCertVerifierChanged;
823 break;
824 };
825
826 base::TimeTicks now = base::TimeTicks::Now();
827 for (auto it = group_map_.begin(); it != group_map_.end();) {
828 it = RefreshGroup(it, now, message);
829 }
830 CheckForStalledSocketGroups();
831 }
832
833 // TODO(crbug.com/1206799): Get `server` as SchemeHostPort?
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)834 void TransportClientSocketPool::OnSSLConfigForServersChanged(
835 const base::flat_set<HostPortPair>& servers) {
836 // Current time value. Retrieving it once at the function start rather than
837 // inside the inner loop, since it shouldn't change by any meaningful amount.
838 //
839 // TODO(davidben): This value is not actually needed because
840 // CleanupIdleSocketsInGroup() is called with |force| = true. Tidy up
841 // interfaces so the parameter is not necessary.
842 base::TimeTicks now = base::TimeTicks::Now();
843
844 // If the proxy chain includes a server from `servers` and uses SSL settings
845 // (HTTPS or QUIC), refresh every group.
846 // TODO(https://crbug.com/1491092): Check each ProxyServer in `proxy_chain_`.
847 bool proxy_matches = false;
848 for (const ProxyServer& proxy_server : proxy_chain_.proxy_servers()) {
849 if (proxy_server.is_http_like() && !proxy_server.is_http() &&
850 servers.contains(proxy_server.host_port_pair())) {
851 proxy_matches = true;
852 }
853 }
854
855 bool refreshed_any = false;
856 for (auto it = group_map_.begin(); it != group_map_.end();) {
857 if (proxy_matches ||
858 (GURL::SchemeIsCryptographic(it->first.destination().scheme()) &&
859 servers.contains(
860 HostPortPair::FromSchemeHostPort(it->first.destination())))) {
861 refreshed_any = true;
862 // Note this call may destroy the group and invalidate |to_refresh|.
863 it = RefreshGroup(it, now, kSslConfigChanged);
864 } else {
865 ++it;
866 }
867 }
868
869 if (refreshed_any) {
870 // Check to see if any group can use the freed up socket slots. It would be
871 // more efficient to give the slots to the refreshed groups, if the still
872 // exists and need them, but this should be rare enough that it doesn't
873 // matter. This will also make sure the slots are given to the group with
874 // the highest priority request without an assigned ConnectJob.
875 CheckForStalledSocketGroups();
876 }
877 }
878
HasGroup(const GroupId & group_id) const879 bool TransportClientSocketPool::HasGroup(const GroupId& group_id) const {
880 return base::Contains(group_map_, group_id);
881 }
882
CleanupIdleSockets(bool force,const char * net_log_reason_utf8)883 void TransportClientSocketPool::CleanupIdleSockets(
884 bool force,
885 const char* net_log_reason_utf8) {
886 if (idle_socket_count_ == 0)
887 return;
888
889 // Current time value. Retrieving it once at the function start rather than
890 // inside the inner loop, since it shouldn't change by any meaningful amount.
891 base::TimeTicks now = base::TimeTicks::Now();
892
893 for (auto i = group_map_.begin(); i != group_map_.end();) {
894 Group* group = i->second;
895 CHECK(group);
896 CleanupIdleSocketsInGroup(force, group, now, net_log_reason_utf8);
897 // Delete group if no longer needed.
898 if (group->IsEmpty()) {
899 i = RemoveGroup(i);
900 } else {
901 ++i;
902 }
903 }
904 }
905
CloseOneIdleSocket()906 bool TransportClientSocketPool::CloseOneIdleSocket() {
907 if (idle_socket_count_ == 0)
908 return false;
909 return CloseOneIdleSocketExceptInGroup(nullptr);
910 }
911
CloseOneIdleConnectionInHigherLayeredPool()912 bool TransportClientSocketPool::CloseOneIdleConnectionInHigherLayeredPool() {
913 // This pool doesn't have any idle sockets. It's possible that a pool at a
914 // higher layer is holding one of this sockets active, but it's actually idle.
915 // Query the higher layers.
916 for (auto* higher_pool : higher_pools_) {
917 if (higher_pool->CloseOneIdleConnection())
918 return true;
919 }
920 return false;
921 }
922
CleanupIdleSocketsInGroup(bool force,Group * group,const base::TimeTicks & now,const char * net_log_reason_utf8)923 void TransportClientSocketPool::CleanupIdleSocketsInGroup(
924 bool force,
925 Group* group,
926 const base::TimeTicks& now,
927 const char* net_log_reason_utf8) {
928 // If |force| is true, a reason must be provided.
929 DCHECK(!force || net_log_reason_utf8);
930
931 auto idle_socket_it = group->mutable_idle_sockets()->begin();
932 while (idle_socket_it != group->idle_sockets().end()) {
933 bool should_clean_up = force;
934 const char* reason_for_closing_socket = net_log_reason_utf8;
935 base::TimeDelta timeout = idle_socket_it->socket->WasEverUsed()
936 ? used_idle_socket_timeout_
937 : unused_idle_socket_timeout_;
938
939 // Timeout errors take precedence over the reason for flushing sockets in
940 // the group, if applicable.
941 if (now - idle_socket_it->start_time >= timeout) {
942 should_clean_up = true;
943 reason_for_closing_socket = kIdleTimeLimitExpired;
944 }
945
946 // Usability errors take precedence over over other errors.
947 if (!idle_socket_it->IsUsable(&reason_for_closing_socket))
948 should_clean_up = true;
949
950 if (should_clean_up) {
951 DCHECK(reason_for_closing_socket);
952 idle_socket_it->socket->NetLog().AddEventWithStringParams(
953 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
954 reason_for_closing_socket);
955 idle_socket_it = group->mutable_idle_sockets()->erase(idle_socket_it);
956 DecrementIdleCount();
957 } else {
958 DCHECK(!reason_for_closing_socket);
959 ++idle_socket_it;
960 }
961 }
962 }
963
GetOrCreateGroup(const GroupId & group_id)964 TransportClientSocketPool::Group* TransportClientSocketPool::GetOrCreateGroup(
965 const GroupId& group_id) {
966 auto it = group_map_.find(group_id);
967 if (it != group_map_.end())
968 return it->second;
969 Group* group = new Group(group_id, this);
970 group_map_[group_id] = group;
971 return group;
972 }
973
RemoveGroup(const GroupId & group_id)974 void TransportClientSocketPool::RemoveGroup(const GroupId& group_id) {
975 auto it = group_map_.find(group_id);
976 CHECK(it != group_map_.end());
977
978 RemoveGroup(it);
979 }
980
981 TransportClientSocketPool::GroupMap::iterator
RemoveGroup(GroupMap::iterator it)982 TransportClientSocketPool::RemoveGroup(GroupMap::iterator it) {
983 delete it->second;
984 return group_map_.erase(it);
985 }
986
987 // static
connect_backup_jobs_enabled()988 bool TransportClientSocketPool::connect_backup_jobs_enabled() {
989 return g_connect_backup_jobs_enabled;
990 }
991
992 // static
set_connect_backup_jobs_enabled(bool enabled)993 bool TransportClientSocketPool::set_connect_backup_jobs_enabled(bool enabled) {
994 bool old_value = g_connect_backup_jobs_enabled;
995 g_connect_backup_jobs_enabled = enabled;
996 return old_value;
997 }
998
IncrementIdleCount()999 void TransportClientSocketPool::IncrementIdleCount() {
1000 ++idle_socket_count_;
1001 }
1002
DecrementIdleCount()1003 void TransportClientSocketPool::DecrementIdleCount() {
1004 --idle_socket_count_;
1005 }
1006
ReleaseSocket(const GroupId & group_id,std::unique_ptr<StreamSocket> socket,int64_t group_generation)1007 void TransportClientSocketPool::ReleaseSocket(
1008 const GroupId& group_id,
1009 std::unique_ptr<StreamSocket> socket,
1010 int64_t group_generation) {
1011 auto i = group_map_.find(group_id);
1012 CHECK(i != group_map_.end());
1013
1014 Group* group = i->second;
1015 CHECK(group);
1016
1017 CHECK_GT(handed_out_socket_count_, 0);
1018 handed_out_socket_count_--;
1019
1020 CHECK_GT(group->active_socket_count(), 0);
1021 group->DecrementActiveSocketCount();
1022
1023 bool can_resuse_socket = false;
1024 base::StringPiece not_reusable_reason;
1025 if (!socket->IsConnectedAndIdle()) {
1026 if (!socket->IsConnected()) {
1027 not_reusable_reason = kClosedConnectionReturnedToPool;
1028 } else {
1029 not_reusable_reason = kDataReceivedUnexpectedly;
1030 }
1031 } else if (group_generation != group->generation()) {
1032 not_reusable_reason = kSocketGenerationOutOfDate;
1033 } else {
1034 can_resuse_socket = true;
1035 }
1036
1037 if (can_resuse_socket) {
1038 DCHECK(not_reusable_reason.empty());
1039
1040 // Add it to the idle list.
1041 AddIdleSocket(std::move(socket), group);
1042 OnAvailableSocketSlot(group_id, group);
1043 } else {
1044 DCHECK(!not_reusable_reason.empty());
1045
1046 socket->NetLog().AddEventWithStringParams(
1047 NetLogEventType::SOCKET_POOL_CLOSING_SOCKET, "reason",
1048 not_reusable_reason);
1049 if (group->IsEmpty())
1050 RemoveGroup(i);
1051 socket.reset();
1052 }
1053
1054 CheckForStalledSocketGroups();
1055 }
1056
CheckForStalledSocketGroups()1057 void TransportClientSocketPool::CheckForStalledSocketGroups() {
1058 // Loop until there's nothing more to do.
1059 while (true) {
1060 // If we have idle sockets, see if we can give one to the top-stalled group.
1061 GroupId top_group_id;
1062 Group* top_group = nullptr;
1063 if (!FindTopStalledGroup(&top_group, &top_group_id))
1064 return;
1065
1066 if (ReachedMaxSocketsLimit()) {
1067 if (idle_socket_count_ > 0) {
1068 CloseOneIdleSocket();
1069 } else {
1070 // We can't activate more sockets since we're already at our global
1071 // limit.
1072 return;
1073 }
1074 }
1075
1076 // Note that this may delete top_group.
1077 OnAvailableSocketSlot(top_group_id, top_group);
1078 }
1079 }
1080
1081 // Search for the highest priority pending request, amongst the groups that
1082 // are not at the |max_sockets_per_group_| limit. Note: for requests with
1083 // the same priority, the winner is based on group hash ordering (and not
1084 // insertion order).
FindTopStalledGroup(Group ** group,GroupId * group_id) const1085 bool TransportClientSocketPool::FindTopStalledGroup(Group** group,
1086 GroupId* group_id) const {
1087 CHECK((group && group_id) || (!group && !group_id));
1088 Group* top_group = nullptr;
1089 const GroupId* top_group_id = nullptr;
1090 bool has_stalled_group = false;
1091 for (const auto& it : group_map_) {
1092 Group* curr_group = it.second;
1093 if (!curr_group->has_unbound_requests())
1094 continue;
1095 if (curr_group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1096 if (!group)
1097 return true;
1098 has_stalled_group = true;
1099 bool has_higher_priority =
1100 !top_group ||
1101 curr_group->TopPendingPriority() > top_group->TopPendingPriority();
1102 if (has_higher_priority) {
1103 top_group = curr_group;
1104 top_group_id = &it.first;
1105 }
1106 }
1107 }
1108
1109 if (top_group) {
1110 CHECK(group);
1111 *group = top_group;
1112 *group_id = *top_group_id;
1113 } else {
1114 CHECK(!has_stalled_group);
1115 }
1116 return has_stalled_group;
1117 }
1118
OnIPAddressChanged()1119 void TransportClientSocketPool::OnIPAddressChanged() {
1120 DCHECK(cleanup_on_ip_address_change_);
1121 FlushWithError(ERR_NETWORK_CHANGED, kNetworkChanged);
1122 }
1123
FlushWithError(int error,const char * net_log_reason_utf8)1124 void TransportClientSocketPool::FlushWithError(
1125 int error,
1126 const char* net_log_reason_utf8) {
1127 CancelAllConnectJobs();
1128 CloseIdleSockets(net_log_reason_utf8);
1129 CancelAllRequestsWithError(error);
1130 for (const auto& group : group_map_) {
1131 group.second->IncrementGeneration();
1132 }
1133 }
1134
RemoveConnectJob(ConnectJob * job,Group * group)1135 void TransportClientSocketPool::RemoveConnectJob(ConnectJob* job,
1136 Group* group) {
1137 CHECK_GT(connecting_socket_count_, 0);
1138 connecting_socket_count_--;
1139
1140 DCHECK(group);
1141 group->RemoveUnboundJob(job);
1142 }
1143
OnAvailableSocketSlot(const GroupId & group_id,Group * group)1144 void TransportClientSocketPool::OnAvailableSocketSlot(const GroupId& group_id,
1145 Group* group) {
1146 DCHECK(base::Contains(group_map_, group_id));
1147 if (group->IsEmpty()) {
1148 RemoveGroup(group_id);
1149 } else if (group->has_unbound_requests()) {
1150 ProcessPendingRequest(group_id, group);
1151 }
1152 }
1153
ProcessPendingRequest(const GroupId & group_id,Group * group)1154 void TransportClientSocketPool::ProcessPendingRequest(const GroupId& group_id,
1155 Group* group) {
1156 const Request* next_request = group->GetNextUnboundRequest();
1157 DCHECK(next_request);
1158
1159 // If the group has no idle sockets, and can't make use of an additional slot,
1160 // either because it's at the limit or because it's at the socket per group
1161 // limit, then there's nothing to do.
1162 if (group->idle_sockets().empty() &&
1163 !group->CanUseAdditionalSocketSlot(max_sockets_per_group_)) {
1164 return;
1165 }
1166
1167 int rv =
1168 RequestSocketInternal(group_id, *next_request,
1169 /*preconnect_done_closure=*/base::OnceClosure());
1170 if (rv != ERR_IO_PENDING) {
1171 std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1172 DCHECK(request);
1173 if (group->IsEmpty())
1174 RemoveGroup(group_id);
1175
1176 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1177 rv);
1178 InvokeUserCallbackLater(request->handle(), request->release_callback(), rv,
1179 request->socket_tag());
1180 }
1181 }
1182
HandOutSocket(std::unique_ptr<StreamSocket> socket,ClientSocketHandle::SocketReuseType reuse_type,const LoadTimingInfo::ConnectTiming & connect_timing,ClientSocketHandle * handle,base::TimeDelta idle_time,Group * group,const NetLogWithSource & net_log)1183 void TransportClientSocketPool::HandOutSocket(
1184 std::unique_ptr<StreamSocket> socket,
1185 ClientSocketHandle::SocketReuseType reuse_type,
1186 const LoadTimingInfo::ConnectTiming& connect_timing,
1187 ClientSocketHandle* handle,
1188 base::TimeDelta idle_time,
1189 Group* group,
1190 const NetLogWithSource& net_log) {
1191 DCHECK(socket);
1192 handle->SetSocket(std::move(socket));
1193 handle->set_reuse_type(reuse_type);
1194 handle->set_idle_time(idle_time);
1195 handle->set_group_generation(group->generation());
1196 handle->set_connect_timing(connect_timing);
1197
1198 if (reuse_type == ClientSocketHandle::REUSED_IDLE) {
1199 net_log.AddEventWithIntParams(
1200 NetLogEventType::SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, "idle_ms",
1201 static_cast<int>(idle_time.InMilliseconds()));
1202 }
1203
1204 net_log.AddEventReferencingSource(
1205 NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
1206 handle->socket()->NetLog().source());
1207
1208 handed_out_socket_count_++;
1209 group->IncrementActiveSocketCount();
1210 }
1211
AddIdleSocket(std::unique_ptr<StreamSocket> socket,Group * group)1212 void TransportClientSocketPool::AddIdleSocket(
1213 std::unique_ptr<StreamSocket> socket,
1214 Group* group) {
1215 DCHECK(socket);
1216 IdleSocket idle_socket;
1217 idle_socket.socket = std::move(socket);
1218 idle_socket.start_time = base::TimeTicks::Now();
1219
1220 group->mutable_idle_sockets()->push_back(std::move(idle_socket));
1221 IncrementIdleCount();
1222 }
1223
CancelAllConnectJobs()1224 void TransportClientSocketPool::CancelAllConnectJobs() {
1225 for (auto i = group_map_.begin(); i != group_map_.end();) {
1226 Group* group = i->second;
1227 CHECK(group);
1228 connecting_socket_count_ -= group->jobs().size();
1229 group->RemoveAllUnboundJobs();
1230
1231 // Delete group if no longer needed.
1232 if (group->IsEmpty()) {
1233 i = RemoveGroup(i);
1234 } else {
1235 ++i;
1236 }
1237 }
1238 }
1239
CancelAllRequestsWithError(int error)1240 void TransportClientSocketPool::CancelAllRequestsWithError(int error) {
1241 for (auto i = group_map_.begin(); i != group_map_.end();) {
1242 Group* group = i->second;
1243 CHECK(group);
1244
1245 while (true) {
1246 std::unique_ptr<Request> request = group->PopNextUnboundRequest();
1247 if (!request)
1248 break;
1249 InvokeUserCallbackLater(request->handle(), request->release_callback(),
1250 error, request->socket_tag());
1251 }
1252
1253 // Mark bound connect jobs as needing to fail. Can't fail them immediately
1254 // because they may have access to objects owned by the ConnectJob, and
1255 // could access them if a user callback invocation is queued. It would also
1256 // result in the consumer handling two messages at once, which in general
1257 // isn't safe for a lot of code.
1258 group->SetPendingErrorForAllBoundRequests(error);
1259
1260 // Delete group if no longer needed.
1261 if (group->IsEmpty()) {
1262 i = RemoveGroup(i);
1263 } else {
1264 ++i;
1265 }
1266 }
1267 }
1268
ReachedMaxSocketsLimit() const1269 bool TransportClientSocketPool::ReachedMaxSocketsLimit() const {
1270 // Each connecting socket will eventually connect and be handed out.
1271 int total =
1272 handed_out_socket_count_ + connecting_socket_count_ + idle_socket_count_;
1273 // There can be more sockets than the limit since some requests can ignore
1274 // the limit
1275 if (total < max_sockets_)
1276 return false;
1277 return true;
1278 }
1279
CloseOneIdleSocketExceptInGroup(const Group * exception_group)1280 bool TransportClientSocketPool::CloseOneIdleSocketExceptInGroup(
1281 const Group* exception_group) {
1282 CHECK_GT(idle_socket_count_, 0);
1283
1284 for (auto i = group_map_.begin(); i != group_map_.end(); ++i) {
1285 Group* group = i->second;
1286 CHECK(group);
1287 if (exception_group == group)
1288 continue;
1289 std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
1290
1291 if (!idle_sockets->empty()) {
1292 idle_sockets->pop_front();
1293 DecrementIdleCount();
1294 if (group->IsEmpty())
1295 RemoveGroup(i);
1296
1297 return true;
1298 }
1299 }
1300
1301 return false;
1302 }
1303
OnConnectJobComplete(Group * group,int result,ConnectJob * job)1304 void TransportClientSocketPool::OnConnectJobComplete(Group* group,
1305 int result,
1306 ConnectJob* job) {
1307 DCHECK_NE(ERR_IO_PENDING, result);
1308 DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1309 DCHECK_EQ(group, group_map_[group->group_id()]);
1310 DCHECK(result != OK || job->socket() != nullptr);
1311
1312 // Check if the ConnectJob is already bound to a Request. If so, result is
1313 // returned to that specific request.
1314 absl::optional<Group::BoundRequest> bound_request =
1315 group->FindAndRemoveBoundRequestForConnectJob(job);
1316 Request* request = nullptr;
1317 std::unique_ptr<Request> owned_request;
1318 if (bound_request) {
1319 --connecting_socket_count_;
1320
1321 // If the socket pools were previously flushed with an error, return that
1322 // error to the bound request and discard the socket.
1323 if (bound_request->pending_error != OK) {
1324 InvokeUserCallbackLater(bound_request->request->handle(),
1325 bound_request->request->release_callback(),
1326 bound_request->pending_error,
1327 bound_request->request->socket_tag());
1328 bound_request->request->net_log().EndEventWithNetErrorCode(
1329 NetLogEventType::SOCKET_POOL, bound_request->pending_error);
1330 OnAvailableSocketSlot(group->group_id(), group);
1331 CheckForStalledSocketGroups();
1332 return;
1333 }
1334
1335 // If the ConnectJob is from a previous generation, add the request back to
1336 // the group, and kick off another request. The socket will be discarded.
1337 if (bound_request->generation != group->generation()) {
1338 group->InsertUnboundRequest(std::move(bound_request->request));
1339 OnAvailableSocketSlot(group->group_id(), group);
1340 CheckForStalledSocketGroups();
1341 return;
1342 }
1343
1344 request = bound_request->request.get();
1345 } else {
1346 // In this case, RemoveConnectJob(job, _) must be called before exiting this
1347 // method. Otherwise, |job| will be leaked.
1348 owned_request = group->PopNextUnboundRequest();
1349 request = owned_request.get();
1350
1351 if (!request) {
1352 if (result == OK)
1353 AddIdleSocket(job->PassSocket(), group);
1354 RemoveConnectJob(job, group);
1355 OnAvailableSocketSlot(group->group_id(), group);
1356 CheckForStalledSocketGroups();
1357 return;
1358 }
1359
1360 LogBoundConnectJobToRequest(job->net_log().source(), *request);
1361 }
1362
1363 // The case where there's no request is handled above.
1364 DCHECK(request);
1365
1366 if (result != OK)
1367 request->handle()->SetAdditionalErrorState(job);
1368 if (job->socket()) {
1369 HandOutSocket(job->PassSocket(), ClientSocketHandle::UNUSED,
1370 job->connect_timing(), request->handle(), base::TimeDelta(),
1371 group, request->net_log());
1372 }
1373 request->net_log().EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
1374 result);
1375 InvokeUserCallbackLater(request->handle(), request->release_callback(),
1376 result, request->socket_tag());
1377 if (!bound_request)
1378 RemoveConnectJob(job, group);
1379 // If no socket was handed out, there's a new socket slot available.
1380 if (!request->handle()->socket()) {
1381 OnAvailableSocketSlot(group->group_id(), group);
1382 CheckForStalledSocketGroups();
1383 }
1384 }
1385
OnNeedsProxyAuth(Group * group,const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1386 void TransportClientSocketPool::OnNeedsProxyAuth(
1387 Group* group,
1388 const HttpResponseInfo& response,
1389 HttpAuthController* auth_controller,
1390 base::OnceClosure restart_with_auth_callback,
1391 ConnectJob* job) {
1392 DCHECK(group_map_.find(group->group_id()) != group_map_.end());
1393 DCHECK_EQ(group, group_map_[group->group_id()]);
1394
1395 const Request* request = group->BindRequestToConnectJob(job);
1396 // If can't bind the ConnectJob to a request, treat this as a ConnectJob
1397 // failure.
1398 if (!request) {
1399 OnConnectJobComplete(group, ERR_PROXY_AUTH_REQUESTED, job);
1400 return;
1401 }
1402
1403 request->proxy_auth_callback().Run(response, auth_controller,
1404 std::move(restart_with_auth_callback));
1405 }
1406
InvokeUserCallbackLater(ClientSocketHandle * handle,CompletionOnceCallback callback,int rv,const SocketTag & socket_tag)1407 void TransportClientSocketPool::InvokeUserCallbackLater(
1408 ClientSocketHandle* handle,
1409 CompletionOnceCallback callback,
1410 int rv,
1411 const SocketTag& socket_tag) {
1412 CHECK(!base::Contains(pending_callback_map_, handle));
1413 pending_callback_map_[handle] = CallbackResultPair(std::move(callback), rv);
1414 if (rv == OK) {
1415 handle->socket()->ApplySocketTag(socket_tag);
1416 }
1417 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
1418 FROM_HERE, base::BindOnce(&TransportClientSocketPool::InvokeUserCallback,
1419 weak_factory_.GetWeakPtr(),
1420 // This is safe as `handle` is checked against a
1421 // map to verify it's alive before dereference.
1422 // This code path must only be reachable by
1423 // `handle`s that have had Init called.
1424 base::UnsafeDangling(handle)));
1425 }
1426
InvokeUserCallback(MayBeDangling<ClientSocketHandle> handle)1427 void TransportClientSocketPool::InvokeUserCallback(
1428 MayBeDangling<ClientSocketHandle> handle) {
1429 auto it = pending_callback_map_.find(handle);
1430
1431 // Exit if the request has already been cancelled.
1432 if (it == pending_callback_map_.end())
1433 return;
1434
1435 CHECK(!handle->is_initialized());
1436 CompletionOnceCallback callback = std::move(it->second.callback);
1437 int result = it->second.result;
1438 pending_callback_map_.erase(it);
1439 std::move(callback).Run(result);
1440 }
1441
TryToCloseSocketsInLayeredPools()1442 void TransportClientSocketPool::TryToCloseSocketsInLayeredPools() {
1443 while (IsStalled()) {
1444 // Closing a socket will result in calling back into |this| to use the freed
1445 // socket slot, so nothing else is needed.
1446 if (!CloseOneIdleConnectionInHigherLayeredPool())
1447 return;
1448 }
1449 }
1450
1451 TransportClientSocketPool::GroupMap::iterator
RefreshGroup(GroupMap::iterator it,const base::TimeTicks & now,const char * net_log_reason_utf8)1452 TransportClientSocketPool::RefreshGroup(GroupMap::iterator it,
1453 const base::TimeTicks& now,
1454 const char* net_log_reason_utf8) {
1455 Group* group = it->second;
1456 CHECK(group);
1457 CleanupIdleSocketsInGroup(true /* force */, group, now, net_log_reason_utf8);
1458
1459 connecting_socket_count_ -= group->jobs().size();
1460 group->RemoveAllUnboundJobs();
1461
1462 // Otherwise, prevent reuse of existing sockets.
1463 group->IncrementGeneration();
1464
1465 // Delete group if no longer needed.
1466 if (group->IsEmpty()) {
1467 return RemoveGroup(it);
1468 }
1469 return ++it;
1470 }
1471
Group(const GroupId & group_id,TransportClientSocketPool * client_socket_pool)1472 TransportClientSocketPool::Group::Group(
1473 const GroupId& group_id,
1474 TransportClientSocketPool* client_socket_pool)
1475 : group_id_(group_id),
1476 client_socket_pool_(client_socket_pool),
1477 unbound_requests_(NUM_PRIORITIES) {}
1478
~Group()1479 TransportClientSocketPool::Group::~Group() {
1480 DCHECK_EQ(0u, never_assigned_job_count());
1481 DCHECK_EQ(0u, unassigned_job_count());
1482 DCHECK(unbound_requests_.empty());
1483 DCHECK(jobs_.empty());
1484 DCHECK(bound_requests_.empty());
1485 }
1486
OnConnectJobComplete(int result,ConnectJob * job)1487 void TransportClientSocketPool::Group::OnConnectJobComplete(int result,
1488 ConnectJob* job) {
1489 DCHECK_NE(ERR_IO_PENDING, result);
1490 client_socket_pool_->OnConnectJobComplete(this, result, job);
1491 }
1492
OnNeedsProxyAuth(const HttpResponseInfo & response,HttpAuthController * auth_controller,base::OnceClosure restart_with_auth_callback,ConnectJob * job)1493 void TransportClientSocketPool::Group::OnNeedsProxyAuth(
1494 const HttpResponseInfo& response,
1495 HttpAuthController* auth_controller,
1496 base::OnceClosure restart_with_auth_callback,
1497 ConnectJob* job) {
1498 client_socket_pool_->OnNeedsProxyAuth(this, response, auth_controller,
1499 std::move(restart_with_auth_callback),
1500 job);
1501 }
1502
StartBackupJobTimer(const GroupId & group_id)1503 void TransportClientSocketPool::Group::StartBackupJobTimer(
1504 const GroupId& group_id) {
1505 // Only allow one timer to run at a time.
1506 if (BackupJobTimerIsRunning())
1507 return;
1508
1509 // Unretained here is okay because |backup_job_timer_| is
1510 // automatically cancelled when it's destroyed.
1511 backup_job_timer_.Start(FROM_HERE,
1512 client_socket_pool_->ConnectRetryInterval(),
1513 base::BindOnce(&Group::OnBackupJobTimerFired,
1514 base::Unretained(this), group_id));
1515 }
1516
BackupJobTimerIsRunning() const1517 bool TransportClientSocketPool::Group::BackupJobTimerIsRunning() const {
1518 return backup_job_timer_.IsRunning();
1519 }
1520
TryToUseNeverAssignedConnectJob()1521 bool TransportClientSocketPool::Group::TryToUseNeverAssignedConnectJob() {
1522 SanityCheck();
1523
1524 if (never_assigned_job_count_ == 0)
1525 return false;
1526 --never_assigned_job_count_;
1527 return true;
1528 }
1529
AddJob(std::unique_ptr<ConnectJob> job,bool is_preconnect)1530 void TransportClientSocketPool::Group::AddJob(std::unique_ptr<ConnectJob> job,
1531 bool is_preconnect) {
1532 SanityCheck();
1533
1534 if (is_preconnect)
1535 ++never_assigned_job_count_;
1536 jobs_.push_back(std::move(job));
1537 TryToAssignUnassignedJob(jobs_.back().get());
1538
1539 SanityCheck();
1540 }
1541
RemoveUnboundJob(ConnectJob * job)1542 std::unique_ptr<ConnectJob> TransportClientSocketPool::Group::RemoveUnboundJob(
1543 ConnectJob* job) {
1544 SanityCheck();
1545
1546 // Check that |job| is in the list.
1547 auto it = base::ranges::find(jobs_, job, &std::unique_ptr<ConnectJob>::get);
1548 DCHECK(it != jobs_.end());
1549
1550 // Check if |job| is in the unassigned jobs list. If so, remove it.
1551 auto it2 = base::ranges::find(unassigned_jobs_, job);
1552 if (it2 != unassigned_jobs_.end()) {
1553 unassigned_jobs_.erase(it2);
1554 } else {
1555 // Otherwise, |job| must be assigned to some Request. Unassign it, then
1556 // try to replace it with another job if possible (either by taking an
1557 // unassigned job or stealing from another request, if any requests after it
1558 // have a job).
1559 RequestQueue::Pointer request_with_job = FindUnboundRequestWithJob(job);
1560 DCHECK(!request_with_job.is_null());
1561 request_with_job.value()->ReleaseJob();
1562 TryToAssignJobToRequest(request_with_job);
1563 }
1564 std::unique_ptr<ConnectJob> owned_job = std::move(*it);
1565 jobs_.erase(it);
1566
1567 size_t job_count = jobs_.size();
1568 if (job_count < never_assigned_job_count_)
1569 never_assigned_job_count_ = job_count;
1570
1571 // If we've got no more jobs for this group, then we no longer need a
1572 // backup job either.
1573 if (jobs_.empty()) {
1574 DCHECK(unassigned_jobs_.empty());
1575 backup_job_timer_.Stop();
1576 }
1577
1578 SanityCheck();
1579 return owned_job;
1580 }
1581
OnBackupJobTimerFired(const GroupId & group_id)1582 void TransportClientSocketPool::Group::OnBackupJobTimerFired(
1583 const GroupId& group_id) {
1584 // If there are no more jobs pending, there is no work to do.
1585 // If we've done our cleanups correctly, this should not happen.
1586 if (jobs_.empty()) {
1587 NOTREACHED();
1588 return;
1589 }
1590
1591 // If the old job has already established a connection, don't start a backup
1592 // job. Backup jobs are only for issues establishing the initial TCP
1593 // connection - the timeout they used is tuned for that, and tests expect that
1594 // behavior.
1595 //
1596 // TODO(https://crbug.com/929814): Replace both this and the
1597 // LOAD_STATE_RESOLVING_HOST check with a callback. Use the
1598 // LOAD_STATE_RESOLVING_HOST callback to start the timer (And invoke the
1599 // OnHostResolved callback of any pending requests), and the
1600 // HasEstablishedConnection() callback to stop the timer. That should result
1601 // in a more robust, testable API.
1602 if ((*jobs_.begin())->HasEstablishedConnection())
1603 return;
1604
1605 // If our old job is waiting on DNS, or if we can't create any sockets
1606 // right now due to limits, just reset the timer.
1607 if (client_socket_pool_->ReachedMaxSocketsLimit() ||
1608 !HasAvailableSocketSlot(client_socket_pool_->max_sockets_per_group_) ||
1609 (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) {
1610 StartBackupJobTimer(group_id);
1611 return;
1612 }
1613
1614 if (unbound_requests_.empty())
1615 return;
1616
1617 Request* request = unbound_requests_.FirstMax().value().get();
1618 std::unique_ptr<ConnectJob> owned_backup_job =
1619 client_socket_pool_->CreateConnectJob(
1620 group_id, request->socket_params(), client_socket_pool_->proxy_chain_,
1621 request->proxy_annotation_tag(), request->priority(),
1622 request->socket_tag(), this);
1623 owned_backup_job->net_log().AddEvent(
1624 NetLogEventType::SOCKET_POOL_CONNECT_JOB_CREATED, [&] {
1625 return NetLogCreateConnectJobParams(true /* backup_job */, &group_id_);
1626 });
1627 ConnectJob* backup_job = owned_backup_job.get();
1628 AddJob(std::move(owned_backup_job), false);
1629 client_socket_pool_->connecting_socket_count_++;
1630 int rv = backup_job->Connect();
1631 if (rv != ERR_IO_PENDING) {
1632 client_socket_pool_->OnConnectJobComplete(this, rv, backup_job);
1633 }
1634 }
1635
SanityCheck() const1636 void TransportClientSocketPool::Group::SanityCheck() const {
1637 #if DCHECK_IS_ON()
1638 DCHECK_LE(never_assigned_job_count(), jobs_.size());
1639 DCHECK_LE(unassigned_job_count(), jobs_.size());
1640
1641 // Check that |unassigned_jobs_| is empty iff there are at least as many
1642 // requests as jobs.
1643 DCHECK_EQ(unassigned_jobs_.empty(), jobs_.size() <= unbound_requests_.size());
1644
1645 size_t num_assigned_jobs = jobs_.size() - unassigned_jobs_.size();
1646
1647 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1648 for (size_t i = 0; i < unbound_requests_.size();
1649 ++i, pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1650 DCHECK(!pointer.is_null());
1651 DCHECK(pointer.value());
1652 // Check that the first |num_assigned_jobs| requests have valid job
1653 // assignments.
1654 if (i < num_assigned_jobs) {
1655 // The request has a job.
1656 ConnectJob* job = pointer.value()->job();
1657 DCHECK(job);
1658 // The request's job is not in |unassigned_jobs_|
1659 DCHECK(!base::Contains(unassigned_jobs_, job));
1660 // The request's job is in |jobs_|
1661 DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1662 // The same job is not assigned to any other request with a job.
1663 RequestQueue::Pointer pointer2 =
1664 unbound_requests_.GetNextTowardsLastMin(pointer);
1665 for (size_t j = i + 1; j < num_assigned_jobs;
1666 ++j, pointer2 = unbound_requests_.GetNextTowardsLastMin(pointer2)) {
1667 DCHECK(!pointer2.is_null());
1668 ConnectJob* job2 = pointer2.value()->job();
1669 DCHECK(job2);
1670 DCHECK_NE(job, job2);
1671 }
1672 DCHECK_EQ(pointer.value()->priority(), job->priority());
1673 } else {
1674 // Check that any subsequent requests do not have a job.
1675 DCHECK(!pointer.value()->job());
1676 }
1677 }
1678
1679 for (auto it = unassigned_jobs_.begin(); it != unassigned_jobs_.end(); ++it) {
1680 // Check that all unassigned jobs are in |jobs_|
1681 ConnectJob* job = *it;
1682 DCHECK(base::Contains(jobs_, job, &std::unique_ptr<ConnectJob>::get));
1683 // Check that there are no duplicated entries in |unassigned_jobs_|
1684 for (auto it2 = std::next(it); it2 != unassigned_jobs_.end(); ++it2) {
1685 DCHECK_NE(job, *it2);
1686 }
1687
1688 // Check that no |unassigned_jobs_| are in |bound_requests_|.
1689 DCHECK(!base::Contains(bound_requests_, job,
1690 [](const BoundRequest& bound_request) {
1691 return bound_request.connect_job.get();
1692 }));
1693 }
1694 #endif
1695 }
1696
RemoveAllUnboundJobs()1697 void TransportClientSocketPool::Group::RemoveAllUnboundJobs() {
1698 SanityCheck();
1699
1700 // Remove jobs from any requests that have them.
1701 if (!unbound_requests_.empty()) {
1702 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1703 !pointer.is_null() && pointer.value()->job();
1704 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1705 pointer.value()->ReleaseJob();
1706 }
1707 }
1708 unassigned_jobs_.clear();
1709 never_assigned_job_count_ = 0;
1710
1711 // Diagnostics check for crbug.com/1231248. `Group`s are deleted only on
1712 // removal from `TransportClientSocketPool::group_map_`, so if this check
1713 // fails, `this` has been deleted, likely through some reentrancy issue.
1714 CHECK(client_socket_pool_->HasGroup(group_id_));
1715
1716 // Delete active jobs.
1717 jobs_.clear();
1718 // Stop backup job timer.
1719 backup_job_timer_.Stop();
1720
1721 SanityCheck();
1722 }
1723
ConnectJobCount() const1724 size_t TransportClientSocketPool::Group::ConnectJobCount() const {
1725 return bound_requests_.size() + jobs_.size();
1726 }
1727
GetConnectJobForHandle(const ClientSocketHandle * handle) const1728 ConnectJob* TransportClientSocketPool::Group::GetConnectJobForHandle(
1729 const ClientSocketHandle* handle) const {
1730 // Search through bound requests for |handle|.
1731 for (const auto& bound_pair : bound_requests_) {
1732 if (handle == bound_pair.request->handle())
1733 return bound_pair.connect_job.get();
1734 }
1735
1736 // Search through the unbound requests that have corresponding jobs for a
1737 // request with |handle|.
1738 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1739 !pointer.is_null() && pointer.value()->job();
1740 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1741 if (pointer.value()->handle() == handle)
1742 return pointer.value()->job();
1743 }
1744
1745 return nullptr;
1746 }
1747
InsertUnboundRequest(std::unique_ptr<Request> request)1748 void TransportClientSocketPool::Group::InsertUnboundRequest(
1749 std::unique_ptr<Request> request) {
1750 SanityCheck();
1751
1752 // Should not have a job because it is not already in |unbound_requests_|
1753 DCHECK(!request->job());
1754 // This value must be cached before we release |request|.
1755 RequestPriority priority = request->priority();
1756
1757 RequestQueue::Pointer new_position;
1758 if (request->respect_limits() == RespectLimits::DISABLED) {
1759 // Put requests with RespectLimits::DISABLED (which should have
1760 // priority == MAXIMUM_PRIORITY) ahead of other requests with
1761 // MAXIMUM_PRIORITY.
1762 DCHECK_EQ(priority, MAXIMUM_PRIORITY);
1763 new_position =
1764 unbound_requests_.InsertAtFront(std::move(request), priority);
1765 } else {
1766 new_position = unbound_requests_.Insert(std::move(request), priority);
1767 }
1768 DCHECK(!unbound_requests_.empty());
1769
1770 TryToAssignJobToRequest(new_position);
1771
1772 SanityCheck();
1773 }
1774
1775 const TransportClientSocketPool::Request*
GetNextUnboundRequest() const1776 TransportClientSocketPool::Group::GetNextUnboundRequest() const {
1777 return unbound_requests_.empty() ? nullptr
1778 : unbound_requests_.FirstMax().value().get();
1779 }
1780
1781 std::unique_ptr<TransportClientSocketPool::Request>
PopNextUnboundRequest()1782 TransportClientSocketPool::Group::PopNextUnboundRequest() {
1783 if (unbound_requests_.empty())
1784 return nullptr;
1785 return RemoveUnboundRequest(unbound_requests_.FirstMax());
1786 }
1787
1788 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveUnboundRequest(ClientSocketHandle * handle)1789 TransportClientSocketPool::Group::FindAndRemoveUnboundRequest(
1790 ClientSocketHandle* handle) {
1791 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1792 !pointer.is_null();
1793 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1794 if (pointer.value()->handle() == handle) {
1795 DCHECK_EQ(static_cast<RequestPriority>(pointer.priority()),
1796 pointer.value()->priority());
1797 std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1798 return request;
1799 }
1800 }
1801 return nullptr;
1802 }
1803
SetPendingErrorForAllBoundRequests(int pending_error)1804 void TransportClientSocketPool::Group::SetPendingErrorForAllBoundRequests(
1805 int pending_error) {
1806 for (auto& bound_request : bound_requests_) {
1807 // Earlier errors take precedence.
1808 if (bound_request.pending_error == OK)
1809 bound_request.pending_error = pending_error;
1810 }
1811 }
1812
1813 const TransportClientSocketPool::Request*
BindRequestToConnectJob(ConnectJob * connect_job)1814 TransportClientSocketPool::Group::BindRequestToConnectJob(
1815 ConnectJob* connect_job) {
1816 // Check if |job| is already bound to a Request.
1817 for (const auto& bound_pair : bound_requests_) {
1818 if (bound_pair.connect_job.get() == connect_job)
1819 return bound_pair.request.get();
1820 }
1821
1822 // If not, try to bind it to a Request.
1823 const Request* request = GetNextUnboundRequest();
1824 // If there are no pending requests, or the highest priority request has no
1825 // callback to handle auth challenges, return nullptr.
1826 if (!request || request->proxy_auth_callback().is_null())
1827 return nullptr;
1828
1829 // Otherwise, bind the ConnectJob to the Request.
1830 std::unique_ptr<Request> owned_request = PopNextUnboundRequest();
1831 DCHECK_EQ(owned_request.get(), request);
1832 std::unique_ptr<ConnectJob> owned_connect_job = RemoveUnboundJob(connect_job);
1833 LogBoundConnectJobToRequest(owned_connect_job->net_log().source(), *request);
1834 bound_requests_.emplace_back(BoundRequest(
1835 std::move(owned_connect_job), std::move(owned_request), generation()));
1836 return request;
1837 }
1838
1839 absl::optional<TransportClientSocketPool::Group::BoundRequest>
FindAndRemoveBoundRequestForConnectJob(ConnectJob * connect_job)1840 TransportClientSocketPool::Group::FindAndRemoveBoundRequestForConnectJob(
1841 ConnectJob* connect_job) {
1842 for (auto bound_pair = bound_requests_.begin();
1843 bound_pair != bound_requests_.end(); ++bound_pair) {
1844 if (bound_pair->connect_job.get() != connect_job)
1845 continue;
1846 BoundRequest ret = std::move(*bound_pair);
1847 bound_requests_.erase(bound_pair);
1848 return std::move(ret);
1849 }
1850 return absl::nullopt;
1851 }
1852
1853 std::unique_ptr<TransportClientSocketPool::Request>
FindAndRemoveBoundRequest(ClientSocketHandle * client_socket_handle)1854 TransportClientSocketPool::Group::FindAndRemoveBoundRequest(
1855 ClientSocketHandle* client_socket_handle) {
1856 for (auto bound_pair = bound_requests_.begin();
1857 bound_pair != bound_requests_.end(); ++bound_pair) {
1858 if (bound_pair->request->handle() != client_socket_handle)
1859 continue;
1860 std::unique_ptr<Request> request = std::move(bound_pair->request);
1861 bound_requests_.erase(bound_pair);
1862 return request;
1863 }
1864 return nullptr;
1865 }
1866
SetPriority(ClientSocketHandle * handle,RequestPriority priority)1867 void TransportClientSocketPool::Group::SetPriority(ClientSocketHandle* handle,
1868 RequestPriority priority) {
1869 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1870 !pointer.is_null();
1871 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1872 if (pointer.value()->handle() == handle) {
1873 if (pointer.value()->priority() == priority)
1874 return;
1875
1876 std::unique_ptr<Request> request = RemoveUnboundRequest(pointer);
1877
1878 // Requests that ignore limits much be created and remain at the highest
1879 // priority, and should not be reprioritized.
1880 DCHECK_EQ(request->respect_limits(), RespectLimits::ENABLED);
1881
1882 request->set_priority(priority);
1883 InsertUnboundRequest(std::move(request));
1884 return;
1885 }
1886 }
1887
1888 // This function must be called with a valid ClientSocketHandle.
1889 NOTREACHED();
1890 }
1891
RequestWithHandleHasJobForTesting(const ClientSocketHandle * handle) const1892 bool TransportClientSocketPool::Group::RequestWithHandleHasJobForTesting(
1893 const ClientSocketHandle* handle) const {
1894 SanityCheck();
1895 if (GetConnectJobForHandle(handle))
1896 return true;
1897
1898 // There's no corresponding ConnectJob. Verify that the handle is at least
1899 // owned by a request.
1900 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1901 for (size_t i = 0; i < unbound_requests_.size(); ++i) {
1902 if (pointer.value()->handle() == handle)
1903 return false;
1904 pointer = unbound_requests_.GetNextTowardsLastMin(pointer);
1905 }
1906 NOTREACHED();
1907 return false;
1908 }
1909
BoundRequest()1910 TransportClientSocketPool::Group::BoundRequest::BoundRequest()
1911 : pending_error(OK) {}
1912
BoundRequest(std::unique_ptr<ConnectJob> connect_job,std::unique_ptr<Request> request,int64_t generation)1913 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1914 std::unique_ptr<ConnectJob> connect_job,
1915 std::unique_ptr<Request> request,
1916 int64_t generation)
1917 : connect_job(std::move(connect_job)),
1918 request(std::move(request)),
1919 generation(generation),
1920 pending_error(OK) {}
1921
1922 TransportClientSocketPool::Group::BoundRequest::BoundRequest(
1923 BoundRequest&& other) = default;
1924
1925 TransportClientSocketPool::Group::BoundRequest&
1926 TransportClientSocketPool::Group::BoundRequest::operator=(
1927 BoundRequest&& other) = default;
1928
1929 TransportClientSocketPool::Group::BoundRequest::~BoundRequest() = default;
1930
1931 std::unique_ptr<TransportClientSocketPool::Request>
RemoveUnboundRequest(const RequestQueue::Pointer & pointer)1932 TransportClientSocketPool::Group::RemoveUnboundRequest(
1933 const RequestQueue::Pointer& pointer) {
1934 SanityCheck();
1935
1936 std::unique_ptr<Request> request = unbound_requests_.Erase(pointer);
1937 if (request->job()) {
1938 TryToAssignUnassignedJob(request->ReleaseJob());
1939 }
1940 // If there are no more unbound requests, kill the backup timer.
1941 if (unbound_requests_.empty())
1942 backup_job_timer_.Stop();
1943
1944 SanityCheck();
1945 return request;
1946 }
1947
1948 TransportClientSocketPool::RequestQueue::Pointer
FindUnboundRequestWithJob(const ConnectJob * job) const1949 TransportClientSocketPool::Group::FindUnboundRequestWithJob(
1950 const ConnectJob* job) const {
1951 SanityCheck();
1952
1953 for (RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1954 !pointer.is_null() && pointer.value()->job();
1955 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1956 if (pointer.value()->job() == job)
1957 return pointer;
1958 }
1959 // If a request with the job was not found, it must be in |unassigned_jobs_|.
1960 DCHECK(base::Contains(unassigned_jobs_, job));
1961 return RequestQueue::Pointer();
1962 }
1963
1964 TransportClientSocketPool::RequestQueue::Pointer
GetFirstRequestWithoutJob() const1965 TransportClientSocketPool::Group::GetFirstRequestWithoutJob() const {
1966 RequestQueue::Pointer pointer = unbound_requests_.FirstMax();
1967 size_t i = 0;
1968 for (; !pointer.is_null() && pointer.value()->job();
1969 pointer = unbound_requests_.GetNextTowardsLastMin(pointer)) {
1970 ++i;
1971 }
1972 DCHECK_EQ(i, jobs_.size() - unassigned_jobs_.size());
1973 DCHECK(pointer.is_null() || !pointer.value()->job());
1974 return pointer;
1975 }
1976
TryToAssignUnassignedJob(ConnectJob * job)1977 void TransportClientSocketPool::Group::TryToAssignUnassignedJob(
1978 ConnectJob* job) {
1979 unassigned_jobs_.push_back(job);
1980 RequestQueue::Pointer first_request_without_job = GetFirstRequestWithoutJob();
1981 if (!first_request_without_job.is_null()) {
1982 first_request_without_job.value()->AssignJob(unassigned_jobs_.back());
1983 unassigned_jobs_.pop_back();
1984 }
1985 }
1986
TryToAssignJobToRequest(TransportClientSocketPool::RequestQueue::Pointer request_pointer)1987 void TransportClientSocketPool::Group::TryToAssignJobToRequest(
1988 TransportClientSocketPool::RequestQueue::Pointer request_pointer) {
1989 DCHECK(!request_pointer.value()->job());
1990 if (!unassigned_jobs_.empty()) {
1991 request_pointer.value()->AssignJob(unassigned_jobs_.front());
1992 unassigned_jobs_.pop_front();
1993 return;
1994 }
1995
1996 // If the next request in the queue does not have a job, then there are no
1997 // requests with a job after |request_pointer| from which we can steal.
1998 RequestQueue::Pointer next_request =
1999 unbound_requests_.GetNextTowardsLastMin(request_pointer);
2000 if (next_request.is_null() || !next_request.value()->job())
2001 return;
2002
2003 // Walk down the queue to find the last request with a job.
2004 RequestQueue::Pointer cur = next_request;
2005 RequestQueue::Pointer next = unbound_requests_.GetNextTowardsLastMin(cur);
2006 while (!next.is_null() && next.value()->job()) {
2007 cur = next;
2008 next = unbound_requests_.GetNextTowardsLastMin(next);
2009 }
2010 // Steal the job from the last request with a job.
2011 TransferJobBetweenRequests(cur.value().get(), request_pointer.value().get());
2012 }
2013
TransferJobBetweenRequests(TransportClientSocketPool::Request * source,TransportClientSocketPool::Request * dest)2014 void TransportClientSocketPool::Group::TransferJobBetweenRequests(
2015 TransportClientSocketPool::Request* source,
2016 TransportClientSocketPool::Request* dest) {
2017 DCHECK(!dest->job());
2018 DCHECK(source->job());
2019 dest->AssignJob(source->ReleaseJob());
2020 }
2021
2022 } // namespace net
2023