• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/http/http_stream_pool_group.h"
6 
7 #include "base/task/sequenced_task_runner.h"
8 #include "base/types/expected.h"
9 #include "net/base/completion_once_callback.h"
10 #include "net/base/load_timing_info.h"
11 #include "net/base/net_errors.h"
12 #include "net/http/http_basic_stream.h"
13 #include "net/http/http_network_session.h"
14 #include "net/http/http_stream.h"
15 #include "net/http/http_stream_key.h"
16 #include "net/http/http_stream_pool_attempt_manager.h"
17 #include "net/http/http_stream_pool_handle.h"
18 #include "net/log/net_log_event_type.h"
19 #include "net/socket/next_proto.h"
20 #include "net/socket/stream_socket.h"
21 #include "net/third_party/quiche/src/quiche/quic/core/quic_versions.h"
22 
23 namespace net {
24 
25 namespace {
26 
IsNegotiatedProtocolTextBased(NextProto next_proto)27 bool IsNegotiatedProtocolTextBased(NextProto next_proto) {
28   return next_proto == kProtoUnknown || next_proto == kProtoHTTP11;
29 }
30 
RecordNetLogClosingSocket(const StreamSocket & stream_socket,std::string_view reason)31 void RecordNetLogClosingSocket(const StreamSocket& stream_socket,
32                                std::string_view reason) {
33   stream_socket.NetLog().AddEventWithStringParams(
34       NetLogEventType::HTTP_STREAM_POOL_CLOSING_SOCKET, "reason", reason);
35 }
36 
37 }  // namespace
38 
39 // static
40 base::expected<void, std::string_view>
IsIdleStreamSocketUsable(const IdleStreamSocket & idle)41 HttpStreamPool::Group::IsIdleStreamSocketUsable(const IdleStreamSocket& idle) {
42   base::TimeDelta timeout = idle.stream_socket->WasEverUsed()
43                                 ? kUsedIdleStreamSocketTimeout
44                                 : kUnusedIdleStreamSocketTimeout;
45   if (base::TimeTicks::Now() - idle.time_became_idle >= timeout) {
46     return base::unexpected(kIdleTimeLimitExpired);
47   }
48 
49   if (idle.stream_socket->WasEverUsed()) {
50     if (idle.stream_socket->IsConnectedAndIdle()) {
51       return base::ok();
52     }
53     if (idle.stream_socket->IsConnected()) {
54       return base::unexpected(kDataReceivedUnexpectedly);
55     } else {
56       return base::unexpected(kRemoteSideClosedConnection);
57     }
58   }
59 
60   if (idle.stream_socket->IsConnected()) {
61     return base::ok();
62   }
63 
64   return base::unexpected(kRemoteSideClosedConnection);
65 }
66 
IdleStreamSocket(std::unique_ptr<StreamSocket> stream_socket,base::TimeTicks time_became_idle)67 HttpStreamPool::Group::IdleStreamSocket::IdleStreamSocket(
68     std::unique_ptr<StreamSocket> stream_socket,
69     base::TimeTicks time_became_idle)
70     : stream_socket(std::move(stream_socket)),
71       time_became_idle(time_became_idle) {}
72 
73 HttpStreamPool::Group::IdleStreamSocket::~IdleStreamSocket() = default;
74 
Group(HttpStreamPool * pool,HttpStreamKey stream_key,std::optional<QuicSessionAliasKey> quic_session_alias_key)75 HttpStreamPool::Group::Group(
76     HttpStreamPool* pool,
77     HttpStreamKey stream_key,
78     std::optional<QuicSessionAliasKey> quic_session_alias_key)
79     : pool_(pool),
80       stream_key_(std::move(stream_key)),
81       spdy_session_key_(stream_key_.CalculateSpdySessionKey()),
82       quic_session_alias_key_(quic_session_alias_key.has_value()
83                                   ? std::move(*quic_session_alias_key)
84                                   : stream_key_.CalculateQuicSessionAliasKey()),
85       net_log_(
86           NetLogWithSource::Make(http_network_session()->net_log(),
87                                  NetLogSourceType::HTTP_STREAM_POOL_GROUP)),
88       force_quic_(
89           http_network_session()->ShouldForceQuic(stream_key_.destination(),
90                                                   ProxyInfo::Direct(),
91                                                   /*is_websocket=*/false)) {
92   net_log_.BeginEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE, [&] {
93     base::Value::Dict dict;
94     dict.Set("stream_key", stream_key_.ToValue());
95     dict.Set("force_quic", force_quic_);
96     return dict;
97   });
98 }
99 
~Group()100 HttpStreamPool::Group::~Group() {
101   // TODO(crbug.com/346835898): Ensure `pool_`'s total active stream counts
102   // are consistent.
103   net_log_.EndEvent(NetLogEventType::HTTP_STREAM_POOL_GROUP_ALIVE);
104 }
105 
CreateJob(Job::Delegate * delegate,quic::ParsedQuicVersion quic_version,NextProto expected_protocol,const NetLogWithSource & net_log)106 std::unique_ptr<HttpStreamPool::Job> HttpStreamPool::Group::CreateJob(
107     Job::Delegate* delegate,
108     quic::ParsedQuicVersion quic_version,
109     NextProto expected_protocol,
110     const NetLogWithSource& net_log) {
111   EnsureAttemptManager();
112   return std::make_unique<Job>(delegate, attempt_manager_.get(), quic_version,
113                                expected_protocol, net_log);
114 }
115 
Preconnect(size_t num_streams,quic::ParsedQuicVersion quic_version,CompletionOnceCallback callback)116 int HttpStreamPool::Group::Preconnect(size_t num_streams,
117                                       quic::ParsedQuicVersion quic_version,
118                                       CompletionOnceCallback callback) {
119   if (ActiveStreamSocketCount() >= num_streams) {
120     return OK;
121   }
122 
123   EnsureAttemptManager();
124   return attempt_manager_->Preconnect(num_streams, quic_version,
125                                       std::move(callback));
126 }
127 
CreateHandle(std::unique_ptr<StreamSocket> socket,StreamSocketHandle::SocketReuseType reuse_type,LoadTimingInfo::ConnectTiming connect_timing)128 std::unique_ptr<HttpStreamPoolHandle> HttpStreamPool::Group::CreateHandle(
129     std::unique_ptr<StreamSocket> socket,
130     StreamSocketHandle::SocketReuseType reuse_type,
131     LoadTimingInfo::ConnectTiming connect_timing) {
132   ++handed_out_stream_count_;
133   pool_->IncrementTotalHandedOutStreamCount();
134 
135   auto handle = std::make_unique<HttpStreamPoolHandle>(
136       weak_ptr_factory_.GetWeakPtr(), std::move(socket), generation_);
137   handle->set_connect_timing(connect_timing);
138   handle->set_reuse_type(reuse_type);
139   return handle;
140 }
141 
CreateTextBasedStream(std::unique_ptr<StreamSocket> socket,StreamSocketHandle::SocketReuseType reuse_type,LoadTimingInfo::ConnectTiming connect_timing)142 std::unique_ptr<HttpStream> HttpStreamPool::Group::CreateTextBasedStream(
143     std::unique_ptr<StreamSocket> socket,
144     StreamSocketHandle::SocketReuseType reuse_type,
145     LoadTimingInfo::ConnectTiming connect_timing) {
146   CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
147   return std::make_unique<HttpBasicStream>(
148       CreateHandle(std::move(socket), reuse_type, std::move(connect_timing)),
149       /*is_for_get_to_http_proxy=*/false);
150 }
151 
ReleaseStreamSocket(std::unique_ptr<StreamSocket> socket,int64_t generation)152 void HttpStreamPool::Group::ReleaseStreamSocket(
153     std::unique_ptr<StreamSocket> socket,
154     int64_t generation) {
155   CHECK_GT(handed_out_stream_count_, 0u);
156   --handed_out_stream_count_;
157   pool_->DecrementTotalHandedOutStreamCount();
158 
159   bool reusable = false;
160   std::string_view not_reusable_reason;
161   if (!socket->IsConnectedAndIdle()) {
162     not_reusable_reason = socket->IsConnected()
163                               ? kDataReceivedUnexpectedly
164                               : kClosedConnectionReturnedToPool;
165   } else if (generation != generation_) {
166     not_reusable_reason = kSocketGenerationOutOfDate;
167   } else if (ReachedMaxStreamLimit()) {
168     not_reusable_reason = kExceededSocketLimits;
169   } else {
170     reusable = true;
171   }
172 
173   if (reusable) {
174     AddIdleStreamSocket(std::move(socket));
175     ProcessPendingRequest();
176   } else {
177     RecordNetLogClosingSocket(*socket, not_reusable_reason);
178     socket.reset();
179   }
180 
181   pool_->ProcessPendingRequestsInGroups();
182   MaybeComplete();
183 }
184 
AddIdleStreamSocket(std::unique_ptr<StreamSocket> socket)185 void HttpStreamPool::Group::AddIdleStreamSocket(
186     std::unique_ptr<StreamSocket> socket) {
187   CHECK(socket->IsConnectedAndIdle());
188   CHECK(IsNegotiatedProtocolTextBased(socket->GetNegotiatedProtocol()));
189   CHECK_LE(ActiveStreamSocketCount(), pool_->max_stream_sockets_per_group());
190 
191   idle_stream_sockets_.emplace_back(std::move(socket), base::TimeTicks::Now());
192   pool_->IncrementTotalIdleStreamCount();
193   CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, kIdleTimeLimitExpired);
194   MaybeComplete();
195 }
196 
GetIdleStreamSocket()197 std::unique_ptr<StreamSocket> HttpStreamPool::Group::GetIdleStreamSocket() {
198   // Iterate through the idle streams from oldtest to newest and try to find a
199   // used idle stream. Prefer the newest used idle stream.
200   auto idle_it = idle_stream_sockets_.end();
201   for (auto it = idle_stream_sockets_.begin();
202        it != idle_stream_sockets_.end();) {
203     const base::expected<void, std::string_view> usable_result =
204         IsIdleStreamSocketUsable(*it);
205     if (!usable_result.has_value()) {
206       RecordNetLogClosingSocket(*it->stream_socket, usable_result.error());
207       it = idle_stream_sockets_.erase(it);
208       pool_->DecrementTotalIdleStreamCount();
209       continue;
210     }
211     if (it->stream_socket->WasEverUsed()) {
212       idle_it = it;
213     }
214     ++it;
215   }
216 
217   if (idle_stream_sockets_.empty()) {
218     return nullptr;
219   }
220 
221   if (idle_it == idle_stream_sockets_.end()) {
222     // There are no used idle streams. Pick the oldest (first) idle streams
223     // (FIFO).
224     idle_it = idle_stream_sockets_.begin();
225   }
226 
227   CHECK(idle_it != idle_stream_sockets_.end());
228 
229   std::unique_ptr<StreamSocket> stream_socket =
230       std::move(idle_it->stream_socket);
231   idle_stream_sockets_.erase(idle_it);
232   pool_->DecrementTotalIdleStreamCount();
233 
234   return stream_socket;
235 }
236 
ProcessPendingRequest()237 void HttpStreamPool::Group::ProcessPendingRequest() {
238   if (!attempt_manager_) {
239     return;
240   }
241   attempt_manager_->ProcessPendingJob();
242 }
243 
CloseOneIdleStreamSocket()244 bool HttpStreamPool::Group::CloseOneIdleStreamSocket() {
245   if (idle_stream_sockets_.empty()) {
246     return false;
247   }
248 
249   RecordNetLogClosingSocket(*idle_stream_sockets_.front().stream_socket,
250                             kExceededSocketLimits);
251   idle_stream_sockets_.pop_front();
252   pool_->DecrementTotalIdleStreamCount();
253   if (CanComplete()) {
254     // Use PostTask since MaybeComplete() may delete `this`, and this method
255     // could be called while iterating all groups.
256     base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
257         FROM_HERE,
258         base::BindOnce(&Group::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
259   }
260   return true;
261 }
262 
ConnectingStreamSocketCount() const263 size_t HttpStreamPool::Group::ConnectingStreamSocketCount() const {
264   return attempt_manager_ ? attempt_manager_->InFlightAttemptCount() : 0;
265 }
266 
ActiveStreamSocketCount() const267 size_t HttpStreamPool::Group::ActiveStreamSocketCount() const {
268   return handed_out_stream_count_ + idle_stream_sockets_.size() +
269          ConnectingStreamSocketCount();
270 }
271 
ReachedMaxStreamLimit() const272 bool HttpStreamPool::Group::ReachedMaxStreamLimit() const {
273   return ActiveStreamSocketCount() >= pool_->max_stream_sockets_per_group();
274 }
275 
276 std::optional<RequestPriority>
GetPriorityIfStalledByPoolLimit() const277 HttpStreamPool::Group::GetPriorityIfStalledByPoolLimit() const {
278   if (!attempt_manager_) {
279     return std::nullopt;
280   }
281 
282   return attempt_manager_->IsStalledByPoolLimit()
283              ? std::make_optional(attempt_manager_->GetPriority())
284              : std::nullopt;
285 }
286 
FlushWithError(int error,StreamCloseReason attempt_cancel_reason,std::string_view net_log_close_reason_utf8)287 void HttpStreamPool::Group::FlushWithError(
288     int error,
289     StreamCloseReason attempt_cancel_reason,
290     std::string_view net_log_close_reason_utf8) {
291   // Refresh() may delete this. Get a weak pointer to this and call CancelJobs()
292   // only when this is still alive.
293   base::WeakPtr<Group> weak_this = weak_ptr_factory_.GetWeakPtr();
294   Refresh(net_log_close_reason_utf8, attempt_cancel_reason);
295   if (weak_this) {
296     CancelJobs(error);
297   }
298 }
299 
Refresh(std::string_view net_log_close_reason_utf8,StreamCloseReason cancel_reason)300 void HttpStreamPool::Group::Refresh(std::string_view net_log_close_reason_utf8,
301                                     StreamCloseReason cancel_reason) {
302   ++generation_;
303   CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
304   if (attempt_manager_) {
305     attempt_manager_->CancelInFlightAttempts(cancel_reason);
306   }
307 }
308 
CloseIdleStreams(std::string_view net_log_close_reason_utf8)309 void HttpStreamPool::Group::CloseIdleStreams(
310     std::string_view net_log_close_reason_utf8) {
311   CleanupIdleStreamSockets(CleanupMode::kForce, net_log_close_reason_utf8);
312   if (CanComplete()) {
313     // Use PostTask since MaybeComplete() may delete `this`, and this method
314     // could be called while iterating all groups.
315     base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
316         FROM_HERE,
317         base::BindOnce(&Group::MaybeComplete, weak_ptr_factory_.GetWeakPtr()));
318   }
319 }
320 
CancelJobs(int error)321 void HttpStreamPool::Group::CancelJobs(int error) {
322   if (attempt_manager_) {
323     attempt_manager_->CancelJobs(error);
324   }
325 }
326 
OnRequiredHttp11()327 void HttpStreamPool::Group::OnRequiredHttp11() {
328   if (attempt_manager_) {
329     attempt_manager_->OnRequiredHttp11();
330   }
331 }
332 
OnAttemptManagerComplete()333 void HttpStreamPool::Group::OnAttemptManagerComplete() {
334   CHECK(attempt_manager_);
335   attempt_manager_.reset();
336   MaybeComplete();
337 }
338 
GetInfoAsValue() const339 base::Value::Dict HttpStreamPool::Group::GetInfoAsValue() const {
340   base::Value::Dict dict;
341   dict.Set("active_socket_count", static_cast<int>(ActiveStreamSocketCount()));
342   dict.Set("idle_socket_count", static_cast<int>(IdleStreamSocketCount()));
343   dict.Set("handed_out_socket_count",
344            static_cast<int>(HandedOutStreamSocketCount()));
345   if (attempt_manager_) {
346     dict.Set("attempt_state", attempt_manager_->GetInfoAsValue());
347   }
348   return dict;
349 }
350 
CleanupTimedoutIdleStreamSocketsForTesting()351 void HttpStreamPool::Group::CleanupTimedoutIdleStreamSocketsForTesting() {
352   CleanupIdleStreamSockets(CleanupMode::kTimeoutOnly, "For testing");
353 }
354 
CleanupIdleStreamSockets(CleanupMode mode,std::string_view net_log_close_reason_utf8)355 void HttpStreamPool::Group::CleanupIdleStreamSockets(
356     CleanupMode mode,
357     std::string_view net_log_close_reason_utf8) {
358   // Iterate though the idle sockets to delete any disconnected ones.
359   for (auto it = idle_stream_sockets_.begin();
360        it != idle_stream_sockets_.end();) {
361     bool should_delete = mode == CleanupMode::kForce;
362     const base::expected<void, std::string_view> usable_result =
363         IsIdleStreamSocketUsable(*it);
364     if (!usable_result.has_value()) {
365       should_delete = true;
366     }
367 
368     if (should_delete) {
369       RecordNetLogClosingSocket(*it->stream_socket, net_log_close_reason_utf8);
370       it = idle_stream_sockets_.erase(it);
371       pool_->DecrementTotalIdleStreamCount();
372     } else {
373       ++it;
374     }
375   }
376 }
377 
EnsureAttemptManager()378 void HttpStreamPool::Group::EnsureAttemptManager() {
379   if (attempt_manager_) {
380     return;
381   }
382   attempt_manager_ =
383       std::make_unique<AttemptManager>(this, http_network_session()->net_log());
384 }
385 
CanComplete() const386 bool HttpStreamPool::Group::CanComplete() const {
387   return ActiveStreamSocketCount() == 0 && !attempt_manager_;
388 }
389 
MaybeComplete()390 void HttpStreamPool::Group::MaybeComplete() {
391   if (!CanComplete()) {
392     return;
393   }
394 
395   pool_->OnGroupComplete(this);
396   // `this` is deleted.
397 }
398 
399 }  // namespace net
400