1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_stream_pool_group.h"
6
7 #include "base/task/sequenced_task_runner.h"
8 #include "base/types/expected.h"
9 #include "net/base/completion_once_callback.h"
10 #include "net/base/load_timing_info.h"
11 #include "net/base/net_errors.h"
12 #include "net/http/http_basic_stream.h"
13 #include "net/http/http_network_session.h"
14 #include "net/http/http_stream.h"
15 #include "net/http/http_stream_key.h"
16 #include "net/http/http_stream_pool_attempt_manager.h"
17 #include "net/http/http_stream_pool_handle.h"
18 #include "net/log/net_log_event_type.h"
19 #include "net/socket/next_proto.h"
20 #include "net/socket/stream_socket.h"
21 #include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
22
23 namespace net {
24
25 namespace {
26
IsNegotiatedProtocolTextBased(NextProto next_proto)27 bool IsNegotiatedProtocolTextBased(NextProto next_proto) {
28 return next_proto == kProtoUnknown || next_proto == kProtoHTTP11;
29 }
30
RecordNetLogClosingSocket(const StreamSocket & stream_socket,std::string_view reason)31 void RecordNetLogClosingSocket(const StreamSocket& stream_socket,
32 std::string_view reason) {
33 stream_socket.NetLog().AddEventWithStringParams(
34 NetLogEventType::HTTP_STREAM_POOL_CLOSING_SOCKET, "reason", reason);
35 }
36
37 } // namespace
38
39 // static
40 base::expected<void, std::string_view>
IsIdleStreamSocketUsable(const IdleStreamSocket & idle)41 HttpStreamPool::Group::IsIdleStreamSocketUsable(const IdleStreamSocket& idle) {
42 base::TimeDelta timeout = idle.stream_socket->WasEverUsed()
43 ? kUsedIdleStreamSocketTimeout
44 : kUnusedIdleStreamSocketTimeout;
45 if (base::TimeTicks::Now() - idle.time_became_idle >= timeout) {
46 return base::unexpected(kIdleTimeLimitExpired);
47 }
48
49 if (idle.stream_socket->WasEverUsed()) {
50 if (idle.stream_socket->IsConnectedAndIdle()) {
51 return base::ok();
52 }
53 if (idle.stream_socket->IsConnected()) {
54 return base::unexpected(kDataReceivedUnexpectedly);
55 } else {
56 return base::unexpected(kRemoteSideClosedConnection);
57 }
58 }
59
60 if (idle.stream_socket->IsConnected()) {
61 return base::ok();
62 }
63
64 return base::unexpected(kRemoteSideClosedConnection);
65 }
66
IdleStreamSocket(std::unique_ptr<StreamSocket> stream_socket,base::TimeTicks time_became_idle)67 HttpStreamPool::Group::IdleStreamSocket::IdleStreamSocket(
68 std::unique_ptr<StreamSocket> stream_socket,
69 base::TimeTicks time_became_idle)
70 : stream_socket(std::move(stream_socket)),
71 time_became_idle(time_became_idle) {}
72
73 HttpStreamPool::Group::IdleStreamSocket::~IdleStreamSocket() = default;
74
Group(HttpStreamPool * pool,HttpStreamKey stream_key,std::optional<QuicSessionAliasKey> quic_session_alias_key)75 HttpStreamPool::Group::Group(
76 HttpStreamPool* pool,
77 HttpStreamKey stream_key,
78 std::optional<QuicSessionAliasKey> quic_session_alias_key)
79 : pool_(pool),
80 stream_key_(std::move(stream_key)),
81 spdy_session_key_(stream_key_.CalculateSpdySessionKey()),
82 quic_session_alias_key_(quic_session_alias_key.has_value()
83 ? std::move(*quic_session_alias_key)
84 : stream_key_.CalculateQuicSessionAliasKey()),
85 net_log_(
86 NetLogWithSource::Make(http_network_session()->net_log(),
87 NetLogSourceType::HTTP_STREAM_POOL_GROUP)),
88 force_quic_(
89 http_network_session()->ShouldForceQuic(stream_key_.destination(),
90 ProxyInfo::Direct(),
91 /*is_websocket=*/false)) {
92 net_log_.BeginEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE, [&] {
93 base::Value::Dict dict;
94 dict.Set("stream_key", stream_key_.ToValue());
95 dict.Set("force_quic", force_quic_);
96 return dict;
97 });
98 }
99
~Group()100 HttpStreamPool::Group::~Group() {
101 // TODO(crbug.com/346835898): Ensure `pool_`'s total active stream counts
102 // are consistent.
103 net_log_.EndEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE);
104 }
105
CreateJob(Job::Delegate * delegate,quic::ParsedQuicVersion quic_version,NextProto expected_protocol,const NetLogWithSource & net_log)106 std::unique_ptr<HttpStreamPool::Job> HttpStreamPool::Group::CreateJob(
107 Job::Delegate* delegate,
108 quic::ParsedQuicVersion quic_version,
109 NextProto expected_protocol,
110 const NetLogWithSource& net_log) {
111 EnsureAttemptManager();
112 return std::make_unique<Job>(delegate, attempt_manager_.get(), quic_version,
113 expected_protocol, net_log);
114 }
115
Preconnect(size_t num_streams,quic::ParsedQuicVersion quic_version,CompletionOnceCallback callback)116 int HttpStreamPool::Group::Preconnect(size_t num_streams,
117 quic::ParsedQuicVersion quic_version,
118 CompletionOnceCallback callback) {
119 if (ActiveStreamSocketCount() >= num_streams) {
120 return OK;
121 }
122
123 EnsureAttemptManager();
124 return attempt_manager_->Preconnect(num_streams, quic_version,
125 std::move(callback));
126 }
127
CreateHandle(std::unique_ptr<StreamSocket> socket,StreamSocketHandle::SocketReuseType reuse_type,LoadTimingInfo::ConnectTiming connect_timing)128 std::unique_ptr<HttpStreamPoolHandle> HttpStreamPool::Group::CreateHandle(
129 std::unique_ptr<StreamSocket> socket,
130 StreamSocketHandle::SocketReuseType reuse_type,
131 LoadTimingInfo::ConnectTiming connect_timing) {
132 ++handed_out_stream_count_;
133 pool_->IncrementTotalHandedOutStreamCount();
134
135 auto handle = std::make_unique<HttpStreamPoolHandle>(
136 weak_ptr_factory_.GetWeakPtr(), std::move(socket), generation_);
137 handle->set_connect_timing(connect_timing);
138 handle->set_reuse_type(reuse_type);
139 return handle;
140 }
141
CreateTextBasedStream(std::unique_ptr<StreamSocket> socket,StreamSocketHandle::SocketReuseType reuse_type,LoadTimingInfo::ConnectTiming connect_timing)142 std::unique_ptr<HttpStream> HttpStreamPool::Group::CreateTextBasedStream(
143 std::unique_ptr<StreamSocket> socket,
144 StreamSocketHandle::SocketReuseType reuse_type,
145 LoadTimingInfo::ConnectTiming connect_timing) {
146 CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
147 return std::make_unique<HttpBasicStream>(
148 CreateHandle(std::move(socket), reuse_type, std::move(connect_timing)),
149 /*is_for_get_to_http_proxy=*/false);
150 }
151
ReleaseStreamSocket(std::unique_ptr<StreamSocket> socket,int64_t generation)152 void HttpStreamPool::Group::ReleaseStreamSocket(
153 std::unique_ptr<StreamSocket> socket,
154 int64_t generation) {
155 CHECK_GT(handed_out_stream_count_, 0u);
156 --handed_out_stream_count_;
157 pool_->DecrementTotalHandedOutStreamCount();
158
159 bool reusable = false;
160 std::string_view not_reusable_reason;
161 if (!socket->IsConnectedAndIdle()) {
162 not_reusable_reason = socket->IsConnected()
163 ? kDataReceivedUnexpectedly
164 : kClosedConnectionReturnedToPool;
165 } else if (generation != generation_) {
166 not_reusable_reason = kSocketGenerationOutOfDate;
167 } else if (ReachedMaxStreamLimit()) {
168 not_reusable_reason = kExceededSocketLimits;
169 } else {
170 reusable = true;
171 }
172
173 if (reusable) {
174 AddIdleStreamSocket(std::move(socket));
175 ProcessPendingRequest();
176 } else {
177 RecordNetLogClosingSocket(*socket, not_reusable_reason);
178 socket.reset();
179 }
180
181 pool_->ProcessPendingRequestsInGroups();
182 MaybeComplete();
183 }
184
AddIdleStreamSocket(std::unique_ptr<StreamSocket> socket)185 void HttpStreamPool::Group::AddIdleStreamSocket(
186 std::unique_ptr<StreamSocket> socket) {
187 CHECK(socket->IsConnectedAndIdle());
188 CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
189 CHECK_LE(ActiveStreamSocketCount(), pool_->max_stream_sockets_per_group());
190
191 idle_stream_sockets_.emplace_back(std::move(socket), base::TimeTicks::Now());
192 pool_->IncrementTotalIdleStreamCount();
193 CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, kIdleTimeLimitExpired);
194 MaybeComplete();
195 }
196
GetIdleStreamSocket()197 std::unique_ptr<StreamSocket> HttpStreamPool::Group::GetIdleStreamSocket() {
198 // Iterate through the idle streams from oldtest to newest and try to find a
199 // used idle stream. Prefer the newest used idle stream.
200 auto idle_it = idle_stream_sockets_.end();
201 for (auto it = idle_stream_sockets_.begin();
202 it != idle_stream_sockets_.end();) {
203 const base::expected<void, std::string_view> usable_result =
204 IsIdleStreamSocketUsable(*it);
205 if (!usable_result.has_value()) {
206 RecordNetLogClosingSocket(*it->stream_socket, usable_result.error());
207 it = idle_stream_sockets_.erase(it);
208 pool_->DecrementTotalIdleStreamCount();
209 continue;
210 }
211 if (it->stream_socket->WasEverUsed()) {
212 idle_it = it;
213 }
214 ++it;
215 }
216
217 if (idle_stream_sockets_.empty()) {
218 return nullptr;
219 }
220
221 if (idle_it == idle_stream_sockets_.end()) {
222 // There are no used idle streams. Pick the oldest (first) idle streams
223 // (FIFO).
224 idle_it = idle_stream_sockets_.begin();
225 }
226
227 CHECK(idle_it != idle_stream_sockets_.end());
228
229 std::unique_ptr<StreamSocket> stream_socket =
230 std::move(idle_it->stream_socket);
231 idle_stream_sockets_.erase(idle_it);
232 pool_->DecrementTotalIdleStreamCount();
233
234 return stream_socket;
235 }
236
ProcessPendingRequest()237 void HttpStreamPool::Group::ProcessPendingRequest() {
238 if (!attempt_manager_) {
239 return;
240 }
241 attempt_manager_->ProcessPendingJob();
242 }
243
CloseOneIdleStreamSocket()244 bool HttpStreamPool::Group::CloseOneIdleStreamSocket() {
245 if (idle_stream_sockets_.empty()) {
246 return false;
247 }
248
249 RecordNetLogClosingSocket(*idle_stream_sockets_.front().stream_socket,
250 kExceededSocketLimits);
251 idle_stream_sockets_.pop_front();
252 pool_->DecrementTotalIdleStreamCount();
253 if (CanComplete()) {
254 // Use PostTask since MaybeComplete() may delete `this`, and this method
255 // could be called while iterating all groups.
256 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
257 FROM_HERE,
258 base::BindOnce(&Group::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
259 }
260 return true;
261 }
262
ConnectingStreamSocketCount() const263 size_t HttpStreamPool::Group::ConnectingStreamSocketCount() const {
264 return attempt_manager_ ? attempt_manager_->InFlightAttemptCount() : 0;
265 }
266
ActiveStreamSocketCount() const267 size_t HttpStreamPool::Group::ActiveStreamSocketCount() const {
268 return handed_out_stream_count_ + idle_stream_sockets_.size() +
269 ConnectingStreamSocketCount();
270 }
271
ReachedMaxStreamLimit() const272 bool HttpStreamPool::Group::ReachedMaxStreamLimit() const {
273 return ActiveStreamSocketCount() >= pool_->max_stream_sockets_per_group();
274 }
275
276 std::optional<RequestPriority>
GetPriorityIfStalledByPoolLimit() const277 HttpStreamPool::Group::GetPriorityIfStalledByPoolLimit() const {
278 if (!attempt_manager_) {
279 return std::nullopt;
280 }
281
282 return attempt_manager_->IsStalledByPoolLimit()
283 ? std::make_optional(attempt_manager_->GetPriority())
284 : std::nullopt;
285 }
286
FlushWithError(int error,StreamCloseReason attempt_cancel_reason,std::string_view net_log_close_reason_utf8)287 void HttpStreamPool::Group::FlushWithError(
288 int error,
289 StreamCloseReason attempt_cancel_reason,
290 std::string_view net_log_close_reason_utf8) {
291 // Refresh() may delete this. Get a weak pointer to this and call CancelJobs()
292 // only when this is still alive.
293 base::WeakPtr<Group> weak_this = weak_ptr_factory_.GetWeakPtr();
294 Refresh(net_log_close_reason_utf8, attempt_cancel_reason);
295 if (weak_this) {
296 CancelJobs(error);
297 }
298 }
299
Refresh(std::string_view net_log_close_reason_utf8,StreamCloseReason cancel_reason)300 void HttpStreamPool::Group::Refresh(std::string_view net_log_close_reason_utf8,
301 StreamCloseReason cancel_reason) {
302 ++generation_;
303 CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
304 if (attempt_manager_) {
305 attempt_manager_->CancelInFlightAttempts(cancel_reason);
306 }
307 }
308
CloseIdleStreams(std::string_view net_log_close_reason_utf8)309 void HttpStreamPool::Group::CloseIdleStreams(
310 std::string_view net_log_close_reason_utf8) {
311 CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
312 if (CanComplete()) {
313 // Use PostTask since MaybeComplete() may delete `this`, and this method
314 // could be called while iterating all groups.
315 base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
316 FROM_HERE,
317 base::BindOnce(&Group::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
318 }
319 }
320
CancelJobs(int error)321 void HttpStreamPool::Group::CancelJobs(int error) {
322 if (attempt_manager_) {
323 attempt_manager_->CancelJobs(error);
324 }
325 }
326
OnRequiredHttp11()327 void HttpStreamPool::Group::OnRequiredHttp11() {
328 if (attempt_manager_) {
329 attempt_manager_->OnRequiredHttp11();
330 }
331 }
332
OnAttemptManagerComplete()333 void HttpStreamPool::Group::OnAttemptManagerComplete() {
334 CHECK(attempt_manager_);
335 attempt_manager_.reset();
336 MaybeComplete();
337 }
338
GetInfoAsValue() const339 base::Value::Dict HttpStreamPool::Group::GetInfoAsValue() const {
340 base::Value::Dict dict;
341 dict.Set("active_socket_count", static_cast<int>(ActiveStreamSocketCount()));
342 dict.Set("idle_socket_count", static_cast<int>(IdleStreamSocketCount()));
343 dict.Set("handed_out_socket_count",
344 static_cast<int>(HandedOutStreamSocketCount()));
345 if (attempt_manager_) {
346 dict.Set("attempt_state", attempt_manager_->GetInfoAsValue());
347 }
348 return dict;
349 }
350
CleanupTimedoutIdleStreamSocketsForTesting()351 void HttpStreamPool::Group::CleanupTimedoutIdleStreamSocketsForTesting() {
352 CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, "For testing");
353 }
354
CleanupIdleStreamSockets(CleanupMode mode,std::string_view net_log_close_reason_utf8)355 void HttpStreamPool::Group::CleanupIdleStreamSockets(
356 CleanupMode mode,
357 std::string_view net_log_close_reason_utf8) {
358 // Iterate though the idle sockets to delete any disconnected ones.
359 for (auto it = idle_stream_sockets_.begin();
360 it != idle_stream_sockets_.end();) {
361 bool should_delete = mode == CleanupMode::kForce;
362 const base::expected<void, std::string_view> usable_result =
363 IsIdleStreamSocketUsable(*it);
364 if (!usable_result.has_value()) {
365 should_delete = true;
366 }
367
368 if (should_delete) {
369 RecordNetLogClosingSocket(*it->stream_socket, net_log_close_reason_utf8);
370 it = idle_stream_sockets_.erase(it);
371 pool_->DecrementTotalIdleStreamCount();
372 } else {
373 ++it;
374 }
375 }
376 }
377
EnsureAttemptManager()378 void HttpStreamPool::Group::EnsureAttemptManager() {
379 if (attempt_manager_) {
380 return;
381 }
382 attempt_manager_ =
383 std::make_unique<AttemptManager>(this, http_network_session()->net_log());
384 }
385
CanComplete() const386 bool HttpStreamPool::Group::CanComplete() const {
387 return ActiveStreamSocketCount() == 0 && !attempt_manager_;
388 }
389
MaybeComplete()390 void HttpStreamPool::Group::MaybeComplete() {
391 if (!CanComplete()) {
392 return;
393 }
394
395 pool_->OnGroupComplete(this);
396 // `this` is deleted.
397 }
398
399 } // namespace net
400