1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_stream_pool.h"
6
7 #include <algorithm>
8 #include <map>
9 #include <memory>
10 #include <ostream>
11 #include <set>
12 #include <string>
13
14 #include "base/containers/flat_set.h"
15 #include "base/functional/bind.h"
16 #include "base/memory/weak_ptr.h"
17 #include "base/metrics/field_trial_params.h"
18 #include "base/notreached.h"
19 #include "base/task/sequenced_task_runner.h"
20 #include "net/base/completion_once_callback.h"
21 #include "net/base/features.h"
22 #include "net/base/host_port_pair.h"
23 #include "net/base/load_states.h"
24 #include "net/base/net_errors.h"
25 #include "net/base/network_anonymization_key.h"
26 #include "net/base/network_change_notifier.h"
27 #include "net/base/proxy_chain.h"
28 #include "net/base/request_priority.h"
29 #include "net/base/session_usage.h"
30 #include "net/http/alternative_service.h"
31 #include "net/http/http_network_session.h"
32 #include "net/http/http_stream_key.h"
33 #include "net/http/http_stream_pool_group.h"
34 #include "net/http/http_stream_pool_job_controller.h"
35 #include "net/http/http_stream_request.h"
36 #include "net/log/net_log_with_source.h"
37 #include "net/quic/quic_session_pool.h"
38 #include "net/socket/next_proto.h"
39 #include "net/socket/ssl_client_socket.h"
40 #include "net/spdy/spdy_session.h"
41 #include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
42 #include "url/gurl.h"
43 #include "url/scheme_host_port.h"
44
45 namespace net {
46
47 namespace {
48
49 constexpr base::FeatureParam<size_t> kHttpStreamPoolMaxStreamPerPool{
50 &features::kHappyEyeballsV3,
51 HttpStreamPool::kMaxStreamSocketsPerPoolParamName.data(),
52 HttpStreamPool::kDefaultMaxStreamSocketsPerPool};
53
54 constexpr base::FeatureParam<size_t> kHttpStreamPoolMaxStreamPerGroup{
55 &features::kHappyEyeballsV3,
56 HttpStreamPool::kMaxStreamSocketsPerGroupParamName.data(),
57 HttpStreamPool::kDefaultMaxStreamSocketsPerGroup};
58
59 constexpr base::FeatureParam<bool> kEnableConsistencyCheck{
60 &features::kHappyEyeballsV3,
61 HttpStreamPool::kEnableConsistencyCheckParamName.data(), false};
62
63 // Represents total stream counts in the pool. Only used for consistency check.
64 struct StreamCounts {
65 size_t handed_out = 0;
66 size_t idle = 0;
67 size_t connecting = 0;
68
69 auto operator<=>(const StreamCounts&) const = default;
70
ToValuenet::__anoneeea95c40111::StreamCounts71 base::Value::Dict ToValue() const {
72 base::Value::Dict dict;
73 dict.Set("handed_out", static_cast<int>(handed_out));
74 dict.Set("idle", static_cast<int>(idle));
75 dict.Set("connecting", static_cast<int>(connecting));
76 return dict;
77 }
78 };
79
operator <<(std::ostream & os,const StreamCounts & counts)80 std::ostream& operator<<(std::ostream& os, const StreamCounts& counts) {
81 return os << "{ handed_out: " << counts.handed_out
82 << ", idle: " << counts.idle
83 << ", connecting: " << counts.connecting << " }";
84 }
85
86 } // namespace
87
HttpStreamPool(HttpNetworkSession * http_network_session,bool cleanup_on_ip_address_change)88 HttpStreamPool::HttpStreamPool(HttpNetworkSession* http_network_session,
89 bool cleanup_on_ip_address_change)
90 : http_network_session_(http_network_session),
91 stream_attempt_params_(
92 StreamAttemptParams::FromHttpNetworkSession(http_network_session_)),
93 cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
94 net_log_(NetLogWithSource::Make(http_network_session_->net_log(),
95 NetLogSourceType::HTTP_STREAM_POOL)),
96 max_stream_sockets_per_pool_(kHttpStreamPoolMaxStreamPerPool.Get()),
97 // Ensure that the per-group limit is less than or equals to the per-pool
98 // limit.
99 max_stream_sockets_per_group_(
100 std::min(kHttpStreamPoolMaxStreamPerPool.Get(),
101 kHttpStreamPoolMaxStreamPerGroup.Get())) {
102 CHECK(http_network_session_);
103 if (cleanup_on_ip_address_change) {
104 NetworkChangeNotifier::AddIPAddressObserver(this);
105 }
106
107 http_network_session_->ssl_client_context()->AddObserver(this);
108
109 if (kEnableConsistencyCheck.Get()) {
110 CheckConsistency();
111 }
112 }
113
~HttpStreamPool()114 HttpStreamPool::~HttpStreamPool() {
115 http_network_session_->ssl_client_context()->RemoveObserver(this);
116
117 if (cleanup_on_ip_address_change_) {
118 NetworkChangeNotifier::RemoveIPAddressObserver(this);
119 }
120 }
121
OnShuttingDown()122 void HttpStreamPool::OnShuttingDown() {
123 is_shutting_down_ = true;
124 }
125
RequestStream(HttpStreamRequest::Delegate * delegate,HttpStreamPoolRequestInfo request_info,RequestPriority priority,const std::vector<SSLConfig::CertAndStatus> & allowed_bad_certs,bool enable_ip_based_pooling,bool enable_alternative_services,const NetLogWithSource & net_log)126 std::unique_ptr<HttpStreamRequest> HttpStreamPool::RequestStream(
127 HttpStreamRequest::Delegate* delegate,
128 HttpStreamPoolRequestInfo request_info,
129 RequestPriority priority,
130 const std::vector<SSLConfig::CertAndStatus>& allowed_bad_certs,
131 bool enable_ip_based_pooling,
132 bool enable_alternative_services,
133 const NetLogWithSource& net_log) {
134 auto controller = std::make_unique<JobController>(
135 this, std::move(request_info), priority, allowed_bad_certs,
136 enable_ip_based_pooling, enable_alternative_services);
137 JobController* controller_raw_ptr = controller.get();
138 // Put `controller` into `job_controllers_` before calling RequestStream() to
139 // make sure `job_controllers_` always contains `controller` when
140 // OnJobControllerComplete() is called.
141 job_controllers_.emplace(std::move(controller));
142
143 return controller_raw_ptr->RequestStream(delegate, net_log);
144 }
145
Preconnect(HttpStreamPoolRequestInfo request_info,size_t num_streams,CompletionOnceCallback callback)146 int HttpStreamPool::Preconnect(HttpStreamPoolRequestInfo request_info,
147 size_t num_streams,
148 CompletionOnceCallback callback) {
149 std::vector<SSLConfig::CertAndStatus> allowed_bad_certs;
150 auto controller = std::make_unique<JobController>(
151 this, std::move(request_info), /*priority=*/RequestPriority::IDLE,
152 std::move(allowed_bad_certs),
153 /*enable_ip_based_pooling=*/true,
154 /*enable_alternative_services=*/true);
155 JobController* controller_raw_ptr = controller.get();
156 // SAFETY: Using base::Unretained() is safe because `this` will own
157 // `controller` when Preconnect() return ERR_IO_PENDING.
158 int rv = controller_raw_ptr->Preconnect(
159 num_streams, base::BindOnce(&HttpStreamPool::OnPreconnectComplete,
160 base::Unretained(this), controller_raw_ptr,
161 std::move(callback)));
162 if (rv == ERR_IO_PENDING) {
163 job_controllers_.emplace(std::move(controller));
164 }
165 return rv;
166 }
167
IncrementTotalIdleStreamCount()168 void HttpStreamPool::IncrementTotalIdleStreamCount() {
169 CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
170 ++total_idle_stream_count_;
171 }
172
DecrementTotalIdleStreamCount()173 void HttpStreamPool::DecrementTotalIdleStreamCount() {
174 CHECK_GT(total_idle_stream_count_, 0u);
175 --total_idle_stream_count_;
176 }
177
IncrementTotalHandedOutStreamCount()178 void HttpStreamPool::IncrementTotalHandedOutStreamCount() {
179 CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
180 ++total_handed_out_stream_count_;
181 }
182
DecrementTotalHandedOutStreamCount()183 void HttpStreamPool::DecrementTotalHandedOutStreamCount() {
184 CHECK_GT(total_handed_out_stream_count_, 0u);
185 --total_handed_out_stream_count_;
186 }
187
IncrementTotalConnectingStreamCount()188 void HttpStreamPool::IncrementTotalConnectingStreamCount() {
189 CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
190 ++total_connecting_stream_count_;
191 }
192
DecrementTotalConnectingStreamCount(size_t amount)193 void HttpStreamPool::DecrementTotalConnectingStreamCount(size_t amount) {
194 CHECK_GE(total_connecting_stream_count_, amount);
195 total_connecting_stream_count_ -= amount;
196 }
197
OnIPAddressChanged()198 void HttpStreamPool::OnIPAddressChanged() {
199 CHECK(cleanup_on_ip_address_change_);
200 for (const auto& group : groups_) {
201 group.second->FlushWithError(ERR_NETWORK_CHANGED,
202 StreamCloseReason::kIpAddressChanged,
203 kIpAddressChanged);
204 }
205 }
206
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)207 void HttpStreamPool::OnSSLConfigChanged(
208 SSLClientContext::SSLConfigChangeType change_type) {
209 for (const auto& group : groups_) {
210 group.second->Refresh(kSslConfigChanged,
211 StreamCloseReason::kSslConfigChanged);
212 }
213 ProcessPendingRequestsInGroups();
214 }
215
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)216 void HttpStreamPool::OnSSLConfigForServersChanged(
217 const base::flat_set<HostPortPair>& servers) {
218 for (const auto& group : groups_) {
219 if (GURL::SchemeIsCryptographic(group.first.destination().scheme()) &&
220 servers.contains(
221 HostPortPair::FromSchemeHostPort(group.first.destination()))) {
222 group.second->Refresh(kSslConfigChanged,
223 StreamCloseReason::kSslConfigChanged);
224 }
225 }
226 ProcessPendingRequestsInGroups();
227 }
228
OnGroupComplete(Group * group)229 void HttpStreamPool::OnGroupComplete(Group* group) {
230 auto it = groups_.find(group->stream_key());
231 CHECK(it != groups_.end());
232 groups_.erase(it);
233 }
234
OnJobControllerComplete(JobController * job_controller)235 void HttpStreamPool::OnJobControllerComplete(JobController* job_controller) {
236 auto it = job_controllers_.find(job_controller);
237 CHECK(it != job_controllers_.end());
238 job_controllers_.erase(it);
239 }
240
FlushWithError(int error,StreamCloseReason attempt_cancel_reason,std::string_view net_log_close_reason_utf8)241 void HttpStreamPool::FlushWithError(
242 int error,
243 StreamCloseReason attempt_cancel_reason,
244 std::string_view net_log_close_reason_utf8) {
245 for (auto& group : groups_) {
246 group.second->FlushWithError(error, attempt_cancel_reason,
247 net_log_close_reason_utf8);
248 }
249 }
250
CloseIdleStreams(std::string_view net_log_close_reason_utf8)251 void HttpStreamPool::CloseIdleStreams(
252 std::string_view net_log_close_reason_utf8) {
253 for (auto& group : groups_) {
254 group.second->CloseIdleStreams(net_log_close_reason_utf8);
255 }
256 }
257
IsPoolStalled()258 bool HttpStreamPool::IsPoolStalled() {
259 if (!ReachedMaxStreamLimit()) {
260 return false;
261 }
262 return FindHighestStalledGroup() != nullptr;
263 }
264
ProcessPendingRequestsInGroups()265 void HttpStreamPool::ProcessPendingRequestsInGroups() {
266 if (is_shutting_down_) {
267 return;
268 }
269
270 // Loop until there is nothing more to do.
271 while (true) {
272 Group* group = FindHighestStalledGroup();
273 if (!group) {
274 return;
275 }
276
277 if (ReachedMaxStreamLimit()) {
278 if (!CloseOneIdleStreamSocket()) {
279 return;
280 }
281 }
282
283 group->ProcessPendingRequest();
284 }
285 }
286
RequiresHTTP11(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key)287 bool HttpStreamPool::RequiresHTTP11(
288 const url::SchemeHostPort& destination,
289 const NetworkAnonymizationKey& network_anonymization_key) {
290 return http_network_session()->http_server_properties()->RequiresHTTP11(
291 destination, network_anonymization_key);
292 }
293
IsQuicBroken(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key)294 bool HttpStreamPool::IsQuicBroken(
295 const url::SchemeHostPort& destination,
296 const NetworkAnonymizationKey& network_anonymization_key) {
297 return http_network_session()
298 ->http_server_properties()
299 ->IsAlternativeServiceBroken(
300 AlternativeService(NextProto::kProtoQUIC,
301 HostPortPair::FromSchemeHostPort(destination)),
302 network_anonymization_key);
303 }
304
CanUseQuic(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key,bool enable_ip_based_pooling,bool enable_alternative_services)305 bool HttpStreamPool::CanUseQuic(
306 const url::SchemeHostPort& destination,
307 const NetworkAnonymizationKey& network_anonymization_key,
308 bool enable_ip_based_pooling,
309 bool enable_alternative_services) {
310 if (http_network_session()->ShouldForceQuic(destination, ProxyInfo::Direct(),
311 /*is_websocket=*/false)) {
312 return true;
313 }
314 return http_network_session()->IsQuicEnabled() && enable_ip_based_pooling &&
315 enable_alternative_services &&
316 GURL::SchemeIsCryptographic(destination.scheme()) &&
317 !RequiresHTTP11(destination, network_anonymization_key) &&
318 !IsQuicBroken(destination, network_anonymization_key);
319 }
320
SelectQuicVersion(const AlternativeServiceInfo & alternative_service_info)321 quic::ParsedQuicVersion HttpStreamPool::SelectQuicVersion(
322 const AlternativeServiceInfo& alternative_service_info) {
323 if (alternative_service_info.protocol() != NextProto::kProtoQUIC) {
324 return quic::ParsedQuicVersion::Unsupported();
325 }
326 return http_network_session()->context().quic_context->SelectQuicVersion(
327 alternative_service_info.advertised_versions());
328 }
329
CanUseExistingQuicSession(const QuicSessionAliasKey & quic_session_alias_key,bool enable_ip_based_pooling,bool enable_alternative_services)330 bool HttpStreamPool::CanUseExistingQuicSession(
331 const QuicSessionAliasKey& quic_session_alias_key,
332 bool enable_ip_based_pooling,
333 bool enable_alternative_services) {
334 const url::SchemeHostPort& destination = quic_session_alias_key.destination();
335 return destination.IsValid() &&
336 CanUseQuic(
337 destination,
338 quic_session_alias_key.session_key().network_anonymization_key(),
339 enable_ip_based_pooling, enable_alternative_services) &&
340 http_network_session()->quic_session_pool()->CanUseExistingSession(
341 quic_session_alias_key.session_key(), destination);
342 }
343
SetDelegateForTesting(std::unique_ptr<TestDelegate> delegate)344 void HttpStreamPool::SetDelegateForTesting(
345 std::unique_ptr<TestDelegate> delegate) {
346 delegate_for_testing_ = std::move(delegate);
347 }
348
GetInfoAsValue() const349 base::Value::Dict HttpStreamPool::GetInfoAsValue() const {
350 // Using "socket" instead of "stream" for compatibility with ClientSocketPool.
351 base::Value::Dict dict;
352 dict.Set("handed_out_socket_count",
353 static_cast<int>(total_handed_out_stream_count_));
354 dict.Set("connecting_socket_count",
355 static_cast<int>(total_connecting_stream_count_));
356 dict.Set("idle_socket_count", static_cast<int>(total_idle_stream_count_));
357 dict.Set("max_socket_count", static_cast<int>(max_stream_sockets_per_pool_));
358 dict.Set("max_sockets_per_group",
359 static_cast<int>(max_stream_sockets_per_group_));
360
361 base::Value::Dict group_dicts;
362 for (const auto& [key, group] : groups_) {
363 group_dicts.Set(key.ToString(), group->GetInfoAsValue());
364 }
365
366 if (!group_dicts.empty()) {
367 dict.Set("groups", std::move(group_dicts));
368 }
369 return dict;
370 }
371
GetOrCreateGroupForTesting(const HttpStreamKey & stream_key)372 HttpStreamPool::Group& HttpStreamPool::GetOrCreateGroupForTesting(
373 const HttpStreamKey& stream_key) {
374 return GetOrCreateGroup(stream_key);
375 }
376
GetGroupForTesting(const HttpStreamKey & stream_key)377 HttpStreamPool::Group* HttpStreamPool::GetGroupForTesting(
378 const HttpStreamKey& stream_key) {
379 return GetGroup(stream_key);
380 }
381
GetOrCreateGroup(const HttpStreamKey & stream_key,std::optional<QuicSessionAliasKey> quic_session_alias_key)382 HttpStreamPool::Group& HttpStreamPool::GetOrCreateGroup(
383 const HttpStreamKey& stream_key,
384 std::optional<QuicSessionAliasKey> quic_session_alias_key) {
385 auto it = groups_.find(stream_key);
386 if (it == groups_.end()) {
387 it = groups_.try_emplace(
388 it, stream_key,
389 std::make_unique<Group>(this, stream_key, quic_session_alias_key));
390 }
391 return *it->second;
392 }
393
GetGroup(const HttpStreamKey & stream_key)394 HttpStreamPool::Group* HttpStreamPool::GetGroup(
395 const HttpStreamKey& stream_key) {
396 auto it = groups_.find(stream_key);
397 return it == groups_.end() ? nullptr : it->second.get();
398 }
399
FindHighestStalledGroup()400 HttpStreamPool::Group* HttpStreamPool::FindHighestStalledGroup() {
401 Group* highest_stalled_group = nullptr;
402 std::optional<RequestPriority> highest_priority;
403
404 for (const auto& group : groups_) {
405 std::optional<RequestPriority> priority =
406 group.second->GetPriorityIfStalledByPoolLimit();
407 if (!priority) {
408 continue;
409 }
410 if (!highest_priority || *priority > *highest_priority) {
411 highest_priority = priority;
412 highest_stalled_group = group.second.get();
413 }
414 }
415
416 return highest_stalled_group;
417 }
418
CloseOneIdleStreamSocket()419 bool HttpStreamPool::CloseOneIdleStreamSocket() {
420 if (total_idle_stream_count_ == 0) {
421 return false;
422 }
423
424 for (auto& group : groups_) {
425 if (group.second->CloseOneIdleStreamSocket()) {
426 return true;
427 }
428 }
429 NOTREACHED();
430 }
431
FindAvailableSpdySession(const HttpStreamKey & stream_key,const SpdySessionKey & spdy_session_key,bool enable_ip_based_pooling,const NetLogWithSource & net_log)432 base::WeakPtr<SpdySession> HttpStreamPool::FindAvailableSpdySession(
433 const HttpStreamKey& stream_key,
434 const SpdySessionKey& spdy_session_key,
435 bool enable_ip_based_pooling,
436 const NetLogWithSource& net_log) {
437 if (!GURL::SchemeIsCryptographic(stream_key.destination().scheme())) {
438 return nullptr;
439 }
440
441 base::WeakPtr<SpdySession> spdy_session =
442 http_network_session()->spdy_session_pool()->FindAvailableSession(
443 spdy_session_key, enable_ip_based_pooling, /*is_websocket=*/false,
444 net_log);
445 if (spdy_session) {
446 if (RequiresHTTP11(stream_key.destination(),
447 stream_key.network_anonymization_key())) {
448 spdy_session->MakeUnavailable();
449 Group* group = GetGroup(stream_key);
450 if (group) {
451 group->OnRequiredHttp11();
452 }
453 return nullptr;
454 }
455 }
456 return spdy_session;
457 }
458
OnPreconnectComplete(JobController * job_controller,CompletionOnceCallback callback,int rv)459 void HttpStreamPool::OnPreconnectComplete(JobController* job_controller,
460 CompletionOnceCallback callback,
461 int rv) {
462 OnJobControllerComplete(job_controller);
463 std::move(callback).Run(rv);
464 }
465
CheckConsistency()466 void HttpStreamPool::CheckConsistency() {
467 CHECK(kEnableConsistencyCheck.Get());
468
469 const StreamCounts pool_total_counts = {
470 .handed_out = total_handed_out_stream_count_,
471 .idle = total_idle_stream_count_,
472 .connecting = total_connecting_stream_count_};
473
474 if (groups_.empty()) {
475 VLOG_IF(1, pool_total_counts == StreamCounts())
476 << "Total stream counts are not zero: " << pool_total_counts;
477 } else {
478 StreamCounts groups_total_counts;
479 base::Value::Dict groups;
480 for (const auto& [key, group] : groups_) {
481 groups_total_counts.handed_out += group->HandedOutStreamSocketCount();
482 groups_total_counts.idle += group->IdleStreamSocketCount();
483 groups_total_counts.connecting += group->ConnectingStreamSocketCount();
484 groups.Set(key.ToString(), group->GetInfoAsValue());
485 }
486
487 const bool ok = pool_total_counts == groups_total_counts;
488 NetLogEventType event_type =
489 ok ? NetLogEventType::HTTP_STREAM_POOL_CONSISTENCY_CHECK_OK
490 : NetLogEventType::HTTP_STREAM_POOL_CONSISTENCY_CHECK_FAIL;
491 net_log_.AddEvent(event_type, [&] {
492 base::Value::Dict dict;
493 dict.Set("pool_total_counts", pool_total_counts.ToValue());
494 dict.Set("groups_total_counts", groups_total_counts.ToValue());
495 dict.Set("groups", std::move(groups));
496 return dict;
497 });
498 VLOG_IF(1, !ok) << "Stream counts mismatch: pool=" << pool_total_counts
499 << ", groups=" << groups_total_counts;
500 }
501
502 base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
503 FROM_HERE,
504 base::BindOnce(&HttpStreamPool::CheckConsistency,
505 weak_ptr_factory_.GetWeakPtr()),
506 base::Seconds(3));
507 }
508
509 } // namespace net
510