• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/http/http_stream_pool.h"
6 
7 #include <algorithm>
8 #include <map>
9 #include <memory>
10 #include <ostream>
11 #include <set>
12 #include <string>
13 
14 #include "base/containers/flat_set.h"
15 #include "base/functional/bind.h"
16 #include "base/memory/weak_ptr.h"
17 #include "base/metrics/field_trial_params.h"
18 #include "base/notreached.h"
19 #include "base/task/sequenced_task_runner.h"
20 #include "net/base/completion_once_callback.h"
21 #include "net/base/features.h"
22 #include "net/base/host_port_pair.h"
23 #include "net/base/load_states.h"
24 #include "net/base/net_errors.h"
25 #include "net/base/network_anonymization_key.h"
26 #include "net/base/network_change_notifier.h"
27 #include "net/base/proxy_chain.h"
28 #include "net/base/request_priority.h"
29 #include "net/base/session_usage.h"
30 #include "net/http/alternative_service.h"
31 #include "net/http/http_network_session.h"
32 #include "net/http/http_stream_key.h"
33 #include "net/http/http_stream_pool_group.h"
34 #include "net/http/http_stream_pool_job_controller.h"
35 #include "net/http/http_stream_request.h"
36 #include "net/log/net_log_with_source.h"
37 #include "net/quic/quic_session_pool.h"
38 #include "net/socket/next_proto.h"
39 #include "net/socket/ssl_client_socket.h"
40 #include "net/spdy/spdy_session.h"
41 #include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
42 #include "url/gurl.h"
43 #include "url/scheme_host_port.h"
44 
45 namespace net {
46 
47 namespace {
48 
49 constexpr base::FeatureParam<size_t> kHttpStreamPoolMaxStreamPerPool{
50     &features::kHappyEyeballsV3,
51     HttpStreamPool::kMaxStreamSocketsPerPoolParamName.data(),
52     HttpStreamPool::kDefaultMaxStreamSocketsPerPool};
53 
54 constexpr base::FeatureParam<size_t> kHttpStreamPoolMaxStreamPerGroup{
55     &features::kHappyEyeballsV3,
56     HttpStreamPool::kMaxStreamSocketsPerGroupParamName.data(),
57     HttpStreamPool::kDefaultMaxStreamSocketsPerGroup};
58 
59 constexpr base::FeatureParam<bool> kEnableConsistencyCheck{
60     &features::kHappyEyeballsV3,
61     HttpStreamPool::kEnableConsistencyCheckParamName.data(), false};
62 
63 // Represents total stream counts in the pool. Only used for consistency check.
64 struct StreamCounts {
65   size_t handed_out = 0;
66   size_t idle = 0;
67   size_t connecting = 0;
68 
69   auto operator<=>(const StreamCounts&) const = default;
70 
ToValuenet::__anoneeea95c40111::StreamCounts71   base::Value::Dict ToValue() const {
72     base::Value::Dict dict;
73     dict.Set("handed_out", static_cast<int>(handed_out));
74     dict.Set("idle", static_cast<int>(idle));
75     dict.Set("connecting", static_cast<int>(connecting));
76     return dict;
77   }
78 };
79 
operator <<(std::ostream & os,const StreamCounts & counts)80 std::ostream& operator<<(std::ostream& os, const StreamCounts& counts) {
81   return os << "{ handed_out: " << counts.handed_out
82             << ", idle: " << counts.idle
83             << ", connecting: " << counts.connecting << " }";
84 }
85 
86 }  // namespace
87 
HttpStreamPool(HttpNetworkSession * http_network_session,bool cleanup_on_ip_address_change)88 HttpStreamPool::HttpStreamPool(HttpNetworkSession* http_network_session,
89                                bool cleanup_on_ip_address_change)
90     : http_network_session_(http_network_session),
91       stream_attempt_params_(
92           StreamAttemptParams::FromHttpNetworkSession(http_network_session_)),
93       cleanup_on_ip_address_change_(cleanup_on_ip_address_change),
94       net_log_(NetLogWithSource::Make(http_network_session_->net_log(),
95                                       NetLogSourceType::HTTP_STREAM_POOL)),
96       max_stream_sockets_per_pool_(kHttpStreamPoolMaxStreamPerPool.Get()),
97       // Ensure that the per-group limit is less than or equals to the per-pool
98       // limit.
99       max_stream_sockets_per_group_(
100           std::min(kHttpStreamPoolMaxStreamPerPool.Get(),
101                    kHttpStreamPoolMaxStreamPerGroup.Get())) {
102   CHECK(http_network_session_);
103   if (cleanup_on_ip_address_change) {
104     NetworkChangeNotifier::AddIPAddressObserver(this);
105   }
106 
107   http_network_session_->ssl_client_context()->AddObserver(this);
108 
109   if (kEnableConsistencyCheck.Get()) {
110     CheckConsistency();
111   }
112 }
113 
~HttpStreamPool()114 HttpStreamPool::~HttpStreamPool() {
115   http_network_session_->ssl_client_context()->RemoveObserver(this);
116 
117   if (cleanup_on_ip_address_change_) {
118     NetworkChangeNotifier::RemoveIPAddressObserver(this);
119   }
120 }
121 
OnShuttingDown()122 void HttpStreamPool::OnShuttingDown() {
123   is_shutting_down_ = true;
124 }
125 
RequestStream(HttpStreamRequest::Delegate * delegate,HttpStreamPoolRequestInfo request_info,RequestPriority priority,const std::vector<SSLConfig::CertAndStatus> & allowed_bad_certs,bool enable_ip_based_pooling,bool enable_alternative_services,const NetLogWithSource & net_log)126 std::unique_ptr<HttpStreamRequest> HttpStreamPool::RequestStream(
127     HttpStreamRequest::Delegate* delegate,
128     HttpStreamPoolRequestInfo request_info,
129     RequestPriority priority,
130     const std::vector<SSLConfig::CertAndStatus>& allowed_bad_certs,
131     bool enable_ip_based_pooling,
132     bool enable_alternative_services,
133     const NetLogWithSource& net_log) {
134   auto controller = std::make_unique<JobController>(
135       this, std::move(request_info), priority, allowed_bad_certs,
136       enable_ip_based_pooling, enable_alternative_services);
137   JobController* controller_raw_ptr = controller.get();
138   // Put `controller` into `job_controllers_` before calling RequestStream() to
139   // make sure `job_controllers_` always contains `controller` when
140   // OnJobControllerComplete() is called.
141   job_controllers_.emplace(std::move(controller));
142 
143   return controller_raw_ptr->RequestStream(delegate, net_log);
144 }
145 
Preconnect(HttpStreamPoolRequestInfo request_info,size_t num_streams,CompletionOnceCallback callback)146 int HttpStreamPool::Preconnect(HttpStreamPoolRequestInfo request_info,
147                                size_t num_streams,
148                                CompletionOnceCallback callback) {
149   std::vector<SSLConfig::CertAndStatus> allowed_bad_certs;
150   auto controller = std::make_unique<JobController>(
151       this, std::move(request_info), /*priority=*/RequestPriority::IDLE,
152       std::move(allowed_bad_certs),
153       /*enable_ip_based_pooling=*/true,
154       /*enable_alternative_services=*/true);
155   JobController* controller_raw_ptr = controller.get();
156   // SAFETY: Using base::Unretained() is safe because `this` will own
157   // `controller` when Preconnect() return ERR_IO_PENDING.
158   int rv = controller_raw_ptr->Preconnect(
159       num_streams, base::BindOnce(&HttpStreamPool::OnPreconnectComplete,
160                                   base::Unretained(this), controller_raw_ptr,
161                                   std::move(callback)));
162   if (rv == ERR_IO_PENDING) {
163     job_controllers_.emplace(std::move(controller));
164   }
165   return rv;
166 }
167 
IncrementTotalIdleStreamCount()168 void HttpStreamPool::IncrementTotalIdleStreamCount() {
169   CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
170   ++total_idle_stream_count_;
171 }
172 
DecrementTotalIdleStreamCount()173 void HttpStreamPool::DecrementTotalIdleStreamCount() {
174   CHECK_GT(total_idle_stream_count_, 0u);
175   --total_idle_stream_count_;
176 }
177 
IncrementTotalHandedOutStreamCount()178 void HttpStreamPool::IncrementTotalHandedOutStreamCount() {
179   CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
180   ++total_handed_out_stream_count_;
181 }
182 
DecrementTotalHandedOutStreamCount()183 void HttpStreamPool::DecrementTotalHandedOutStreamCount() {
184   CHECK_GT(total_handed_out_stream_count_, 0u);
185   --total_handed_out_stream_count_;
186 }
187 
IncrementTotalConnectingStreamCount()188 void HttpStreamPool::IncrementTotalConnectingStreamCount() {
189   CHECK_LT(TotalActiveStreamCount(), kDefaultMaxStreamSocketsPerPool);
190   ++total_connecting_stream_count_;
191 }
192 
DecrementTotalConnectingStreamCount(size_t amount)193 void HttpStreamPool::DecrementTotalConnectingStreamCount(size_t amount) {
194   CHECK_GE(total_connecting_stream_count_, amount);
195   total_connecting_stream_count_ -= amount;
196 }
197 
OnIPAddressChanged()198 void HttpStreamPool::OnIPAddressChanged() {
199   CHECK(cleanup_on_ip_address_change_);
200   for (const auto& group : groups_) {
201     group.second->FlushWithError(ERR_NETWORK_CHANGED,
202                                  StreamCloseReason::kIpAddressChanged,
203                                  kIpAddressChanged);
204   }
205 }
206 
OnSSLConfigChanged(SSLClientContext::SSLConfigChangeType change_type)207 void HttpStreamPool::OnSSLConfigChanged(
208     SSLClientContext::SSLConfigChangeType change_type) {
209   for (const auto& group : groups_) {
210     group.second->Refresh(kSslConfigChanged,
211                           StreamCloseReason::kSslConfigChanged);
212   }
213   ProcessPendingRequestsInGroups();
214 }
215 
OnSSLConfigForServersChanged(const base::flat_set<HostPortPair> & servers)216 void HttpStreamPool::OnSSLConfigForServersChanged(
217     const base::flat_set<HostPortPair>& servers) {
218   for (const auto& group : groups_) {
219     if (GURL::SchemeIsCryptographic(group.first.destination().scheme()) &&
220         servers.contains(
221             HostPortPair::FromSchemeHostPort(group.first.destination()))) {
222       group.second->Refresh(kSslConfigChanged,
223                             StreamCloseReason::kSslConfigChanged);
224     }
225   }
226   ProcessPendingRequestsInGroups();
227 }
228 
OnGroupComplete(Group * group)229 void HttpStreamPool::OnGroupComplete(Group* group) {
230   auto it = groups_.find(group->stream_key());
231   CHECK(it != groups_.end());
232   groups_.erase(it);
233 }
234 
OnJobControllerComplete(JobController * job_controller)235 void HttpStreamPool::OnJobControllerComplete(JobController* job_controller) {
236   auto it = job_controllers_.find(job_controller);
237   CHECK(it != job_controllers_.end());
238   job_controllers_.erase(it);
239 }
240 
FlushWithError(int error,StreamCloseReason attempt_cancel_reason,std::string_view net_log_close_reason_utf8)241 void HttpStreamPool::FlushWithError(
242     int error,
243     StreamCloseReason attempt_cancel_reason,
244     std::string_view net_log_close_reason_utf8) {
245   for (auto& group : groups_) {
246     group.second->FlushWithError(error, attempt_cancel_reason,
247                                  net_log_close_reason_utf8);
248   }
249 }
250 
CloseIdleStreams(std::string_view net_log_close_reason_utf8)251 void HttpStreamPool::CloseIdleStreams(
252     std::string_view net_log_close_reason_utf8) {
253   for (auto& group : groups_) {
254     group.second->CloseIdleStreams(net_log_close_reason_utf8);
255   }
256 }
257 
IsPoolStalled()258 bool HttpStreamPool::IsPoolStalled() {
259   if (!ReachedMaxStreamLimit()) {
260     return false;
261   }
262   return FindHighestStalledGroup() != nullptr;
263 }
264 
ProcessPendingRequestsInGroups()265 void HttpStreamPool::ProcessPendingRequestsInGroups() {
266   if (is_shutting_down_) {
267     return;
268   }
269 
270   // Loop until there is nothing more to do.
271   while (true) {
272     Group* group = FindHighestStalledGroup();
273     if (!group) {
274       return;
275     }
276 
277     if (ReachedMaxStreamLimit()) {
278       if (!CloseOneIdleStreamSocket()) {
279         return;
280       }
281     }
282 
283     group->ProcessPendingRequest();
284   }
285 }
286 
RequiresHTTP11(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key)287 bool HttpStreamPool::RequiresHTTP11(
288     const url::SchemeHostPort& destination,
289     const NetworkAnonymizationKey& network_anonymization_key) {
290   return http_network_session()->http_server_properties()->RequiresHTTP11(
291       destination, network_anonymization_key);
292 }
293 
IsQuicBroken(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key)294 bool HttpStreamPool::IsQuicBroken(
295     const url::SchemeHostPort& destination,
296     const NetworkAnonymizationKey& network_anonymization_key) {
297   return http_network_session()
298       ->http_server_properties()
299       ->IsAlternativeServiceBroken(
300           AlternativeService(NextProto::kProtoQUIC,
301                              HostPortPair::FromSchemeHostPort(destination)),
302           network_anonymization_key);
303 }
304 
CanUseQuic(const url::SchemeHostPort & destination,const NetworkAnonymizationKey & network_anonymization_key,bool enable_ip_based_pooling,bool enable_alternative_services)305 bool HttpStreamPool::CanUseQuic(
306     const url::SchemeHostPort& destination,
307     const NetworkAnonymizationKey& network_anonymization_key,
308     bool enable_ip_based_pooling,
309     bool enable_alternative_services) {
310   if (http_network_session()->ShouldForceQuic(destination, ProxyInfo::Direct(),
311                                               /*is_websocket=*/false)) {
312     return true;
313   }
314   return http_network_session()->IsQuicEnabled() && enable_ip_based_pooling &&
315          enable_alternative_services &&
316          GURL::SchemeIsCryptographic(destination.scheme()) &&
317          !RequiresHTTP11(destination, network_anonymization_key) &&
318          !IsQuicBroken(destination, network_anonymization_key);
319 }
320 
SelectQuicVersion(const AlternativeServiceInfo & alternative_service_info)321 quic::ParsedQuicVersion HttpStreamPool::SelectQuicVersion(
322     const AlternativeServiceInfo& alternative_service_info) {
323   if (alternative_service_info.protocol() != NextProto::kProtoQUIC) {
324     return quic::ParsedQuicVersion::Unsupported();
325   }
326   return http_network_session()->context().quic_context->SelectQuicVersion(
327       alternative_service_info.advertised_versions());
328 }
329 
CanUseExistingQuicSession(const QuicSessionAliasKey & quic_session_alias_key,bool enable_ip_based_pooling,bool enable_alternative_services)330 bool HttpStreamPool::CanUseExistingQuicSession(
331     const QuicSessionAliasKey& quic_session_alias_key,
332     bool enable_ip_based_pooling,
333     bool enable_alternative_services) {
334   const url::SchemeHostPort& destination = quic_session_alias_key.destination();
335   return destination.IsValid() &&
336          CanUseQuic(
337              destination,
338              quic_session_alias_key.session_key().network_anonymization_key(),
339              enable_ip_based_pooling, enable_alternative_services) &&
340          http_network_session()->quic_session_pool()->CanUseExistingSession(
341              quic_session_alias_key.session_key(), destination);
342 }
343 
SetDelegateForTesting(std::unique_ptr<TestDelegate> delegate)344 void HttpStreamPool::SetDelegateForTesting(
345     std::unique_ptr<TestDelegate> delegate) {
346   delegate_for_testing_ = std::move(delegate);
347 }
348 
GetInfoAsValue() const349 base::Value::Dict HttpStreamPool::GetInfoAsValue() const {
350   // Using "socket" instead of "stream" for compatibility with ClientSocketPool.
351   base::Value::Dict dict;
352   dict.Set("handed_out_socket_count",
353            static_cast<int>(total_handed_out_stream_count_));
354   dict.Set("connecting_socket_count",
355            static_cast<int>(total_connecting_stream_count_));
356   dict.Set("idle_socket_count", static_cast<int>(total_idle_stream_count_));
357   dict.Set("max_socket_count", static_cast<int>(max_stream_sockets_per_pool_));
358   dict.Set("max_sockets_per_group",
359            static_cast<int>(max_stream_sockets_per_group_));
360 
361   base::Value::Dict group_dicts;
362   for (const auto& [key, group] : groups_) {
363     group_dicts.Set(key.ToString(), group->GetInfoAsValue());
364   }
365 
366   if (!group_dicts.empty()) {
367     dict.Set("groups", std::move(group_dicts));
368   }
369   return dict;
370 }
371 
GetOrCreateGroupForTesting(const HttpStreamKey & stream_key)372 HttpStreamPool::Group& HttpStreamPool::GetOrCreateGroupForTesting(
373     const HttpStreamKey& stream_key) {
374   return GetOrCreateGroup(stream_key);
375 }
376 
GetGroupForTesting(const HttpStreamKey & stream_key)377 HttpStreamPool::Group* HttpStreamPool::GetGroupForTesting(
378     const HttpStreamKey& stream_key) {
379   return GetGroup(stream_key);
380 }
381 
GetOrCreateGroup(const HttpStreamKey & stream_key,std::optional<QuicSessionAliasKey> quic_session_alias_key)382 HttpStreamPool::Group& HttpStreamPool::GetOrCreateGroup(
383     const HttpStreamKey& stream_key,
384     std::optional<QuicSessionAliasKey> quic_session_alias_key) {
385   auto it = groups_.find(stream_key);
386   if (it == groups_.end()) {
387     it = groups_.try_emplace(
388         it, stream_key,
389         std::make_unique<Group>(this, stream_key, quic_session_alias_key));
390   }
391   return *it->second;
392 }
393 
GetGroup(const HttpStreamKey & stream_key)394 HttpStreamPool::Group* HttpStreamPool::GetGroup(
395     const HttpStreamKey& stream_key) {
396   auto it = groups_.find(stream_key);
397   return it == groups_.end() ? nullptr : it->second.get();
398 }
399 
FindHighestStalledGroup()400 HttpStreamPool::Group* HttpStreamPool::FindHighestStalledGroup() {
401   Group* highest_stalled_group = nullptr;
402   std::optional<RequestPriority> highest_priority;
403 
404   for (const auto& group : groups_) {
405     std::optional<RequestPriority> priority =
406         group.second->GetPriorityIfStalledByPoolLimit();
407     if (!priority) {
408       continue;
409     }
410     if (!highest_priority || *priority > *highest_priority) {
411       highest_priority = priority;
412       highest_stalled_group = group.second.get();
413     }
414   }
415 
416   return highest_stalled_group;
417 }
418 
CloseOneIdleStreamSocket()419 bool HttpStreamPool::CloseOneIdleStreamSocket() {
420   if (total_idle_stream_count_ == 0) {
421     return false;
422   }
423 
424   for (auto& group : groups_) {
425     if (group.second->CloseOneIdleStreamSocket()) {
426       return true;
427     }
428   }
429   NOTREACHED();
430 }
431 
FindAvailableSpdySession(const HttpStreamKey & stream_key,const SpdySessionKey & spdy_session_key,bool enable_ip_based_pooling,const NetLogWithSource & net_log)432 base::WeakPtr<SpdySession> HttpStreamPool::FindAvailableSpdySession(
433     const HttpStreamKey& stream_key,
434     const SpdySessionKey& spdy_session_key,
435     bool enable_ip_based_pooling,
436     const NetLogWithSource& net_log) {
437   if (!GURL::SchemeIsCryptographic(stream_key.destination().scheme())) {
438     return nullptr;
439   }
440 
441   base::WeakPtr<SpdySession> spdy_session =
442       http_network_session()->spdy_session_pool()->FindAvailableSession(
443           spdy_session_key, enable_ip_based_pooling, /*is_websocket=*/false,
444           net_log);
445   if (spdy_session) {
446     if (RequiresHTTP11(stream_key.destination(),
447                        stream_key.network_anonymization_key())) {
448       spdy_session->MakeUnavailable();
449       Group* group = GetGroup(stream_key);
450       if (group) {
451         group->OnRequiredHttp11();
452       }
453       return nullptr;
454     }
455   }
456   return spdy_session;
457 }
458 
OnPreconnectComplete(JobController * job_controller,CompletionOnceCallback callback,int rv)459 void HttpStreamPool::OnPreconnectComplete(JobController* job_controller,
460                                           CompletionOnceCallback callback,
461                                           int rv) {
462   OnJobControllerComplete(job_controller);
463   std::move(callback).Run(rv);
464 }
465 
CheckConsistency()466 void HttpStreamPool::CheckConsistency() {
467   CHECK(kEnableConsistencyCheck.Get());
468 
469   const StreamCounts pool_total_counts = {
470       .handed_out = total_handed_out_stream_count_,
471       .idle = total_idle_stream_count_,
472       .connecting = total_connecting_stream_count_};
473 
474   if (groups_.empty()) {
475     VLOG_IF(1, pool_total_counts == StreamCounts())
476         << "Total stream counts are not zero: " << pool_total_counts;
477   } else {
478     StreamCounts groups_total_counts;
479     base::Value::Dict groups;
480     for (const auto& [key, group] : groups_) {
481       groups_total_counts.handed_out += group->HandedOutStreamSocketCount();
482       groups_total_counts.idle += group->IdleStreamSocketCount();
483       groups_total_counts.connecting += group->ConnectingStreamSocketCount();
484       groups.Set(key.ToString(), group->GetInfoAsValue());
485     }
486 
487     const bool ok = pool_total_counts == groups_total_counts;
488     NetLogEventType event_type =
489         ok ? NetLogEventType::HTTP_STREAM_POOL_CONSISTENCY_CHECK_OK
490            : NetLogEventType::HTTP_STREAM_POOL_CONSISTENCY_CHECK_FAIL;
491     net_log_.AddEvent(event_type, [&] {
492       base::Value::Dict dict;
493       dict.Set("pool_total_counts", pool_total_counts.ToValue());
494       dict.Set("groups_total_counts", groups_total_counts.ToValue());
495       dict.Set("groups", std::move(groups));
496       return dict;
497     });
498     VLOG_IF(1, !ok) << "Stream counts mismatch: pool=" << pool_total_counts
499                     << ", groups=" << groups_total_counts;
500   }
501 
502   base::SequencedTaskRunner::GetCurrentDefault()->PostDelayedTask(
503       FROM_HERE,
504       base::BindOnce(&HttpStreamPool::CheckConsistency,
505                      weak_ptr_factory_.GetWeakPtr()),
506       base::Seconds(3));
507 }
508 
509 }  // namespace net
510