1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_stream_pool_job_controller.h"
6
7 #include <memory>
8 #include <optional>
9 #include <vector>
10
11 #include "base/memory/raw_ptr.h"
12 #include "base/notreached.h"
13 #include "base/task/sequenced_task_runner.h"
14 #include "net/base/load_flags.h"
15 #include "net/base/load_states.h"
16 #include "net/base/net_error_details.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/port_util.h"
19 #include "net/base/request_priority.h"
20 #include "net/dns/public/resolve_error_info.h"
21 #include "net/http/alternative_service.h"
22 #include "net/http/http_network_session.h"
23 #include "net/http/http_stream_key.h"
24 #include "net/http/http_stream_pool.h"
25 #include "net/http/http_stream_pool_group.h"
26 #include "net/http/http_stream_pool_job.h"
27 #include "net/http/http_stream_pool_request_info.h"
28 #include "net/http/http_stream_request.h"
29 #include "net/quic/quic_chromium_client_session.h"
30 #include "net/quic/quic_http_stream.h"
31 #include "net/socket/next_proto.h"
32 #include "net/spdy/spdy_http_stream.h"
33 #include "net/ssl/ssl_cert_request_info.h"
34 #include "net/ssl/ssl_config.h"
35 #include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
36 #include "url/scheme_host_port.h"
37 #include "url/url_constants.h"
38
39 namespace net {
40
41 // static
42 std::optional<HttpStreamPool::JobController::Alternative>
CalculateAlternative(HttpStreamPool * pool,const HttpStreamKey & origin_stream_key,const HttpStreamPoolRequestInfo & request_info,bool enable_alternative_services)43 HttpStreamPool::JobController::CalculateAlternative(
44 HttpStreamPool* pool,
45 const HttpStreamKey& origin_stream_key,
46 const HttpStreamPoolRequestInfo& request_info,
47 bool enable_alternative_services) {
48 const NextProto protocol = request_info.alternative_service_info.protocol();
49
50 if (!enable_alternative_services || protocol == NextProto::kProtoUnknown) {
51 return std::nullopt;
52 }
53
54 CHECK(protocol == NextProto::kProtoHTTP2 ||
55 protocol == NextProto::kProtoQUIC);
56
57 url::SchemeHostPort destination(
58 url::kHttpsScheme,
59 request_info.alternative_service_info.GetHostPortPair().host(),
60 request_info.alternative_service_info.GetHostPortPair().port());
61
62 // If the alternative endpoint's destination is the same as origin, we don't
63 // need an alternative job since the origin job will handle all protocols for
64 // the destination.
65 if (destination == request_info.destination) {
66 return std::nullopt;
67 }
68
69 HttpStreamKey stream_key(
70 destination, request_info.privacy_mode, request_info.socket_tag,
71 request_info.network_anonymization_key, request_info.secure_dns_policy,
72 request_info.disable_cert_network_fetches);
73
74 Alternative alternative = {
75 .stream_key = std::move(stream_key),
76 .protocol = request_info.alternative_service_info.protocol(),
77 .quic_version = quic::ParsedQuicVersion::Unsupported()};
78
79 if (protocol == NextProto::kProtoQUIC) {
80 alternative.quic_version =
81 pool->SelectQuicVersion(request_info.alternative_service_info);
82 alternative.quic_key =
83 origin_stream_key.CalculateQuicSessionAliasKey(std::move(destination));
84 }
85
86 return alternative;
87 }
88
JobController(HttpStreamPool * pool,HttpStreamPoolRequestInfo request_info,RequestPriority priority,std::vector<SSLConfig::CertAndStatus> allowed_bad_certs,bool enable_ip_based_pooling,bool enable_alternative_services)89 HttpStreamPool::JobController::JobController(
90 HttpStreamPool* pool,
91 HttpStreamPoolRequestInfo request_info,
92 RequestPriority priority,
93 std::vector<SSLConfig::CertAndStatus> allowed_bad_certs,
94 bool enable_ip_based_pooling,
95 bool enable_alternative_services)
96 : pool_(pool),
97 priority_(priority),
98 allowed_bad_certs_(std::move(allowed_bad_certs)),
99 enable_ip_based_pooling_(enable_ip_based_pooling),
100 enable_alternative_services_(enable_alternative_services),
101 respect_limits_(request_info.load_flags & LOAD_IGNORE_LIMITS
102 ? RespectLimits::kIgnore
103 : RespectLimits::kRespect),
104 is_http1_allowed_(request_info.is_http1_allowed),
105 proxy_info_(request_info.proxy_info),
106 alternative_service_info_(request_info.alternative_service_info),
107 origin_stream_key_(request_info.destination,
108 request_info.privacy_mode,
109 request_info.socket_tag,
110 request_info.network_anonymization_key,
111 request_info.secure_dns_policy,
112 request_info.disable_cert_network_fetches),
113 origin_quic_key_(origin_stream_key_.CalculateQuicSessionAliasKey()),
114 alternative_(CalculateAlternative(pool,
115 origin_stream_key_,
116 request_info,
117 enable_alternative_services_)) {
118 CHECK(proxy_info_.is_direct());
119 if (!alternative_.has_value() &&
120 alternative_service_info_.protocol() == NextProto::kProtoQUIC) {
121 origin_quic_version_ = pool_->SelectQuicVersion(alternative_service_info_);
122 }
123 }
124
125 HttpStreamPool::JobController::~JobController() = default;
126
RequestStream(HttpStreamRequest::Delegate * delegate,const NetLogWithSource & net_log)127 std::unique_ptr<HttpStreamRequest> HttpStreamPool::JobController::RequestStream(
128 HttpStreamRequest::Delegate* delegate,
129 const NetLogWithSource& net_log) {
130 CHECK(!delegate_);
131 CHECK(!stream_request_);
132
133 if (pool_->delegate_for_testing_) {
134 pool_->delegate_for_testing_->OnRequestStream(origin_stream_key_);
135 }
136
137 delegate_ = delegate;
138 auto stream_request = std::make_unique<HttpStreamRequest>(
139 this, /*websocket_handshake_stream_create_helper=*/nullptr, net_log,
140 HttpStreamRequest::HTTP_STREAM);
141 stream_request_ = stream_request.get();
142
143 std::unique_ptr<HttpStream> quic_http_stream =
144 MaybeCreateStreamFromExistingQuicSession();
145 if (quic_http_stream) {
146 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
147 FROM_HERE,
148 base::BindOnce(
149 &HttpStreamPool::JobController::CallRequestCompleteAndStreamReady,
150 weak_ptr_factory_.GetWeakPtr(), std::move(quic_http_stream),
151 NextProto::kProtoQUIC));
152 return stream_request;
153 }
154
155 SpdySessionKey spdy_session_key =
156 origin_stream_key_.CalculateSpdySessionKey();
157 base::WeakPtr<SpdySession> spdy_session = pool_->FindAvailableSpdySession(
158 origin_stream_key_, spdy_session_key, enable_ip_based_pooling_, net_log);
159 if (spdy_session) {
160 auto http_stream = std::make_unique<SpdyHttpStream>(
161 spdy_session, net_log.source(),
162 spdy_session_pool()->GetDnsAliasesForSessionKey(spdy_session_key));
163 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
164 FROM_HERE,
165 base::BindOnce(
166 &HttpStreamPool::JobController::CallRequestCompleteAndStreamReady,
167 weak_ptr_factory_.GetWeakPtr(), std::move(http_stream),
168 NextProto::kProtoHTTP2));
169 return stream_request;
170 }
171
172 if (alternative_.has_value()) {
173 alternative_job_ =
174 pool_
175 ->GetOrCreateGroup(alternative_->stream_key, alternative_->quic_key)
176 .CreateJob(this, alternative_->quic_version, alternative_->protocol,
177 net_log);
178 alternative_job_->Start();
179 } else {
180 alternative_job_result_ = OK;
181 }
182
183 const bool alternative_job_succeeded = alternative_job_ &&
184 alternative_job_result_.has_value() &&
185 *alternative_job_result_ == OK;
186 if (!alternative_job_succeeded) {
187 origin_job_ = pool_->GetOrCreateGroup(origin_stream_key_, origin_quic_key_)
188 .CreateJob(this, origin_quic_version_,
189 NextProto::kProtoUnknown, net_log);
190 origin_job_->Start();
191 }
192
193 return stream_request;
194 }
195
Preconnect(size_t num_streams,CompletionOnceCallback callback)196 int HttpStreamPool::JobController::Preconnect(
197 size_t num_streams,
198 CompletionOnceCallback callback) {
199 num_streams = std::min(kDefaultMaxStreamSocketsPerGroup, num_streams);
200
201 if (!IsPortAllowedForScheme(origin_stream_key_.destination().port(),
202 origin_stream_key_.destination().scheme())) {
203 return ERR_UNSAFE_PORT;
204 }
205
206 if (CanUseExistingQuicSession()) {
207 return OK;
208 }
209
210 SpdySessionKey spdy_session_key =
211 origin_stream_key_.CalculateSpdySessionKey();
212 bool had_spdy_session = spdy_session_pool()->HasAvailableSession(
213 spdy_session_key, /*is_websocket=*/false);
214 if (pool_->FindAvailableSpdySession(origin_stream_key_, spdy_session_key,
215 /*enable_ip_based_pooling=*/true)) {
216 return OK;
217 }
218 if (had_spdy_session) {
219 // We had a SPDY session but the server required HTTP/1.1. The session is
220 // going away right now.
221 return ERR_HTTP_1_1_REQUIRED;
222 }
223
224 if (pool_->delegate_for_testing_) {
225 // Some tests expect OnPreconnect() is called after checking existing
226 // sessions.
227 std::optional<int> result = pool_->delegate_for_testing_->OnPreconnect(
228 origin_stream_key_, num_streams);
229 if (result.has_value()) {
230 return *result;
231 }
232 }
233
234 return pool_->GetOrCreateGroup(origin_stream_key_, origin_quic_key_)
235 .Preconnect(num_streams, origin_quic_version_, std::move(callback));
236 }
237
priority() const238 RequestPriority HttpStreamPool::JobController::priority() const {
239 return priority_;
240 }
241
respect_limits() const242 HttpStreamPool::RespectLimits HttpStreamPool::JobController::respect_limits()
243 const {
244 return respect_limits_;
245 }
246
247 const std::vector<SSLConfig::CertAndStatus>&
allowed_bad_certs() const248 HttpStreamPool::JobController::allowed_bad_certs() const {
249 return allowed_bad_certs_;
250 }
251
enable_ip_based_pooling() const252 bool HttpStreamPool::JobController::enable_ip_based_pooling() const {
253 return enable_ip_based_pooling_;
254 }
255
enable_alternative_services() const256 bool HttpStreamPool::JobController::enable_alternative_services() const {
257 return enable_alternative_services_;
258 }
259
is_http1_allowed() const260 bool HttpStreamPool::JobController::is_http1_allowed() const {
261 return is_http1_allowed_;
262 }
263
proxy_info() const264 const ProxyInfo& HttpStreamPool::JobController::proxy_info() const {
265 return proxy_info_;
266 }
267
OnStreamReady(Job * job,std::unique_ptr<HttpStream> stream,NextProto negotiated_protocol)268 void HttpStreamPool::JobController::OnStreamReady(
269 Job* job,
270 std::unique_ptr<HttpStream> stream,
271 NextProto negotiated_protocol) {
272 SetJobResult(job, OK);
273 // Use PostTask to align the behavior with HttpStreamFactory::Job, see
274 // https://crrev.com/2827533002.
275 // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
276 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
277 FROM_HERE,
278 base::BindOnce(&JobController::CallRequestCompleteAndStreamReady,
279 weak_ptr_factory_.GetWeakPtr(), std::move(stream),
280 negotiated_protocol));
281 }
282
OnStreamFailed(Job * job,int status,const NetErrorDetails & net_error_details,ResolveErrorInfo resolve_error_info)283 void HttpStreamPool::JobController::OnStreamFailed(
284 Job* job,
285 int status,
286 const NetErrorDetails& net_error_details,
287 ResolveErrorInfo resolve_error_info) {
288 stream_request_->AddConnectionAttempts(job->connection_attempts());
289 SetJobResult(job, status);
290 if (AllJobsFinished()) {
291 // Use PostTask to align the behavior with HttpStreamFactory::Job, see
292 // https://crrev.com/2827533002.
293 // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
294 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
295 FROM_HERE,
296 base::BindOnce(&JobController::CallOnStreamFailed,
297 weak_ptr_factory_.GetWeakPtr(), status,
298 net_error_details, std::move(resolve_error_info)));
299 }
300 }
301
OnCertificateError(Job * job,int status,const SSLInfo & ssl_info)302 void HttpStreamPool::JobController::OnCertificateError(
303 Job* job,
304 int status,
305 const SSLInfo& ssl_info) {
306 stream_request_->AddConnectionAttempts(job->connection_attempts());
307 CancelOtherJob(job);
308 // Use PostTask to align the behavior with HttpStreamFactory::Job, see
309 // https://crrev.com/2827533002.
310 // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
311 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
312 FROM_HERE,
313 base::BindOnce(&JobController::CallOnCertificateError,
314 weak_ptr_factory_.GetWeakPtr(), status, ssl_info));
315 }
316
OnNeedsClientAuth(Job * job,SSLCertRequestInfo * cert_info)317 void HttpStreamPool::JobController::OnNeedsClientAuth(
318 Job* job,
319 SSLCertRequestInfo* cert_info) {
320 stream_request_->AddConnectionAttempts(job->connection_attempts());
321 CancelOtherJob(job);
322 // Use PostTask to align the behavior with HttpStreamFactory::Job, see
323 // https://crrev.com/2827533002.
324 // TODO(crbug.com/346835898): Avoid using PostTask here if possible.
325 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
326 FROM_HERE, base::BindOnce(&JobController::CallOnNeedsClientAuth,
327 weak_ptr_factory_.GetWeakPtr(),
328 base::RetainedRef(cert_info)));
329 }
330
GetLoadState() const331 LoadState HttpStreamPool::JobController::GetLoadState() const {
332 CHECK(stream_request_);
333 if (stream_request_->completed()) {
334 return LOAD_STATE_IDLE;
335 }
336
337 if (origin_job_) {
338 return origin_job_->GetLoadState();
339 }
340 if (alternative_job_) {
341 return alternative_job_->GetLoadState();
342 }
343 return LOAD_STATE_IDLE;
344 }
345
OnRequestComplete()346 void HttpStreamPool::JobController::OnRequestComplete() {
347 delegate_ = nullptr;
348 stream_request_ = nullptr;
349
350 origin_job_.reset();
351 alternative_job_.reset();
352 MaybeMarkAlternativeServiceBroken();
353
354 pool_->OnJobControllerComplete(this);
355 // `this` is deleted.
356 }
357
RestartTunnelWithProxyAuth()358 int HttpStreamPool::JobController::RestartTunnelWithProxyAuth() {
359 NOTREACHED();
360 }
361
SetPriority(RequestPriority priority)362 void HttpStreamPool::JobController::SetPriority(RequestPriority priority) {
363 if (origin_job_) {
364 origin_job_->SetPriority(priority);
365 }
366 if (alternative_job_) {
367 alternative_job_->SetPriority(priority);
368 }
369 }
370
quic_session_pool()371 QuicSessionPool* HttpStreamPool::JobController::quic_session_pool() {
372 return pool_->http_network_session()->quic_session_pool();
373 }
374
spdy_session_pool()375 SpdySessionPool* HttpStreamPool::JobController::spdy_session_pool() {
376 return pool_->http_network_session()->spdy_session_pool();
377 }
378
379 std::unique_ptr<HttpStream>
MaybeCreateStreamFromExistingQuicSession()380 HttpStreamPool::JobController::MaybeCreateStreamFromExistingQuicSession() {
381 std::unique_ptr<HttpStream> stream =
382 MaybeCreateStreamFromExistingQuicSessionInternal(origin_quic_key_);
383 if (stream) {
384 return stream;
385 }
386
387 if (alternative_.has_value()) {
388 stream = MaybeCreateStreamFromExistingQuicSessionInternal(
389 alternative_->quic_key);
390 }
391
392 return stream;
393 }
394
395 std::unique_ptr<HttpStream>
MaybeCreateStreamFromExistingQuicSessionInternal(const QuicSessionAliasKey & key)396 HttpStreamPool::JobController::MaybeCreateStreamFromExistingQuicSessionInternal(
397 const QuicSessionAliasKey& key) {
398 if (!key.destination().IsValid() ||
399 !pool_->CanUseQuic(
400 key.destination(), key.session_key().network_anonymization_key(),
401 enable_ip_based_pooling_, enable_alternative_services_)) {
402 return nullptr;
403 }
404
405 QuicChromiumClientSession* quic_session =
406 quic_session_pool()->FindExistingSession(key.session_key(),
407 key.destination());
408 if (quic_session) {
409 return std::make_unique<QuicHttpStream>(
410 quic_session->CreateHandle(key.destination()),
411 quic_session->GetDnsAliasesForSessionKey(key.session_key()));
412 }
413
414 if (alternative_.has_value()) {
415 return nullptr;
416 }
417
418 return nullptr;
419 }
420
CanUseExistingQuicSession()421 bool HttpStreamPool::JobController::CanUseExistingQuicSession() {
422 return pool_->CanUseExistingQuicSession(
423 origin_quic_key_, enable_ip_based_pooling_, enable_alternative_services_);
424 }
425
CallRequestCompleteAndStreamReady(std::unique_ptr<HttpStream> stream,NextProto negotiated_protocol)426 void HttpStreamPool::JobController::CallRequestCompleteAndStreamReady(
427 std::unique_ptr<HttpStream> stream,
428 NextProto negotiated_protocol) {
429 CHECK(stream_request_);
430 CHECK(delegate_);
431 stream_request_->Complete(negotiated_protocol,
432 ALTERNATE_PROTOCOL_USAGE_UNSPECIFIED_REASON);
433 delegate_->OnStreamReady(proxy_info_, std::move(stream));
434 }
435
CallOnStreamFailed(int status,const NetErrorDetails & net_error_details,ResolveErrorInfo resolve_error_info)436 void HttpStreamPool::JobController::CallOnStreamFailed(
437 int status,
438 const NetErrorDetails& net_error_details,
439 ResolveErrorInfo resolve_error_info) {
440 delegate_->OnStreamFailed(status, net_error_details, proxy_info_,
441 std::move(resolve_error_info));
442 }
443
CallOnCertificateError(int status,const SSLInfo & ssl_info)444 void HttpStreamPool::JobController::CallOnCertificateError(
445 int status,
446 const SSLInfo& ssl_info) {
447 delegate_->OnCertificateError(status, ssl_info);
448 }
449
CallOnNeedsClientAuth(SSLCertRequestInfo * cert_info)450 void HttpStreamPool::JobController::CallOnNeedsClientAuth(
451 SSLCertRequestInfo* cert_info) {
452 delegate_->OnNeedsClientAuth(cert_info);
453 }
454
SetJobResult(Job * job,int status)455 void HttpStreamPool::JobController::SetJobResult(Job* job, int status) {
456 if (origin_job_.get() == job) {
457 origin_job_result_ = status;
458 } else if (alternative_job_.get() == job) {
459 alternative_job_result_ = status;
460 } else {
461 NOTREACHED();
462 }
463 }
464
CancelOtherJob(Job * job)465 void HttpStreamPool::JobController::CancelOtherJob(Job* job) {
466 if (origin_job_.get() == job) {
467 alternative_job_.reset();
468 } else if (alternative_job_.get() == job) {
469 origin_job_.reset();
470 } else {
471 NOTREACHED();
472 }
473 }
474
AllJobsFinished()475 bool HttpStreamPool::JobController::AllJobsFinished() {
476 return origin_job_result_.has_value() && alternative_job_result_.has_value();
477 }
478
MaybeMarkAlternativeServiceBroken()479 void HttpStreamPool::JobController::MaybeMarkAlternativeServiceBroken() {
480 // If alternative job succeeds or not completed, no brokenness to report.
481 if (!alternative_job_result_.has_value() || *alternative_job_result_ == OK) {
482 return;
483 }
484
485 // No brokenness to report if the origin job fails.
486 if (origin_job_result_.has_value() && *origin_job_result_ != OK) {
487 return;
488 }
489
490 CHECK(alternative_.has_value());
491
492 pool_->http_network_session()
493 ->http_server_properties()
494 ->MarkAlternativeServiceBroken(
495 alternative_service_info_.alternative_service(),
496 alternative_->stream_key.network_anonymization_key());
497 }
498
499 } // namespace net
500