1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_session.h"
6
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10
11 #include "absl/memory/memory.h"
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
15 #include "quiche/quic/core/frames/quic_window_update_frame.h"
16 #include "quiche/quic/core/quic_connection.h"
17 #include "quiche/quic/core/quic_connection_context.h"
18 #include "quiche/quic/core/quic_error_codes.h"
19 #include "quiche/quic/core/quic_flow_controller.h"
20 #include "quiche/quic/core/quic_stream_priority.h"
21 #include "quiche/quic/core/quic_types.h"
22 #include "quiche/quic/core/quic_utils.h"
23 #include "quiche/quic/core/quic_versions.h"
24 #include "quiche/quic/core/quic_write_blocked_list.h"
25 #include "quiche/quic/platform/api/quic_bug_tracker.h"
26 #include "quiche/quic/platform/api/quic_flag_utils.h"
27 #include "quiche/quic/platform/api/quic_flags.h"
28 #include "quiche/quic/platform/api/quic_logging.h"
29 #include "quiche/quic/platform/api/quic_server_stats.h"
30 #include "quiche/quic/platform/api/quic_stack_trace.h"
31 #include "quiche/common/platform/api/quiche_logging.h"
32 #include "quiche/common/quiche_callbacks.h"
33 #include "quiche/common/quiche_text_utils.h"
34
35 namespace quic {
36
37 namespace {
38
39 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
40 public:
ClosedStreamsCleanUpDelegate(QuicSession * session)41 explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
42 : session_(session) {}
43 ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
44 ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
45 delete;
46
GetConnectionContext()47 QuicConnectionContext* GetConnectionContext() override {
48 return (session_->connection() == nullptr)
49 ? nullptr
50 : session_->connection()->context();
51 }
52
OnAlarm()53 void OnAlarm() override { session_->CleanUpClosedStreams(); }
54
55 private:
56 QuicSession* session_;
57 };
58
59 } // namespace
60
61 #define ENDPOINT \
62 (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
63
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)64 QuicSession::QuicSession(
65 QuicConnection* connection, Visitor* owner, const QuicConfig& config,
66 const ParsedQuicVersionVector& supported_versions,
67 QuicStreamCount num_expected_unidirectional_static_streams)
68 : QuicSession(connection, owner, config, supported_versions,
69 num_expected_unidirectional_static_streams, nullptr) {}
70
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams,std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)71 QuicSession::QuicSession(
72 QuicConnection* connection, Visitor* owner, const QuicConfig& config,
73 const ParsedQuicVersionVector& supported_versions,
74 QuicStreamCount num_expected_unidirectional_static_streams,
75 std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
76 : connection_(connection),
77 perspective_(connection->perspective()),
78 visitor_(owner),
79 write_blocked_streams_(std::make_unique<QuicWriteBlockedList>()),
80 config_(config),
81 stream_id_manager_(perspective(), connection->transport_version(),
82 kDefaultMaxStreamsPerConnection,
83 config_.GetMaxBidirectionalStreamsToSend()),
84 ietf_streamid_manager_(perspective(), connection->version(), this, 0,
85 num_expected_unidirectional_static_streams,
86 config_.GetMaxBidirectionalStreamsToSend(),
87 config_.GetMaxUnidirectionalStreamsToSend() +
88 num_expected_unidirectional_static_streams),
89 num_draining_streams_(0),
90 num_outgoing_draining_streams_(0),
91 num_static_streams_(0),
92 num_zombie_streams_(0),
93 flow_controller_(
94 this, QuicUtils::GetInvalidStreamId(connection->transport_version()),
95 /*is_connection_flow_controller*/ true,
96 connection->version().AllowsLowFlowControlLimits()
97 ? 0
98 : kMinimumFlowControlSendWindow,
99 config_.GetInitialSessionFlowControlWindowToSend(),
100 kSessionReceiveWindowLimit, perspective() == Perspective::IS_SERVER,
101 nullptr),
102 currently_writing_stream_id_(0),
103 transport_goaway_sent_(false),
104 transport_goaway_received_(false),
105 control_frame_manager_(this),
106 last_message_id_(0),
107 datagram_queue_(this, std::move(datagram_observer)),
108 closed_streams_clean_up_alarm_(nullptr),
109 supported_versions_(supported_versions),
110 is_configured_(false),
111 was_zero_rtt_rejected_(false),
112 liveness_testing_in_progress_(false),
113 limit_sending_max_streams_(
114 GetQuicReloadableFlag(quic_limit_sending_max_streams2)) {
115 closed_streams_clean_up_alarm_ =
116 absl::WrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
117 new ClosedStreamsCleanUpDelegate(this)));
118 if (VersionHasIetfQuicFrames(transport_version())) {
119 config_.SetMaxUnidirectionalStreamsToSend(
120 config_.GetMaxUnidirectionalStreamsToSend() +
121 num_expected_unidirectional_static_streams);
122 }
123 }
124
Initialize()125 void QuicSession::Initialize() {
126 connection_->set_visitor(this);
127 connection_->SetSessionNotifier(this);
128 connection_->SetDataProducer(this);
129 connection_->SetUnackedMapInitialCapacity();
130 connection_->SetFromConfig(config_);
131 if (perspective_ == Perspective::IS_CLIENT) {
132 if (config_.HasClientRequestedIndependentOption(kAFFE, perspective_) &&
133 version().HasIetfQuicFrames()) {
134 connection_->set_can_receive_ack_frequency_frame();
135 config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
136 }
137 }
138 if (perspective() == Perspective::IS_SERVER &&
139 connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
140 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
141 }
142
143 connection_->CreateConnectionIdManager();
144
145 // On the server side, version negotiation has been done by the dispatcher,
146 // and the server session is created with the right version.
147 if (perspective() == Perspective::IS_SERVER) {
148 connection_->OnSuccessfulVersionNegotiation();
149 }
150
151 if (QuicVersionUsesCryptoFrames(transport_version())) {
152 return;
153 }
154
155 QUICHE_DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
156 GetMutableCryptoStream()->id());
157 }
158
~QuicSession()159 QuicSession::~QuicSession() {
160 if (closed_streams_clean_up_alarm_ != nullptr) {
161 closed_streams_clean_up_alarm_->PermanentCancel();
162 }
163 }
164
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)165 PendingStream* QuicSession::PendingStreamOnStreamFrame(
166 const QuicStreamFrame& frame) {
167 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
168 QuicStreamId stream_id = frame.stream_id;
169
170 PendingStream* pending = GetOrCreatePendingStream(stream_id);
171
172 if (!pending) {
173 if (frame.fin) {
174 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
175 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
176 }
177 return nullptr;
178 }
179
180 pending->OnStreamFrame(frame);
181 if (!connection()->connected()) {
182 return nullptr;
183 }
184 return pending;
185 }
186
MaybeProcessPendingStream(PendingStream * pending)187 void QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
188 QUICHE_DCHECK(pending != nullptr);
189 QuicStreamId stream_id = pending->id();
190 std::optional<QuicResetStreamError> stop_sending_error_code =
191 pending->GetStopSendingErrorCode();
192 QuicStream* stream = ProcessPendingStream(pending);
193 if (stream != nullptr) {
194 // The pending stream should now be in the scope of normal streams.
195 QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
196 << "Stream " << stream_id << " not created";
197 pending_stream_map_.erase(stream_id);
198 if (stop_sending_error_code) {
199 stream->OnStopSending(*stop_sending_error_code);
200 if (!connection()->connected()) {
201 return;
202 }
203 }
204 stream->OnStreamCreatedFromPendingStream();
205 return;
206 }
207 // At this point, none of the bytes has been successfully consumed by the
208 // application layer. We should close the pending stream even if it is
209 // bidirectionl as no application will be able to write in a bidirectional
210 // stream with zero byte as input.
211 if (pending->sequencer()->IsClosed()) {
212 ClosePendingStream(stream_id);
213 }
214 }
215
PendingStreamOnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)216 void QuicSession::PendingStreamOnWindowUpdateFrame(
217 const QuicWindowUpdateFrame& frame) {
218 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
219 PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
220 if (pending) {
221 pending->OnWindowUpdateFrame(frame);
222 }
223 }
224
PendingStreamOnStopSendingFrame(const QuicStopSendingFrame & frame)225 void QuicSession::PendingStreamOnStopSendingFrame(
226 const QuicStopSendingFrame& frame) {
227 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
228 PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
229 if (pending) {
230 pending->OnStopSending(frame.error());
231 }
232 }
233
OnStreamFrame(const QuicStreamFrame & frame)234 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
235 QuicStreamId stream_id = frame.stream_id;
236 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
237 connection()->CloseConnection(
238 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
239 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
240 return;
241 }
242
243 if (ShouldProcessFrameByPendingStream(STREAM_FRAME, stream_id)) {
244 PendingStream* pending = PendingStreamOnStreamFrame(frame);
245 if (pending != nullptr && IsEncryptionEstablished()) {
246 MaybeProcessPendingStream(pending);
247 }
248 return;
249 }
250
251 QuicStream* stream = GetOrCreateStream(stream_id);
252
253 if (!stream) {
254 // The stream no longer exists, but we may still be interested in the
255 // final stream byte offset sent by the peer. A frame with a FIN can give
256 // us this offset.
257 if (frame.fin) {
258 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
259 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
260 }
261 return;
262 }
263 stream->OnStreamFrame(frame);
264 }
265
OnCryptoFrame(const QuicCryptoFrame & frame)266 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
267 GetMutableCryptoStream()->OnCryptoFrame(frame);
268 }
269
OnStopSendingFrame(const QuicStopSendingFrame & frame)270 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
271 // STOP_SENDING is in IETF QUIC only.
272 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
273 QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
274
275 QuicStreamId stream_id = frame.stream_id;
276 // If Stream ID is invalid then close the connection.
277 // TODO(ianswett): This check is redundant to checks for IsClosedStream,
278 // but removing it requires removing multiple QUICHE_DCHECKs.
279 // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
280 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
281 QUIC_DVLOG(1) << ENDPOINT
282 << "Received STOP_SENDING with invalid stream_id: "
283 << stream_id << " Closing connection";
284 connection()->CloseConnection(
285 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
286 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
287 return;
288 }
289
290 // If stream_id is READ_UNIDIRECTIONAL, close the connection.
291 if (QuicUtils::GetStreamType(stream_id, perspective(),
292 IsIncomingStream(stream_id),
293 version()) == READ_UNIDIRECTIONAL) {
294 QUIC_DVLOG(1) << ENDPOINT
295 << "Received STOP_SENDING for a read-only stream_id: "
296 << stream_id << ".";
297 connection()->CloseConnection(
298 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
299 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
300 return;
301 }
302
303 if (visitor_) {
304 visitor_->OnStopSendingReceived(frame);
305 }
306 if (ShouldProcessFrameByPendingStream(STOP_SENDING_FRAME, stream_id)) {
307 PendingStreamOnStopSendingFrame(frame);
308 return;
309 }
310
311 QuicStream* stream = GetOrCreateStream(stream_id);
312 if (!stream) {
313 // Errors are handled by GetOrCreateStream.
314 return;
315 }
316
317 stream->OnStopSending(frame.error());
318 }
319
OnPacketDecrypted(EncryptionLevel level)320 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
321 GetMutableCryptoStream()->OnPacketDecrypted(level);
322 if (liveness_testing_in_progress_) {
323 liveness_testing_in_progress_ = false;
324 OnCanCreateNewOutgoingStream(/*unidirectional=*/false);
325 }
326 }
327
OnOneRttPacketAcknowledged()328 void QuicSession::OnOneRttPacketAcknowledged() {
329 GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
330 }
331
OnHandshakePacketSent()332 void QuicSession::OnHandshakePacketSent() {
333 GetMutableCryptoStream()->OnHandshakePacketSent();
334 }
335
336 std::unique_ptr<QuicDecrypter>
AdvanceKeysAndCreateCurrentOneRttDecrypter()337 QuicSession::AdvanceKeysAndCreateCurrentOneRttDecrypter() {
338 return GetMutableCryptoStream()->AdvanceKeysAndCreateCurrentOneRttDecrypter();
339 }
340
CreateCurrentOneRttEncrypter()341 std::unique_ptr<QuicEncrypter> QuicSession::CreateCurrentOneRttEncrypter() {
342 return GetMutableCryptoStream()->CreateCurrentOneRttEncrypter();
343 }
344
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)345 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
346 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
347 QuicStreamId stream_id = frame.stream_id;
348
349 PendingStream* pending = GetOrCreatePendingStream(stream_id);
350
351 if (!pending) {
352 HandleRstOnValidNonexistentStream(frame);
353 return;
354 }
355
356 pending->OnRstStreamFrame(frame);
357 // At this point, none of the bytes has been consumed by the application
358 // layer. It is safe to close the pending stream even if it is bidirectionl as
359 // no application will be able to write in a bidirectional stream with zero
360 // byte as input.
361 ClosePendingStream(stream_id);
362 }
363
OnRstStream(const QuicRstStreamFrame & frame)364 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
365 QuicStreamId stream_id = frame.stream_id;
366 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
367 connection()->CloseConnection(
368 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
369 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
370 return;
371 }
372
373 if (VersionHasIetfQuicFrames(transport_version()) &&
374 QuicUtils::GetStreamType(stream_id, perspective(),
375 IsIncomingStream(stream_id),
376 version()) == WRITE_UNIDIRECTIONAL) {
377 connection()->CloseConnection(
378 QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
379 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
380 return;
381 }
382
383 if (visitor_) {
384 visitor_->OnRstStreamReceived(frame);
385 }
386
387 if (ShouldProcessFrameByPendingStream(RST_STREAM_FRAME, stream_id)) {
388 PendingStreamOnRstStream(frame);
389 return;
390 }
391
392 QuicStream* stream = GetOrCreateStream(stream_id);
393
394 if (!stream) {
395 HandleRstOnValidNonexistentStream(frame);
396 return; // Errors are handled by GetOrCreateStream.
397 }
398 stream->OnStreamReset(frame);
399 }
400
OnGoAway(const QuicGoAwayFrame &)401 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
402 QUIC_BUG_IF(quic_bug_12435_1, version().UsesHttp3())
403 << "gQUIC GOAWAY received on version " << version();
404
405 transport_goaway_received_ = true;
406 }
407
OnMessageReceived(absl::string_view message)408 void QuicSession::OnMessageReceived(absl::string_view message) {
409 QUIC_DVLOG(1) << ENDPOINT << "Received message of length "
410 << message.length();
411 QUIC_DVLOG(2) << ENDPOINT << "Contents of message of length "
412 << message.length() << ":" << std::endl
413 << quiche::QuicheTextUtils::HexDump(message);
414 }
415
OnHandshakeDoneReceived()416 void QuicSession::OnHandshakeDoneReceived() {
417 QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
418 GetMutableCryptoStream()->OnHandshakeDoneReceived();
419 }
420
OnNewTokenReceived(absl::string_view token)421 void QuicSession::OnNewTokenReceived(absl::string_view token) {
422 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
423 GetMutableCryptoStream()->OnNewTokenReceived(token);
424 }
425
426 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)427 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
428 ConnectionCloseSource source) {
429 if (error != QUIC_NO_ERROR) {
430 if (source == ConnectionCloseSource::FROM_SELF) {
431 QUIC_SERVER_HISTOGRAM_ENUM(
432 "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
433 "QuicErrorCode for server-closed connections.");
434 } else {
435 QUIC_SERVER_HISTOGRAM_ENUM(
436 "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
437 "QuicErrorCode for client-closed connections.");
438 }
439 }
440 }
441
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)442 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
443 ConnectionCloseSource source) {
444 QUICHE_DCHECK(!connection_->connected());
445 if (perspective() == Perspective::IS_SERVER) {
446 RecordConnectionCloseAtServer(frame.quic_error_code, source);
447 }
448
449 if (on_closed_frame_.quic_error_code == QUIC_NO_ERROR) {
450 // Save all of the connection close information
451 on_closed_frame_ = frame;
452 source_ = source;
453 }
454
455 GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
456
457 PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
458 QuicStreamId id = stream->id();
459 stream->OnConnectionClosed(frame.quic_error_code, source);
460 auto it = stream_map_.find(id);
461 if (it != stream_map_.end()) {
462 QUIC_BUG_IF(quic_bug_12435_2, !it->second->IsZombie())
463 << ENDPOINT << "Non-zombie stream " << id
464 << " failed to close under OnConnectionClosed";
465 }
466 return true;
467 });
468
469 closed_streams_clean_up_alarm_->Cancel();
470
471 if (visitor_) {
472 visitor_->OnConnectionClosed(connection_->GetOneActiveServerConnectionId(),
473 frame.quic_error_code, frame.error_details,
474 source);
475 }
476 }
477
OnWriteBlocked()478 void QuicSession::OnWriteBlocked() {
479 if (!connection_->connected()) {
480 return;
481 }
482 if (visitor_) {
483 visitor_->OnWriteBlocked(connection_);
484 }
485 }
486
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)487 void QuicSession::OnSuccessfulVersionNegotiation(
488 const ParsedQuicVersion& /*version*/) {}
489
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)490 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
491 const QuicSocketAddress& peer_address,
492 bool is_connectivity_probe) {
493 QUICHE_DCHECK(!connection_->ignore_gquic_probing());
494 if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
495 // Server only sends back a connectivity probe after received a
496 // connectivity probe from a new peer address.
497 connection_->SendConnectivityProbingPacket(nullptr, peer_address);
498 }
499 }
500
OnPathDegrading()501 void QuicSession::OnPathDegrading() {}
502
OnForwardProgressMadeAfterPathDegrading()503 void QuicSession::OnForwardProgressMadeAfterPathDegrading() {}
504
AllowSelfAddressChange() const505 bool QuicSession::AllowSelfAddressChange() const { return false; }
506
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)507 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
508 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
509 // assume that it still exists.
510 QuicStreamId stream_id = frame.stream_id;
511 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
512 // This is a window update that applies to the connection, rather than an
513 // individual stream.
514 QUIC_DVLOG(1) << ENDPOINT
515 << "Received connection level flow control window "
516 "update with max data: "
517 << frame.max_data;
518 flow_controller_.UpdateSendWindowOffset(frame.max_data);
519 return;
520 }
521
522 if (VersionHasIetfQuicFrames(transport_version()) &&
523 QuicUtils::GetStreamType(stream_id, perspective(),
524 IsIncomingStream(stream_id),
525 version()) == READ_UNIDIRECTIONAL) {
526 connection()->CloseConnection(
527 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
528 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
529 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
530 return;
531 }
532
533 if (ShouldProcessFrameByPendingStream(WINDOW_UPDATE_FRAME, stream_id)) {
534 PendingStreamOnWindowUpdateFrame(frame);
535 return;
536 }
537
538 QuicStream* stream = GetOrCreateStream(stream_id);
539 if (stream != nullptr) {
540 stream->OnWindowUpdateFrame(frame);
541 }
542 }
543
OnBlockedFrame(const QuicBlockedFrame & frame)544 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
545 // TODO(rjshade): Compare our flow control receive windows for specified
546 // streams: if we have a large window then maybe something
547 // had gone wrong with the flow control accounting.
548 QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
549 << frame.stream_id << ", offset: " << frame.offset;
550 }
551
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)552 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
553 uint64_t previous_bytes_written,
554 bool previous_fin_sent) {
555 if ( // Stream should not be closed.
556 !stream->write_side_closed() &&
557 // Not connection flow control blocked.
558 !flow_controller_.IsBlocked() &&
559 // Detect lack of forward progress.
560 previous_bytes_written == stream->stream_bytes_written() &&
561 previous_fin_sent == stream->fin_sent()) {
562 stream->set_busy_counter(stream->busy_counter() + 1);
563 QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
564 << stream->id() << " stream_bytes_written "
565 << stream->stream_bytes_written() << " fin "
566 << stream->fin_sent() << " count " << stream->busy_counter();
567 // Wait a few iterations before firing, the exact count is
568 // arbitrary, more than a few to cover a few test-only false
569 // positives.
570 if (stream->busy_counter() > 20) {
571 QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
572 << stream->id() << " stream_bytes_written "
573 << stream->stream_bytes_written() << " fin "
574 << stream->fin_sent();
575 return false;
576 }
577 } else {
578 stream->set_busy_counter(0);
579 }
580 return true;
581 }
582
CheckStreamWriteBlocked(QuicStream * stream) const583 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
584 if (!stream->write_side_closed() && stream->HasBufferedData() &&
585 !stream->IsFlowControlBlocked() &&
586 !write_blocked_streams_->IsStreamBlocked(stream->id())) {
587 QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
588 << " has buffered " << stream->BufferedDataBytes()
589 << " bytes, and is not flow control blocked, "
590 "but it is not in the write block list.";
591 return false;
592 }
593 return true;
594 }
595
OnCanWrite()596 void QuicSession::OnCanWrite() {
597 if (connection_->framer().is_processing_packet()) {
598 // Do not write data in the middle of packet processing because rest
599 // frames in the packet may change the data to write. For example, lost
600 // data could be acknowledged. Also, connection is going to emit
601 // OnCanWrite signal post packet processing.
602 QUIC_BUG(session_write_mid_packet_processing)
603 << ENDPOINT << "Try to write mid packet processing.";
604 return;
605 }
606 if (!RetransmitLostData()) {
607 // Cannot finish retransmitting lost data, connection is write blocked.
608 QUIC_DVLOG(1) << ENDPOINT
609 << "Cannot finish retransmitting lost data, connection is "
610 "write blocked.";
611 return;
612 }
613 // We limit the number of writes to the number of pending streams. If more
614 // streams become pending, WillingAndAbleToWrite will be true, which will
615 // cause the connection to request resumption before yielding to other
616 // connections.
617 // If we are connection level flow control blocked, then only allow the
618 // crypto and headers streams to try writing as all other streams will be
619 // blocked.
620 size_t num_writes = flow_controller_.IsBlocked()
621 ? write_blocked_streams_->NumBlockedSpecialStreams()
622 : write_blocked_streams_->NumBlockedStreams();
623 if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
624 datagram_queue_.empty() &&
625 (!QuicVersionUsesCryptoFrames(transport_version()) ||
626 !GetCryptoStream()->HasBufferedCryptoFrames())) {
627 return;
628 }
629
630 QuicConnection::ScopedPacketFlusher flusher(connection_);
631 if (QuicVersionUsesCryptoFrames(transport_version())) {
632 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
633 if (crypto_stream->HasBufferedCryptoFrames()) {
634 crypto_stream->WriteBufferedCryptoFrames();
635 }
636 if ((GetQuicReloadableFlag(
637 quic_no_write_control_frame_upon_connection_close) &&
638 !connection_->connected()) ||
639 crypto_stream->HasBufferedCryptoFrames()) {
640 // Cannot finish writing buffered crypto frames, connection is either
641 // write blocked or closed.
642 return;
643 }
644 }
645 if (control_frame_manager_.WillingToWrite()) {
646 control_frame_manager_.OnCanWrite();
647 }
648 if (version().UsesTls() && GetHandshakeState() != HANDSHAKE_CONFIRMED &&
649 connection_->in_probe_time_out()) {
650 QUIC_CODE_COUNT(quic_donot_pto_stream_data_before_handshake_confirmed);
651 // Do not PTO stream data before handshake gets confirmed.
652 return;
653 }
654 // TODO(b/147146815): this makes all datagrams go before stream data. We
655 // should have a better priority scheme for this.
656 if (!datagram_queue_.empty()) {
657 size_t written = datagram_queue_.SendDatagrams();
658 QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
659 if (!datagram_queue_.empty()) {
660 return;
661 }
662 }
663 std::vector<QuicStreamId> last_writing_stream_ids;
664 for (size_t i = 0; i < num_writes; ++i) {
665 if (!(write_blocked_streams_->HasWriteBlockedSpecialStream() ||
666 write_blocked_streams_->HasWriteBlockedDataStreams())) {
667 // Writing one stream removed another!? Something's broken.
668 QUIC_BUG(quic_bug_10866_1)
669 << "WriteBlockedStream is missing, num_writes: " << num_writes
670 << ", finished_writes: " << i
671 << ", connected: " << connection_->connected()
672 << ", connection level flow control blocked: "
673 << flow_controller_.IsBlocked();
674 for (QuicStreamId id : last_writing_stream_ids) {
675 QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
676 }
677 connection_->CloseConnection(QUIC_INTERNAL_ERROR,
678 "WriteBlockedStream is missing",
679 ConnectionCloseBehavior::SILENT_CLOSE);
680 return;
681 }
682 if (!CanWriteStreamData()) {
683 return;
684 }
685 currently_writing_stream_id_ = write_blocked_streams_->PopFront();
686 last_writing_stream_ids.push_back(currently_writing_stream_id_);
687 QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
688 << currently_writing_stream_id_ << " from write-blocked list";
689 QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
690 if (stream != nullptr && !stream->IsFlowControlBlocked()) {
691 // If the stream can't write all bytes it'll re-add itself to the blocked
692 // list.
693 uint64_t previous_bytes_written = stream->stream_bytes_written();
694 bool previous_fin_sent = stream->fin_sent();
695 QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
696 << " bytes_written " << previous_bytes_written << " fin "
697 << previous_fin_sent;
698 stream->OnCanWrite();
699 QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
700 QUICHE_DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
701 previous_fin_sent));
702 }
703 currently_writing_stream_id_ = 0;
704 }
705 }
706
WillingAndAbleToWrite() const707 bool QuicSession::WillingAndAbleToWrite() const {
708 // Schedule a write when:
709 // 1) control frame manager has pending or new control frames, or
710 // 2) any stream has pending retransmissions, or
711 // 3) If the crypto or headers streams are blocked, or
712 // 4) connection is not flow control blocked and there are write blocked
713 // streams.
714 if (QuicVersionUsesCryptoFrames(transport_version())) {
715 if (HasPendingHandshake()) {
716 return true;
717 }
718 if (!IsEncryptionEstablished()) {
719 return false;
720 }
721 }
722 if (control_frame_manager_.WillingToWrite() ||
723 !streams_with_pending_retransmission_.empty()) {
724 return true;
725 }
726 if (flow_controller_.IsBlocked()) {
727 if (VersionUsesHttp3(transport_version())) {
728 return false;
729 }
730 // Crypto and headers streams are not blocked by connection level flow
731 // control.
732 return write_blocked_streams_->HasWriteBlockedSpecialStream();
733 }
734 return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
735 write_blocked_streams_->HasWriteBlockedDataStreams();
736 }
737
GetStreamsInfoForLogging() const738 std::string QuicSession::GetStreamsInfoForLogging() const {
739 std::string info = absl::StrCat(
740 "num_active_streams: ", GetNumActiveStreams(),
741 ", num_pending_streams: ", pending_streams_size(),
742 ", num_outgoing_draining_streams: ", num_outgoing_draining_streams(),
743 " ");
744 // Log info for up to 5 streams.
745 size_t i = 5;
746 for (const auto& it : stream_map_) {
747 if (it.second->is_static()) {
748 continue;
749 }
750 // Calculate the stream creation delay.
751 const QuicTime::Delta delay =
752 connection_->clock()->ApproximateNow() - it.second->creation_time();
753 absl::StrAppend(
754 &info, "{", it.second->id(), ":", delay.ToDebuggingValue(), ";",
755 it.second->stream_bytes_written(), ",", it.second->fin_sent(), ",",
756 it.second->HasBufferedData(), ",", it.second->fin_buffered(), ";",
757 it.second->stream_bytes_read(), ",", it.second->fin_received(), "}");
758 --i;
759 if (i == 0) {
760 break;
761 }
762 }
763 return info;
764 }
765
HasPendingHandshake() const766 bool QuicSession::HasPendingHandshake() const {
767 if (QuicVersionUsesCryptoFrames(transport_version())) {
768 return GetCryptoStream()->HasPendingCryptoRetransmission() ||
769 GetCryptoStream()->HasBufferedCryptoFrames();
770 }
771 return streams_with_pending_retransmission_.contains(
772 QuicUtils::GetCryptoStreamId(transport_version())) ||
773 write_blocked_streams_->IsStreamBlocked(
774 QuicUtils::GetCryptoStreamId(transport_version()));
775 }
776
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)777 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
778 const QuicSocketAddress& peer_address,
779 const QuicReceivedPacket& packet) {
780 QuicConnectionContextSwitcher cs(connection_->context());
781 connection_->ProcessUdpPacket(self_address, peer_address, packet);
782 }
783
on_closed_frame_string() const784 std::string QuicSession::on_closed_frame_string() const {
785 std::stringstream ss;
786 ss << on_closed_frame_;
787 if (source_.has_value()) {
788 ss << " " << ConnectionCloseSourceToString(*source_);
789 }
790 return ss.str();
791 }
792
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,EncryptionLevel level)793 QuicConsumedData QuicSession::WritevData(QuicStreamId id, size_t write_length,
794 QuicStreamOffset offset,
795 StreamSendingState state,
796 TransmissionType type,
797 EncryptionLevel level) {
798 QUIC_BUG_IF(session writevdata when disconnected, !connection()->connected())
799 << ENDPOINT << "Try to write stream data when connection is closed: "
800 << on_closed_frame_string();
801 if (!IsEncryptionEstablished() &&
802 !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
803 // Do not let streams write without encryption. The calling stream will end
804 // up write blocked until OnCanWrite is next called.
805 if (was_zero_rtt_rejected_ && !OneRttKeysAvailable()) {
806 QUICHE_DCHECK(version().UsesTls() &&
807 perspective() == Perspective::IS_CLIENT);
808 QUIC_DLOG(INFO) << ENDPOINT
809 << "Suppress the write while 0-RTT gets rejected and "
810 "1-RTT keys are not available. Version: "
811 << ParsedQuicVersionToString(version());
812 } else if (version().UsesTls() || perspective() == Perspective::IS_SERVER) {
813 QUIC_BUG(quic_bug_10866_2)
814 << ENDPOINT << "Try to send data of stream " << id
815 << " before encryption is established. Version: "
816 << ParsedQuicVersionToString(version());
817 } else {
818 // In QUIC crypto, this could happen when the client sends full CHLO and
819 // 0-RTT request, then receives an inchoate REJ and sends an inchoate
820 // CHLO. The client then gets the ACK of the inchoate CHLO or the client
821 // gets the full REJ and needs to verify the proof (before it sends the
822 // full CHLO), such that there is no outstanding crypto data.
823 // Retransmission alarm fires in TLP mode which tries to retransmit the
824 // 0-RTT request (without encryption).
825 QUIC_DLOG(INFO) << ENDPOINT << "Try to send data of stream " << id
826 << " before encryption is established.";
827 }
828 return QuicConsumedData(0, false);
829 }
830
831 SetTransmissionType(type);
832 QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
833
834 QuicConsumedData data =
835 connection_->SendStreamData(id, write_length, offset, state);
836 if (type == NOT_RETRANSMISSION) {
837 // This is new stream data.
838 write_blocked_streams_->UpdateBytesForStream(id, data.bytes_consumed);
839 }
840
841 return data;
842 }
843
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)844 size_t QuicSession::SendCryptoData(EncryptionLevel level, size_t write_length,
845 QuicStreamOffset offset,
846 TransmissionType type) {
847 QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
848 if (!connection()->framer().HasEncrypterOfEncryptionLevel(level)) {
849 const std::string error_details = absl::StrCat(
850 "Try to send crypto data with missing keys of encryption level: ",
851 EncryptionLevelToString(level));
852 QUIC_BUG(quic_bug_10866_3) << ENDPOINT << error_details;
853 connection()->CloseConnection(
854 QUIC_MISSING_WRITE_KEYS, error_details,
855 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
856 return 0;
857 }
858 SetTransmissionType(type);
859 QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
860 const auto bytes_consumed =
861 connection_->SendCryptoData(level, write_length, offset);
862 return bytes_consumed;
863 }
864
OnControlFrameManagerError(QuicErrorCode error_code,std::string error_details)865 void QuicSession::OnControlFrameManagerError(QuicErrorCode error_code,
866 std::string error_details) {
867 connection_->CloseConnection(
868 error_code, error_details,
869 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
870 }
871
WriteControlFrame(const QuicFrame & frame,TransmissionType type)872 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
873 TransmissionType type) {
874 QUIC_BUG_IF(quic_bug_12435_11, !connection()->connected())
875 << ENDPOINT
876 << absl::StrCat("Try to write control frame: ", QuicFrameToString(frame),
877 " when connection is closed: ")
878 << on_closed_frame_string();
879 if (!IsEncryptionEstablished()) {
880 // Suppress the write before encryption gets established.
881 return false;
882 }
883 if (limit_sending_max_streams_ &&
884 connection_->framer().is_processing_packet()) {
885 QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 3, 3);
886 // The frame will be sent when OnCanWrite() is called.
887 return false;
888 }
889 SetTransmissionType(type);
890 QuicConnection::ScopedEncryptionLevelContext context(
891 connection(), GetEncryptionLevelToSendApplicationData());
892 return connection_->SendControlFrame(frame);
893 }
894
ResetStream(QuicStreamId id,QuicRstStreamErrorCode error)895 void QuicSession::ResetStream(QuicStreamId id, QuicRstStreamErrorCode error) {
896 QuicStream* stream = GetStream(id);
897 if (stream != nullptr && stream->is_static()) {
898 connection()->CloseConnection(
899 QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
900 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
901 return;
902 }
903
904 if (stream != nullptr) {
905 stream->Reset(error);
906 return;
907 }
908
909 QuicConnection::ScopedPacketFlusher flusher(connection());
910 MaybeSendStopSendingFrame(id, QuicResetStreamError::FromInternal(error));
911 MaybeSendRstStreamFrame(id, QuicResetStreamError::FromInternal(error), 0);
912 }
913
MaybeSendRstStreamFrame(QuicStreamId id,QuicResetStreamError error,QuicStreamOffset bytes_written)914 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
915 QuicResetStreamError error,
916 QuicStreamOffset bytes_written) {
917 if (!connection()->connected()) {
918 return;
919 }
920 if (!VersionHasIetfQuicFrames(transport_version()) ||
921 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
922 version()) != READ_UNIDIRECTIONAL) {
923 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
924 }
925
926 connection_->OnStreamReset(id, error.internal_code());
927 }
928
MaybeSendStopSendingFrame(QuicStreamId id,QuicResetStreamError error)929 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
930 QuicResetStreamError error) {
931 if (!connection()->connected()) {
932 return;
933 }
934 if (VersionHasIetfQuicFrames(transport_version()) &&
935 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
936 version()) != WRITE_UNIDIRECTIONAL) {
937 control_frame_manager_.WriteOrBufferStopSending(error, id);
938 }
939 }
940
SendGoAway(QuicErrorCode error_code,const std::string & reason)941 void QuicSession::SendGoAway(QuicErrorCode error_code,
942 const std::string& reason) {
943 // GOAWAY frame is not supported in IETF QUIC.
944 QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
945 if (!IsEncryptionEstablished()) {
946 QUIC_CODE_COUNT(quic_goaway_before_encryption_established);
947 connection_->CloseConnection(
948 error_code, reason,
949 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
950 return;
951 }
952 if (transport_goaway_sent_) {
953 return;
954 }
955 transport_goaway_sent_ = true;
956
957 QUICHE_DCHECK_EQ(perspective(), Perspective::IS_SERVER);
958 control_frame_manager_.WriteOrBufferGoAway(
959 error_code,
960 QuicUtils::GetMaxClientInitiatedBidirectionalStreamId(
961 transport_version()),
962 reason);
963 }
964
SendBlocked(QuicStreamId id,QuicStreamOffset byte_offset)965 void QuicSession::SendBlocked(QuicStreamId id, QuicStreamOffset byte_offset) {
966 control_frame_manager_.WriteOrBufferBlocked(id, byte_offset);
967 }
968
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)969 void QuicSession::SendWindowUpdate(QuicStreamId id,
970 QuicStreamOffset byte_offset) {
971 control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
972 }
973
OnStreamError(QuicErrorCode error_code,std::string error_details)974 void QuicSession::OnStreamError(QuicErrorCode error_code,
975 std::string error_details) {
976 connection_->CloseConnection(
977 error_code, error_details,
978 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
979 }
980
OnStreamError(QuicErrorCode error_code,QuicIetfTransportErrorCodes ietf_error,std::string error_details)981 void QuicSession::OnStreamError(QuicErrorCode error_code,
982 QuicIetfTransportErrorCodes ietf_error,
983 std::string error_details) {
984 connection_->CloseConnection(
985 error_code, ietf_error, error_details,
986 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
987 }
988
CanSendMaxStreams()989 bool QuicSession::CanSendMaxStreams() {
990 if (!limit_sending_max_streams_) {
991 return true;
992 }
993 QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 1, 3);
994 return control_frame_manager_.NumBufferedMaxStreams() < 2;
995 }
996
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)997 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
998 bool unidirectional) {
999 if (!is_configured_) {
1000 QUIC_BUG(quic_bug_10866_5)
1001 << "Try to send max streams before config negotiated.";
1002 return;
1003 }
1004 control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
1005 }
1006
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)1007 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
1008 const QuicStreamId id, QuicStreamOffset offset) {
1009 locally_closed_streams_highest_offset_[id] = offset;
1010 }
1011
OnStreamClosed(QuicStreamId stream_id)1012 void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
1013 QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
1014 StreamMap::iterator it = stream_map_.find(stream_id);
1015 if (it == stream_map_.end()) {
1016 QUIC_BUG(quic_bug_10866_6)
1017 << ENDPOINT << "Stream is already closed: " << stream_id;
1018 return;
1019 }
1020 QuicStream* stream = it->second.get();
1021 StreamType type = stream->type();
1022
1023 const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
1024 if (stream_waiting_for_acks) {
1025 // The stream needs to be kept alive because it's waiting for acks.
1026 ++num_zombie_streams_;
1027 } else {
1028 closed_streams_.push_back(std::move(it->second));
1029 stream_map_.erase(it);
1030 // Do not retransmit data of a closed stream.
1031 streams_with_pending_retransmission_.erase(stream_id);
1032 if (!closed_streams_clean_up_alarm_->IsSet()) {
1033 closed_streams_clean_up_alarm_->Set(
1034 connection_->clock()->ApproximateNow());
1035 }
1036 connection_->QuicBugIfHasPendingFrames(stream_id);
1037 }
1038
1039 if (!stream->HasReceivedFinalOffset()) {
1040 // If we haven't received a FIN or RST for this stream, we need to keep
1041 // track of the how many bytes the stream's flow controller believes it has
1042 // received, for accurate connection level flow control accounting.
1043 // If this is an outgoing stream, it is technically open from peer's
1044 // perspective. Do not inform stream Id manager yet.
1045 QUICHE_DCHECK(!stream->was_draining());
1046 InsertLocallyClosedStreamsHighestOffset(
1047 stream_id, stream->highest_received_byte_offset());
1048 return;
1049 }
1050
1051 const bool stream_was_draining = stream->was_draining();
1052 QUIC_DVLOG_IF(1, stream_was_draining)
1053 << ENDPOINT << "Stream " << stream_id << " was draining";
1054 if (stream_was_draining) {
1055 QUIC_BUG_IF(quic_bug_12435_4, num_draining_streams_ == 0);
1056 --num_draining_streams_;
1057 if (!IsIncomingStream(stream_id)) {
1058 QUIC_BUG_IF(quic_bug_12435_5, num_outgoing_draining_streams_ == 0);
1059 --num_outgoing_draining_streams_;
1060 }
1061 // Stream Id manager has been informed with draining streams.
1062 return;
1063 }
1064 if (!VersionHasIetfQuicFrames(transport_version())) {
1065 stream_id_manager_.OnStreamClosed(
1066 /*is_incoming=*/IsIncomingStream(stream_id));
1067 }
1068 if (!connection_->connected()) {
1069 return;
1070 }
1071 if (IsIncomingStream(stream_id)) {
1072 // Stream Id manager is only interested in peer initiated stream IDs.
1073 if (VersionHasIetfQuicFrames(transport_version())) {
1074 ietf_streamid_manager_.OnStreamClosed(stream_id);
1075 }
1076 return;
1077 }
1078 if (!VersionHasIetfQuicFrames(transport_version())) {
1079 OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
1080 }
1081 }
1082
ClosePendingStream(QuicStreamId stream_id)1083 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
1084 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
1085 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1086 pending_stream_map_.erase(stream_id);
1087 if (connection_->connected()) {
1088 ietf_streamid_manager_.OnStreamClosed(stream_id);
1089 }
1090 }
1091
ShouldProcessFrameByPendingStream(QuicFrameType type,QuicStreamId id) const1092 bool QuicSession::ShouldProcessFrameByPendingStream(QuicFrameType type,
1093 QuicStreamId id) const {
1094 return UsesPendingStreamForFrame(type, id) &&
1095 stream_map_.find(id) == stream_map_.end();
1096 }
1097
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)1098 void QuicSession::OnFinalByteOffsetReceived(
1099 QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
1100 auto it = locally_closed_streams_highest_offset_.find(stream_id);
1101 if (it == locally_closed_streams_highest_offset_.end()) {
1102 return;
1103 }
1104
1105 QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
1106 << final_byte_offset << " for stream " << stream_id;
1107 QuicByteCount offset_diff = final_byte_offset - it->second;
1108 if (flow_controller_.UpdateHighestReceivedOffset(
1109 flow_controller_.highest_received_byte_offset() + offset_diff)) {
1110 // If the final offset violates flow control, close the connection now.
1111 if (flow_controller_.FlowControlViolation()) {
1112 connection_->CloseConnection(
1113 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
1114 "Connection level flow control violation",
1115 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1116 return;
1117 }
1118 }
1119
1120 flow_controller_.AddBytesConsumed(offset_diff);
1121 locally_closed_streams_highest_offset_.erase(it);
1122 if (!VersionHasIetfQuicFrames(transport_version())) {
1123 stream_id_manager_.OnStreamClosed(
1124 /*is_incoming=*/IsIncomingStream(stream_id));
1125 }
1126 if (IsIncomingStream(stream_id)) {
1127 if (VersionHasIetfQuicFrames(transport_version())) {
1128 ietf_streamid_manager_.OnStreamClosed(stream_id);
1129 }
1130 } else if (!VersionHasIetfQuicFrames(transport_version())) {
1131 OnCanCreateNewOutgoingStream(false);
1132 }
1133 }
1134
IsEncryptionEstablished() const1135 bool QuicSession::IsEncryptionEstablished() const {
1136 if (GetCryptoStream() == nullptr) {
1137 return false;
1138 }
1139 return GetCryptoStream()->encryption_established();
1140 }
1141
OneRttKeysAvailable() const1142 bool QuicSession::OneRttKeysAvailable() const {
1143 if (GetCryptoStream() == nullptr) {
1144 return false;
1145 }
1146 return GetCryptoStream()->one_rtt_keys_available();
1147 }
1148
OnConfigNegotiated()1149 void QuicSession::OnConfigNegotiated() {
1150 // In versions with TLS, the configs will be set twice if 0-RTT is available.
1151 // In the second config setting, 1-RTT keys are guaranteed to be available.
1152 if (version().UsesTls() && is_configured_ &&
1153 connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
1154 QUIC_BUG(quic_bug_12435_6)
1155 << ENDPOINT
1156 << "1-RTT keys missing when config is negotiated for the second time.";
1157 connection_->CloseConnection(
1158 QUIC_INTERNAL_ERROR,
1159 "1-RTT keys missing when config is negotiated for the second time.",
1160 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1161 return;
1162 }
1163
1164 QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
1165 connection_->SetFromConfig(config_);
1166
1167 if (VersionHasIetfQuicFrames(transport_version())) {
1168 uint32_t max_streams = 0;
1169 if (config_.HasReceivedMaxBidirectionalStreams()) {
1170 max_streams = config_.ReceivedMaxBidirectionalStreams();
1171 }
1172 if (was_zero_rtt_rejected_ &&
1173 max_streams <
1174 ietf_streamid_manager_.outgoing_bidirectional_stream_count()) {
1175 connection_->CloseConnection(
1176 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1177 absl::StrCat(
1178 "Server rejected 0-RTT, aborting because new bidirectional "
1179 "initial stream limit ",
1180 max_streams, " is less than current open streams: ",
1181 ietf_streamid_manager_.outgoing_bidirectional_stream_count()),
1182 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1183 return;
1184 }
1185 QUIC_DVLOG(1) << ENDPOINT
1186 << "Setting Bidirectional outgoing_max_streams_ to "
1187 << max_streams;
1188 if (perspective_ == Perspective::IS_CLIENT &&
1189 max_streams <
1190 ietf_streamid_manager_.max_outgoing_bidirectional_streams()) {
1191 connection_->CloseConnection(
1192 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1193 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1194 absl::StrCat(
1195 was_zero_rtt_rejected_
1196 ? "Server rejected 0-RTT, aborting because "
1197 : "",
1198 "new bidirectional limit ", max_streams,
1199 " decreases the current limit: ",
1200 ietf_streamid_manager_.max_outgoing_bidirectional_streams()),
1201 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1202 return;
1203 }
1204 if (ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1205 max_streams)) {
1206 OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1207 }
1208
1209 max_streams = 0;
1210 if (config_.HasReceivedMaxUnidirectionalStreams()) {
1211 max_streams = config_.ReceivedMaxUnidirectionalStreams();
1212 }
1213
1214 if (was_zero_rtt_rejected_ &&
1215 max_streams <
1216 ietf_streamid_manager_.outgoing_unidirectional_stream_count()) {
1217 connection_->CloseConnection(
1218 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1219 absl::StrCat(
1220 "Server rejected 0-RTT, aborting because new unidirectional "
1221 "initial stream limit ",
1222 max_streams, " is less than current open streams: ",
1223 ietf_streamid_manager_.outgoing_unidirectional_stream_count()),
1224 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1225 return;
1226 }
1227
1228 if (max_streams <
1229 ietf_streamid_manager_.max_outgoing_unidirectional_streams()) {
1230 connection_->CloseConnection(
1231 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1232 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1233 absl::StrCat(
1234 was_zero_rtt_rejected_
1235 ? "Server rejected 0-RTT, aborting because "
1236 : "",
1237 "new unidirectional limit ", max_streams,
1238 " decreases the current limit: ",
1239 ietf_streamid_manager_.max_outgoing_unidirectional_streams()),
1240 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1241 return;
1242 }
1243 QUIC_DVLOG(1) << ENDPOINT
1244 << "Setting Unidirectional outgoing_max_streams_ to "
1245 << max_streams;
1246 if (ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1247 max_streams)) {
1248 OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1249 }
1250 } else {
1251 uint32_t max_streams = 0;
1252 if (config_.HasReceivedMaxBidirectionalStreams()) {
1253 max_streams = config_.ReceivedMaxBidirectionalStreams();
1254 }
1255 QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1256 << max_streams;
1257 if (was_zero_rtt_rejected_ &&
1258 max_streams < stream_id_manager_.num_open_outgoing_streams()) {
1259 connection_->CloseConnection(
1260 QUIC_INTERNAL_ERROR,
1261 absl::StrCat(
1262 "Server rejected 0-RTT, aborting because new stream limit ",
1263 max_streams, " is less than current open streams: ",
1264 stream_id_manager_.num_open_outgoing_streams()),
1265 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1266 return;
1267 }
1268 stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1269 }
1270
1271 if (perspective() == Perspective::IS_SERVER) {
1272 if (config_.HasReceivedConnectionOptions()) {
1273 // The following variations change the initial receive flow control
1274 // window sizes.
1275 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1276 AdjustInitialFlowControlWindows(64 * 1024);
1277 }
1278 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1279 AdjustInitialFlowControlWindows(128 * 1024);
1280 }
1281 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1282 AdjustInitialFlowControlWindows(256 * 1024);
1283 }
1284 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1285 AdjustInitialFlowControlWindows(512 * 1024);
1286 }
1287 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1288 AdjustInitialFlowControlWindows(1024 * 1024);
1289 }
1290 }
1291
1292 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1293 }
1294
1295 if (VersionHasIetfQuicFrames(transport_version())) {
1296 ietf_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1297 config_.GetMaxBidirectionalStreamsToSend());
1298 ietf_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1299 config_.GetMaxUnidirectionalStreamsToSend());
1300 } else {
1301 // A small number of additional incoming streams beyond the limit should be
1302 // allowed. This helps avoid early connection termination when FIN/RSTs for
1303 // old streams are lost or arrive out of order.
1304 // Use a minimum number of additional streams, or a percentage increase,
1305 // whichever is larger.
1306 uint32_t max_incoming_streams_to_send =
1307 config_.GetMaxBidirectionalStreamsToSend();
1308 uint32_t max_incoming_streams =
1309 std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1310 static_cast<uint32_t>(max_incoming_streams_to_send *
1311 kMaxStreamsMultiplier));
1312 stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1313 }
1314
1315 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1316 // When using IETF-style TLS transport parameters, inform existing streams
1317 // of new flow-control limits.
1318 if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1319 OnNewStreamOutgoingBidirectionalFlowControlWindow(
1320 config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1321 }
1322 if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1323 OnNewStreamIncomingBidirectionalFlowControlWindow(
1324 config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1325 }
1326 if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1327 OnNewStreamUnidirectionalFlowControlWindow(
1328 config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1329 }
1330 } else { // The version uses Google QUIC Crypto.
1331 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1332 // Streams which were created before the SHLO was received (0-RTT
1333 // requests) are now informed of the peer's initial flow control window.
1334 OnNewStreamFlowControlWindow(
1335 config_.ReceivedInitialStreamFlowControlWindowBytes());
1336 }
1337 }
1338
1339 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1340 OnNewSessionFlowControlWindow(
1341 config_.ReceivedInitialSessionFlowControlWindowBytes());
1342 }
1343
1344 if (perspective_ == Perspective::IS_SERVER && version().HasIetfQuicFrames() &&
1345 connection_->effective_peer_address().IsInitialized()) {
1346 if (config_.HasClientSentConnectionOption(kSPAD, perspective_)) {
1347 quiche::IpAddressFamily address_family =
1348 connection_->effective_peer_address()
1349 .Normalized()
1350 .host()
1351 .address_family();
1352 std::optional<QuicSocketAddress> preferred_address =
1353 config_.GetPreferredAddressToSend(address_family);
1354 if (preferred_address.has_value()) {
1355 // Set connection ID and token if SPAD has received and a preferred
1356 // address of the same address family is configured.
1357 std::optional<QuicNewConnectionIdFrame> frame =
1358 connection_->MaybeIssueNewConnectionIdForPreferredAddress();
1359 if (frame.has_value()) {
1360 config_.SetPreferredAddressConnectionIdAndTokenToSend(
1361 frame->connection_id, frame->stateless_reset_token);
1362 }
1363 connection_->set_sent_server_preferred_address(*preferred_address);
1364 }
1365 // Clear the alternative address of the other address family in the
1366 // config.
1367 config_.ClearAlternateServerAddressToSend(
1368 address_family == quiche::IpAddressFamily::IP_V4
1369 ? quiche::IpAddressFamily::IP_V6
1370 : quiche::IpAddressFamily::IP_V4);
1371 } else {
1372 // Clear alternative IPv(4|6) addresses in config if the server hasn't
1373 // received 'SPAD' connection option.
1374 config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V4);
1375 config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V6);
1376 }
1377 }
1378
1379 is_configured_ = true;
1380 connection()->OnConfigNegotiated();
1381
1382 // Ask flow controllers to try again since the config could have unblocked us.
1383 // Or if this session is configured on TLS enabled QUIC versions,
1384 // attempt to retransmit 0-RTT data if there's any.
1385 // TODO(fayang): consider removing this OnCanWrite call.
1386 if (!connection_->framer().is_processing_packet() &&
1387 (connection_->version().AllowsLowFlowControlLimits() ||
1388 version().UsesTls())) {
1389 QUIC_CODE_COUNT(quic_session_on_can_write_on_config_negotiated);
1390 OnCanWrite();
1391 }
1392 }
1393
OnAlpsData(const uint8_t *,size_t)1394 std::optional<std::string> QuicSession::OnAlpsData(const uint8_t* /*alps_data*/,
1395 size_t /*alps_length*/) {
1396 return std::nullopt;
1397 }
1398
AdjustInitialFlowControlWindows(size_t stream_window)1399 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1400 const float session_window_multiplier =
1401 config_.GetInitialStreamFlowControlWindowToSend()
1402 ? static_cast<float>(
1403 config_.GetInitialSessionFlowControlWindowToSend()) /
1404 config_.GetInitialStreamFlowControlWindowToSend()
1405 : 1.5;
1406
1407 QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1408 config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1409
1410 size_t session_window = session_window_multiplier * stream_window;
1411 QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1412 << session_window;
1413 config_.SetInitialSessionFlowControlWindowToSend(session_window);
1414 flow_controller_.UpdateReceiveWindowSize(session_window);
1415 // Inform all existing streams about the new window.
1416 for (auto const& kv : stream_map_) {
1417 kv.second->UpdateReceiveWindowSize(stream_window);
1418 }
1419 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1420 GetMutableCryptoStream()->UpdateReceiveWindowSize(stream_window);
1421 }
1422 }
1423
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1424 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1425 QuicStreamId stream_id) {
1426 QUICHE_DCHECK(!IsClosedStream(stream_id));
1427 // Received a frame for a locally-created stream that is not currently
1428 // active. This is an error.
1429 if (VersionHasIetfQuicFrames(transport_version())) {
1430 connection()->CloseConnection(
1431 QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1432 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1433 return;
1434 }
1435 connection()->CloseConnection(
1436 QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1437 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1438 }
1439
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1440 void QuicSession::HandleRstOnValidNonexistentStream(
1441 const QuicRstStreamFrame& frame) {
1442 // If the stream is neither originally in active streams nor created in
1443 // GetOrCreateStream(), it could be a closed stream in which case its
1444 // final received byte offset need to be updated.
1445 if (IsClosedStream(frame.stream_id)) {
1446 // The RST frame contains the final byte offset for the stream: we can now
1447 // update the connection level flow controller if needed.
1448 OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1449 }
1450 }
1451
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1452 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1453 QUICHE_DCHECK(version().UsesQuicCrypto());
1454 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1455 if (new_window < kMinimumFlowControlSendWindow) {
1456 QUIC_LOG_FIRST_N(ERROR, 1)
1457 << "Peer sent us an invalid stream flow control send window: "
1458 << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1459 connection_->CloseConnection(
1460 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1461 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1462 return;
1463 }
1464
1465 // Inform all existing streams about the new window.
1466 for (auto const& kv : stream_map_) {
1467 QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1468 << " of new stream flow control window " << new_window;
1469 if (!kv.second->MaybeConfigSendWindowOffset(
1470 new_window, /* was_zero_rtt_rejected = */ false)) {
1471 return;
1472 }
1473 }
1474 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1475 QUIC_DVLOG(1)
1476 << ENDPOINT
1477 << "Informing crypto stream of new stream flow control window "
1478 << new_window;
1479 GetMutableCryptoStream()->MaybeConfigSendWindowOffset(
1480 new_window, /* was_zero_rtt_rejected = */ false);
1481 }
1482 }
1483
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1484 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1485 QuicStreamOffset new_window) {
1486 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1487 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1488 << new_window;
1489 // Inform all existing outgoing unidirectional streams about the new window.
1490 for (auto const& kv : stream_map_) {
1491 const QuicStreamId id = kv.first;
1492 if (!version().HasIetfQuicFrames()) {
1493 if (kv.second->type() == BIDIRECTIONAL) {
1494 continue;
1495 }
1496 } else {
1497 if (QuicUtils::IsBidirectionalStreamId(id, version())) {
1498 continue;
1499 }
1500 }
1501 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1502 perspective())) {
1503 continue;
1504 }
1505 QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1506 << " of new stream flow control window " << new_window;
1507 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1508 was_zero_rtt_rejected_)) {
1509 return;
1510 }
1511 }
1512 }
1513
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1514 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1515 QuicStreamOffset new_window) {
1516 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1517 QUIC_DVLOG(1) << ENDPOINT
1518 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1519 << new_window;
1520 // Inform all existing outgoing bidirectional streams about the new window.
1521 for (auto const& kv : stream_map_) {
1522 const QuicStreamId id = kv.first;
1523 if (!version().HasIetfQuicFrames()) {
1524 if (kv.second->type() != BIDIRECTIONAL) {
1525 continue;
1526 }
1527 } else {
1528 if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1529 continue;
1530 }
1531 }
1532 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1533 perspective())) {
1534 continue;
1535 }
1536 QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1537 << id << " of new stream flow control window " << new_window;
1538 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1539 was_zero_rtt_rejected_)) {
1540 return;
1541 }
1542 }
1543 }
1544
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1545 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1546 QuicStreamOffset new_window) {
1547 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1548 QUIC_DVLOG(1) << ENDPOINT
1549 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1550 << new_window;
1551 // Inform all existing incoming bidirectional streams about the new window.
1552 for (auto const& kv : stream_map_) {
1553 const QuicStreamId id = kv.first;
1554 if (!version().HasIetfQuicFrames()) {
1555 if (kv.second->type() != BIDIRECTIONAL) {
1556 continue;
1557 }
1558 } else {
1559 if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1560 continue;
1561 }
1562 }
1563 if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1564 perspective())) {
1565 continue;
1566 }
1567 QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1568 << id << " of new stream flow control window " << new_window;
1569 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1570 was_zero_rtt_rejected_)) {
1571 return;
1572 }
1573 }
1574 }
1575
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1576 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1577 QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1578
1579 if (was_zero_rtt_rejected_ && new_window < flow_controller_.bytes_sent()) {
1580 std::string error_details = absl::StrCat(
1581 "Server rejected 0-RTT. Aborting because the client received session "
1582 "flow control send window: ",
1583 new_window,
1584 ", which is below currently used: ", flow_controller_.bytes_sent());
1585 QUIC_LOG(ERROR) << error_details;
1586 connection_->CloseConnection(
1587 QUIC_ZERO_RTT_UNRETRANSMITTABLE, error_details,
1588 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1589 return;
1590 }
1591 if (!connection_->version().AllowsLowFlowControlLimits() &&
1592 new_window < kMinimumFlowControlSendWindow) {
1593 std::string error_details = absl::StrCat(
1594 "Peer sent us an invalid session flow control send window: ",
1595 new_window, ", below minimum: ", kMinimumFlowControlSendWindow);
1596 QUIC_LOG_FIRST_N(ERROR, 1) << error_details;
1597 connection_->CloseConnection(
1598 QUIC_FLOW_CONTROL_INVALID_WINDOW, error_details,
1599 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1600 return;
1601 }
1602 if (perspective_ == Perspective::IS_CLIENT &&
1603 new_window < flow_controller_.send_window_offset()) {
1604 // The client receives a lower limit than remembered, violating
1605 // https://tools.ietf.org/html/draft-ietf-quic-transport-27#section-7.3.1
1606 std::string error_details = absl::StrCat(
1607 was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because "
1608 : "",
1609 "new session max data ", new_window,
1610 " decreases current limit: ", flow_controller_.send_window_offset());
1611 QUIC_LOG(ERROR) << error_details;
1612 connection_->CloseConnection(
1613 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1614 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1615 error_details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1616 return;
1617 }
1618
1619 flow_controller_.UpdateSendWindowOffset(new_window);
1620 }
1621
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1622 bool QuicSession::OnNewDecryptionKeyAvailable(
1623 EncryptionLevel level, std::unique_ptr<QuicDecrypter> decrypter,
1624 bool set_alternative_decrypter, bool latch_once_used) {
1625 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1626 !connection()->framer().HasEncrypterOfEncryptionLevel(
1627 QuicUtils::GetEncryptionLevelToSendAckofSpace(
1628 QuicUtils::GetPacketNumberSpace(level)))) {
1629 // This should never happen because connection should never decrypt a packet
1630 // while an ACK for it cannot be encrypted.
1631 return false;
1632 }
1633 if (connection()->version().KnowsWhichDecrypterToUse()) {
1634 connection()->InstallDecrypter(level, std::move(decrypter));
1635 return true;
1636 }
1637 if (set_alternative_decrypter) {
1638 connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1639 latch_once_used);
1640 return true;
1641 }
1642 connection()->SetDecrypter(level, std::move(decrypter));
1643 return true;
1644 }
1645
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1646 void QuicSession::OnNewEncryptionKeyAvailable(
1647 EncryptionLevel level, std::unique_ptr<QuicEncrypter> encrypter) {
1648 connection()->SetEncrypter(level, std::move(encrypter));
1649 if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
1650 return;
1651 }
1652
1653 bool reset_encryption_level = false;
1654 if (IsEncryptionEstablished() && level == ENCRYPTION_HANDSHAKE) {
1655 // ENCRYPTION_HANDSHAKE keys are only used for the handshake. If
1656 // ENCRYPTION_ZERO_RTT keys exist, it is possible for a client to send
1657 // stream data, which must not be sent at the ENCRYPTION_HANDSHAKE level.
1658 // Therefore, we avoid setting the default encryption level to
1659 // ENCRYPTION_HANDSHAKE.
1660 reset_encryption_level = true;
1661 }
1662 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1663 connection()->SetDefaultEncryptionLevel(level);
1664 if (reset_encryption_level) {
1665 connection()->SetDefaultEncryptionLevel(ENCRYPTION_ZERO_RTT);
1666 }
1667 QUIC_BUG_IF(quic_bug_12435_7,
1668 IsEncryptionEstablished() &&
1669 (connection()->encryption_level() == ENCRYPTION_INITIAL ||
1670 connection()->encryption_level() == ENCRYPTION_HANDSHAKE))
1671 << "Encryption is established, but the encryption level " << level
1672 << " does not support sending stream data";
1673 }
1674
SetDefaultEncryptionLevel(EncryptionLevel level)1675 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1676 QUICHE_DCHECK_EQ(PROTOCOL_QUIC_CRYPTO,
1677 connection_->version().handshake_protocol);
1678 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1679 connection()->SetDefaultEncryptionLevel(level);
1680
1681 switch (level) {
1682 case ENCRYPTION_INITIAL:
1683 break;
1684 case ENCRYPTION_ZERO_RTT:
1685 if (perspective() == Perspective::IS_CLIENT) {
1686 // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1687 // they can't be decrypted by the server.
1688 connection_->MarkZeroRttPacketsForRetransmission(0);
1689 if (!connection_->framer().is_processing_packet()) {
1690 // TODO(fayang): consider removing this OnCanWrite call.
1691 // Given any streams blocked by encryption a chance to write.
1692 QUIC_CODE_COUNT(
1693 quic_session_on_can_write_set_default_encryption_level);
1694 OnCanWrite();
1695 }
1696 }
1697 break;
1698 case ENCRYPTION_HANDSHAKE:
1699 break;
1700 case ENCRYPTION_FORWARD_SECURE:
1701 QUIC_BUG_IF(quic_bug_12435_8, !config_.negotiated())
1702 << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1703 connection()->mutable_stats().handshake_completion_time =
1704 connection()->clock()->ApproximateNow();
1705 break;
1706 default:
1707 QUIC_BUG(quic_bug_10866_7) << "Unknown encryption level: " << level;
1708 }
1709 }
1710
OnTlsHandshakeComplete()1711 void QuicSession::OnTlsHandshakeComplete() {
1712 QUICHE_DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1713 QUIC_BUG_IF(quic_bug_12435_9,
1714 !GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1715 << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1716 QUIC_BUG_IF(quic_bug_12435_10, !config_.negotiated())
1717 << ENDPOINT << "Handshake completes without parameter negotiation.";
1718 connection()->mutable_stats().handshake_completion_time =
1719 connection()->clock()->ApproximateNow();
1720 if (connection()->version().UsesTls() &&
1721 perspective_ == Perspective::IS_SERVER) {
1722 // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1723 // to the client.
1724 control_frame_manager_.WriteOrBufferHandshakeDone();
1725 if (connection()->version().HasIetfQuicFrames()) {
1726 MaybeSendAddressToken();
1727 }
1728 }
1729 }
1730
MaybeSendAddressToken()1731 bool QuicSession::MaybeSendAddressToken() {
1732 QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER &&
1733 connection()->version().HasIetfQuicFrames());
1734 std::optional<CachedNetworkParameters> cached_network_params =
1735 GenerateCachedNetworkParameters();
1736
1737 std::string address_token = GetCryptoStream()->GetAddressToken(
1738 cached_network_params.has_value() ? &*cached_network_params : nullptr);
1739 if (address_token.empty()) {
1740 return false;
1741 }
1742 const size_t buf_len = address_token.length() + 1;
1743 auto buffer = std::make_unique<char[]>(buf_len);
1744 QuicDataWriter writer(buf_len, buffer.get());
1745 // Add |kAddressTokenPrefix| for token sent in NEW_TOKEN frame.
1746 writer.WriteUInt8(kAddressTokenPrefix);
1747 writer.WriteBytes(address_token.data(), address_token.length());
1748 control_frame_manager_.WriteOrBufferNewToken(
1749 absl::string_view(buffer.get(), buf_len));
1750 if (cached_network_params.has_value()) {
1751 connection()->OnSendConnectionState(*cached_network_params);
1752 }
1753 return true;
1754 }
1755
DiscardOldDecryptionKey(EncryptionLevel level)1756 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1757 if (!connection()->version().KnowsWhichDecrypterToUse()) {
1758 return;
1759 }
1760 connection()->RemoveDecrypter(level);
1761 }
1762
DiscardOldEncryptionKey(EncryptionLevel level)1763 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1764 QUIC_DLOG(INFO) << ENDPOINT << "Discarding " << level << " keys";
1765 if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1766 connection()->RemoveEncrypter(level);
1767 }
1768 switch (level) {
1769 case ENCRYPTION_INITIAL:
1770 NeuterUnencryptedData();
1771 break;
1772 case ENCRYPTION_HANDSHAKE:
1773 NeuterHandshakeData();
1774 break;
1775 case ENCRYPTION_ZERO_RTT:
1776 break;
1777 case ENCRYPTION_FORWARD_SECURE:
1778 QUIC_BUG(quic_bug_10866_8)
1779 << ENDPOINT << "Discarding 1-RTT keys is not allowed";
1780 break;
1781 default:
1782 QUIC_BUG(quic_bug_10866_9)
1783 << ENDPOINT
1784 << "Cannot discard keys for unknown encryption level: " << level;
1785 }
1786 }
1787
NeuterHandshakeData()1788 void QuicSession::NeuterHandshakeData() {
1789 GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(
1790 ENCRYPTION_HANDSHAKE);
1791 connection()->OnHandshakeComplete();
1792 }
1793
OnZeroRttRejected(int reason)1794 void QuicSession::OnZeroRttRejected(int reason) {
1795 was_zero_rtt_rejected_ = true;
1796 connection_->MarkZeroRttPacketsForRetransmission(reason);
1797 if (connection_->encryption_level() == ENCRYPTION_FORWARD_SECURE) {
1798 QUIC_BUG(quic_bug_10866_10)
1799 << "1-RTT keys already available when 0-RTT is rejected.";
1800 connection_->CloseConnection(
1801 QUIC_INTERNAL_ERROR,
1802 "1-RTT keys already available when 0-RTT is rejected.",
1803 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1804 }
1805 }
1806
FillTransportParameters(TransportParameters * params)1807 bool QuicSession::FillTransportParameters(TransportParameters* params) {
1808 if (version().UsesTls()) {
1809 if (perspective() == Perspective::IS_SERVER) {
1810 config_.SetOriginalConnectionIdToSend(
1811 connection_->GetOriginalDestinationConnectionId());
1812 config_.SetInitialSourceConnectionIdToSend(connection_->connection_id());
1813 } else {
1814 config_.SetInitialSourceConnectionIdToSend(
1815 connection_->client_connection_id());
1816 }
1817 }
1818 return config_.FillTransportParameters(params);
1819 }
1820
ProcessTransportParameters(const TransportParameters & params,bool is_resumption,std::string * error_details)1821 QuicErrorCode QuicSession::ProcessTransportParameters(
1822 const TransportParameters& params, bool is_resumption,
1823 std::string* error_details) {
1824 return config_.ProcessTransportParameters(params, is_resumption,
1825 error_details);
1826 }
1827
OnHandshakeCallbackDone()1828 void QuicSession::OnHandshakeCallbackDone() {
1829 if (!connection_->connected()) {
1830 return;
1831 }
1832
1833 if (!connection()->is_processing_packet()) {
1834 connection()->MaybeProcessUndecryptablePackets();
1835 }
1836 }
1837
PacketFlusherAttached() const1838 bool QuicSession::PacketFlusherAttached() const {
1839 QUICHE_DCHECK(connection_->connected());
1840 return connection()->packet_creator().PacketFlusherAttached();
1841 }
1842
OnEncryptedClientHelloSent(absl::string_view client_hello) const1843 void QuicSession::OnEncryptedClientHelloSent(
1844 absl::string_view client_hello) const {
1845 connection()->OnEncryptedClientHelloSent(client_hello);
1846 }
1847
OnEncryptedClientHelloReceived(absl::string_view client_hello) const1848 void QuicSession::OnEncryptedClientHelloReceived(
1849 absl::string_view client_hello) const {
1850 connection()->OnEncryptedClientHelloReceived(client_hello);
1851 }
1852
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1853 void QuicSession::OnCryptoHandshakeMessageSent(
1854 const CryptoHandshakeMessage& /*message*/) {}
1855
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1856 void QuicSession::OnCryptoHandshakeMessageReceived(
1857 const CryptoHandshakeMessage& /*message*/) {}
1858
RegisterStreamPriority(QuicStreamId id,bool is_static,const QuicStreamPriority & priority)1859 void QuicSession::RegisterStreamPriority(QuicStreamId id, bool is_static,
1860 const QuicStreamPriority& priority) {
1861 write_blocked_streams()->RegisterStream(id, is_static, priority);
1862 }
1863
UnregisterStreamPriority(QuicStreamId id)1864 void QuicSession::UnregisterStreamPriority(QuicStreamId id) {
1865 write_blocked_streams()->UnregisterStream(id);
1866 }
1867
UpdateStreamPriority(QuicStreamId id,const QuicStreamPriority & new_priority)1868 void QuicSession::UpdateStreamPriority(QuicStreamId id,
1869 const QuicStreamPriority& new_priority) {
1870 write_blocked_streams()->UpdateStreamPriority(id, new_priority);
1871 }
1872
ActivateStream(std::unique_ptr<QuicStream> stream)1873 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1874 const bool should_keep_alive = ShouldKeepConnectionAlive();
1875 QuicStreamId stream_id = stream->id();
1876 bool is_static = stream->is_static();
1877 QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1878 << ". activating stream " << stream_id;
1879 QUICHE_DCHECK(!stream_map_.contains(stream_id));
1880 stream_map_[stream_id] = std::move(stream);
1881 if (is_static) {
1882 ++num_static_streams_;
1883 return;
1884 }
1885 if (!VersionHasIetfQuicFrames(transport_version())) {
1886 // Do not inform stream ID manager of static streams.
1887 stream_id_manager_.ActivateStream(
1888 /*is_incoming=*/IsIncomingStream(stream_id));
1889 }
1890 if (perspective() == Perspective::IS_CLIENT &&
1891 connection()->multi_port_stats() != nullptr && !should_keep_alive &&
1892 ShouldKeepConnectionAlive()) {
1893 connection()->MaybeProbeMultiPortPath();
1894 }
1895 }
1896
GetNextOutgoingBidirectionalStreamId()1897 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1898 if (VersionHasIetfQuicFrames(transport_version())) {
1899 return ietf_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1900 }
1901 return stream_id_manager_.GetNextOutgoingStreamId();
1902 }
1903
GetNextOutgoingUnidirectionalStreamId()1904 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1905 if (VersionHasIetfQuicFrames(transport_version())) {
1906 return ietf_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1907 }
1908 return stream_id_manager_.GetNextOutgoingStreamId();
1909 }
1910
CanOpenNextOutgoingBidirectionalStream()1911 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1912 if (liveness_testing_in_progress_) {
1913 QUICHE_DCHECK_EQ(Perspective::IS_CLIENT, perspective());
1914 QUIC_CODE_COUNT(
1915 quic_client_fails_to_create_stream_liveness_testing_in_progress);
1916 return false;
1917 }
1918 if (!VersionHasIetfQuicFrames(transport_version())) {
1919 if (!stream_id_manager_.CanOpenNextOutgoingStream()) {
1920 return false;
1921 }
1922 } else {
1923 if (!ietf_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1924 QUIC_CODE_COUNT(
1925 quic_fails_to_create_stream_close_too_many_streams_created);
1926 if (is_configured_) {
1927 // Send STREAM_BLOCKED after config negotiated.
1928 control_frame_manager_.WriteOrBufferStreamsBlocked(
1929 ietf_streamid_manager_.max_outgoing_bidirectional_streams(),
1930 /*unidirectional=*/false);
1931 }
1932 return false;
1933 }
1934 }
1935 if (perspective() == Perspective::IS_CLIENT &&
1936 connection_->MaybeTestLiveness()) {
1937 // Now is relatively close to the idle timeout having the risk that requests
1938 // could be discarded at the server.
1939 liveness_testing_in_progress_ = true;
1940 QUIC_CODE_COUNT(quic_client_fails_to_create_stream_close_to_idle_timeout);
1941 return false;
1942 }
1943 return true;
1944 }
1945
CanOpenNextOutgoingUnidirectionalStream()1946 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1947 if (!VersionHasIetfQuicFrames(transport_version())) {
1948 return stream_id_manager_.CanOpenNextOutgoingStream();
1949 }
1950 if (ietf_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
1951 return true;
1952 }
1953 if (is_configured_) {
1954 // Send STREAM_BLOCKED after config negotiated.
1955 control_frame_manager_.WriteOrBufferStreamsBlocked(
1956 ietf_streamid_manager_.max_outgoing_unidirectional_streams(),
1957 /*unidirectional=*/true);
1958 }
1959 return false;
1960 }
1961
GetAdvertisedMaxIncomingBidirectionalStreams() const1962 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
1963 const {
1964 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1965 return ietf_streamid_manager_.advertised_max_incoming_bidirectional_streams();
1966 }
1967
GetOrCreateStream(const QuicStreamId stream_id)1968 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1969 QUICHE_DCHECK(!pending_stream_map_.contains(stream_id));
1970 if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
1971 return GetMutableCryptoStream();
1972 }
1973
1974 StreamMap::iterator it = stream_map_.find(stream_id);
1975 if (it != stream_map_.end()) {
1976 return it->second->IsZombie() ? nullptr : it->second.get();
1977 }
1978
1979 if (IsClosedStream(stream_id)) {
1980 return nullptr;
1981 }
1982
1983 if (!IsIncomingStream(stream_id)) {
1984 HandleFrameOnNonexistentOutgoingStream(stream_id);
1985 return nullptr;
1986 }
1987
1988 // TODO(fkastenholz): If we are creating a new stream and we have sent a
1989 // goaway, we should ignore the stream creation. Need to add code to A) test
1990 // if goaway was sent ("if (transport_goaway_sent_)") and B) reject stream
1991 // creation ("return nullptr")
1992
1993 if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1994 return nullptr;
1995 }
1996
1997 if (!VersionHasIetfQuicFrames(transport_version()) &&
1998 !stream_id_manager_.CanOpenIncomingStream()) {
1999 // Refuse to open the stream.
2000 ResetStream(stream_id, QUIC_REFUSED_STREAM);
2001 return nullptr;
2002 }
2003
2004 return CreateIncomingStream(stream_id);
2005 }
2006
StreamDraining(QuicStreamId stream_id,bool unidirectional)2007 void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
2008 QUICHE_DCHECK(stream_map_.contains(stream_id));
2009 QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
2010 if (VersionHasIetfQuicFrames(transport_version())) {
2011 ietf_streamid_manager_.OnStreamClosed(stream_id);
2012 } else {
2013 stream_id_manager_.OnStreamClosed(
2014 /*is_incoming=*/IsIncomingStream(stream_id));
2015 }
2016 ++num_draining_streams_;
2017 if (!IsIncomingStream(stream_id)) {
2018 ++num_outgoing_draining_streams_;
2019 if (!VersionHasIetfQuicFrames(transport_version())) {
2020 OnCanCreateNewOutgoingStream(unidirectional);
2021 }
2022 }
2023 }
2024
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)2025 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
2026 const QuicStreamId stream_id) {
2027 if (VersionHasIetfQuicFrames(transport_version())) {
2028 std::string error_details;
2029 if (ietf_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
2030 stream_id, &error_details)) {
2031 return true;
2032 }
2033 connection()->CloseConnection(
2034 QUIC_INVALID_STREAM_ID, error_details,
2035 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2036 return false;
2037 }
2038 if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
2039 connection()->CloseConnection(
2040 QUIC_TOO_MANY_AVAILABLE_STREAMS,
2041 absl::StrCat(stream_id, " exceeds available streams ",
2042 stream_id_manager_.MaxAvailableStreams()),
2043 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2044 return false;
2045 }
2046 return true;
2047 }
2048
ShouldYield(QuicStreamId stream_id)2049 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
2050 if (stream_id == currently_writing_stream_id_) {
2051 return false;
2052 }
2053 return write_blocked_streams()->ShouldYield(stream_id);
2054 }
2055
GetOrCreatePendingStream(QuicStreamId stream_id)2056 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
2057 auto it = pending_stream_map_.find(stream_id);
2058 if (it != pending_stream_map_.end()) {
2059 return it->second.get();
2060 }
2061
2062 if (IsClosedStream(stream_id) ||
2063 !MaybeIncreaseLargestPeerStreamId(stream_id)) {
2064 return nullptr;
2065 }
2066
2067 auto pending = std::make_unique<PendingStream>(stream_id, this);
2068 PendingStream* unowned_pending = pending.get();
2069 pending_stream_map_[stream_id] = std::move(pending);
2070 return unowned_pending;
2071 }
2072
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)2073 void QuicSession::set_largest_peer_created_stream_id(
2074 QuicStreamId largest_peer_created_stream_id) {
2075 QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
2076 stream_id_manager_.set_largest_peer_created_stream_id(
2077 largest_peer_created_stream_id);
2078 }
2079
GetLargestPeerCreatedStreamId(bool unidirectional) const2080 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
2081 bool unidirectional) const {
2082 // This method is only used in IETF QUIC.
2083 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2084 return ietf_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
2085 }
2086
DeleteConnection()2087 void QuicSession::DeleteConnection() {
2088 if (connection_) {
2089 delete connection_;
2090 connection_ = nullptr;
2091 }
2092 }
2093
MaybeSetStreamPriority(QuicStreamId stream_id,const QuicStreamPriority & priority)2094 bool QuicSession::MaybeSetStreamPriority(QuicStreamId stream_id,
2095 const QuicStreamPriority& priority) {
2096 auto active_stream = stream_map_.find(stream_id);
2097 if (active_stream != stream_map_.end()) {
2098 active_stream->second->SetPriority(priority);
2099 return true;
2100 }
2101
2102 return false;
2103 }
2104
IsClosedStream(QuicStreamId id)2105 bool QuicSession::IsClosedStream(QuicStreamId id) {
2106 QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2107 if (IsOpenStream(id)) {
2108 // Stream is active
2109 return false;
2110 }
2111
2112 if (VersionHasIetfQuicFrames(transport_version())) {
2113 return !ietf_streamid_manager_.IsAvailableStream(id);
2114 }
2115
2116 return !stream_id_manager_.IsAvailableStream(id);
2117 }
2118
IsOpenStream(QuicStreamId id)2119 bool QuicSession::IsOpenStream(QuicStreamId id) {
2120 QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2121 const StreamMap::iterator it = stream_map_.find(id);
2122 if (it != stream_map_.end()) {
2123 return !it->second->IsZombie();
2124 }
2125 if (pending_stream_map_.contains(id) ||
2126 QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2127 // Stream is active
2128 return true;
2129 }
2130 return false;
2131 }
2132
IsStaticStream(QuicStreamId id) const2133 bool QuicSession::IsStaticStream(QuicStreamId id) const {
2134 auto it = stream_map_.find(id);
2135 if (it == stream_map_.end()) {
2136 return false;
2137 }
2138 return it->second->is_static();
2139 }
2140
GetNumActiveStreams() const2141 size_t QuicSession::GetNumActiveStreams() const {
2142 QUICHE_DCHECK_GE(
2143 static_cast<QuicStreamCount>(stream_map_.size()),
2144 num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
2145 return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
2146 num_zombie_streams_;
2147 }
2148
MarkConnectionLevelWriteBlocked(QuicStreamId id)2149 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
2150 if (GetOrCreateStream(id) == nullptr) {
2151 QUIC_BUG(quic_bug_10866_11)
2152 << "Marking unknown stream " << id << " blocked.";
2153 QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
2154 }
2155
2156 QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
2157 << " to write-blocked list";
2158
2159 write_blocked_streams_->AddStream(id);
2160 }
2161
HasDataToWrite() const2162 bool QuicSession::HasDataToWrite() const {
2163 return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
2164 write_blocked_streams_->HasWriteBlockedDataStreams() ||
2165 connection_->HasQueuedData() ||
2166 !streams_with_pending_retransmission_.empty() ||
2167 control_frame_manager_.WillingToWrite();
2168 }
2169
OnAckNeedsRetransmittableFrame()2170 void QuicSession::OnAckNeedsRetransmittableFrame() {
2171 flow_controller_.SendWindowUpdate();
2172 }
2173
SendAckFrequency(const QuicAckFrequencyFrame & frame)2174 void QuicSession::SendAckFrequency(const QuicAckFrequencyFrame& frame) {
2175 control_frame_manager_.WriteOrBufferAckFrequency(frame);
2176 }
2177
SendNewConnectionId(const QuicNewConnectionIdFrame & frame)2178 void QuicSession::SendNewConnectionId(const QuicNewConnectionIdFrame& frame) {
2179 control_frame_manager_.WriteOrBufferNewConnectionId(
2180 frame.connection_id, frame.sequence_number, frame.retire_prior_to,
2181 frame.stateless_reset_token);
2182 }
2183
SendRetireConnectionId(uint64_t sequence_number)2184 void QuicSession::SendRetireConnectionId(uint64_t sequence_number) {
2185 control_frame_manager_.WriteOrBufferRetireConnectionId(sequence_number);
2186 }
2187
MaybeReserveConnectionId(const QuicConnectionId & server_connection_id)2188 bool QuicSession::MaybeReserveConnectionId(
2189 const QuicConnectionId& server_connection_id) {
2190 if (visitor_) {
2191 return visitor_->TryAddNewConnectionId(
2192 connection_->GetOneActiveServerConnectionId(), server_connection_id);
2193 }
2194 return true;
2195 }
2196
OnServerConnectionIdRetired(const QuicConnectionId & server_connection_id)2197 void QuicSession::OnServerConnectionIdRetired(
2198 const QuicConnectionId& server_connection_id) {
2199 if (visitor_) {
2200 visitor_->OnConnectionIdRetired(server_connection_id);
2201 }
2202 }
2203
IsConnectionFlowControlBlocked() const2204 bool QuicSession::IsConnectionFlowControlBlocked() const {
2205 return flow_controller_.IsBlocked();
2206 }
2207
IsStreamFlowControlBlocked()2208 bool QuicSession::IsStreamFlowControlBlocked() {
2209 for (auto const& kv : stream_map_) {
2210 if (kv.second->IsFlowControlBlocked()) {
2211 return true;
2212 }
2213 }
2214 if (!QuicVersionUsesCryptoFrames(transport_version()) &&
2215 GetMutableCryptoStream()->IsFlowControlBlocked()) {
2216 return true;
2217 }
2218 return false;
2219 }
2220
MaxAvailableBidirectionalStreams() const2221 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
2222 if (VersionHasIetfQuicFrames(transport_version())) {
2223 return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2224 }
2225 return stream_id_manager_.MaxAvailableStreams();
2226 }
2227
MaxAvailableUnidirectionalStreams() const2228 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
2229 if (VersionHasIetfQuicFrames(transport_version())) {
2230 return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2231 }
2232 return stream_id_manager_.MaxAvailableStreams();
2233 }
2234
IsIncomingStream(QuicStreamId id) const2235 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
2236 if (VersionHasIetfQuicFrames(transport_version())) {
2237 return !QuicUtils::IsOutgoingStreamId(version(), id, perspective_);
2238 }
2239 return stream_id_manager_.IsIncomingStream(id);
2240 }
2241
MaybeCloseZombieStream(QuicStreamId id)2242 void QuicSession::MaybeCloseZombieStream(QuicStreamId id) {
2243 auto it = stream_map_.find(id);
2244 if (it == stream_map_.end()) {
2245 return;
2246 }
2247 --num_zombie_streams_;
2248 closed_streams_.push_back(std::move(it->second));
2249 stream_map_.erase(it);
2250
2251 if (!closed_streams_clean_up_alarm_->IsSet()) {
2252 closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
2253 }
2254 // Do not retransmit data of a closed stream.
2255 streams_with_pending_retransmission_.erase(id);
2256 connection_->QuicBugIfHasPendingFrames(id);
2257 }
2258
GetStream(QuicStreamId id) const2259 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
2260 auto active_stream = stream_map_.find(id);
2261 if (active_stream != stream_map_.end()) {
2262 return active_stream->second.get();
2263 }
2264
2265 if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2266 return const_cast<QuicCryptoStream*>(GetCryptoStream());
2267 }
2268
2269 return nullptr;
2270 }
2271
GetActiveStream(QuicStreamId id) const2272 QuicStream* QuicSession::GetActiveStream(QuicStreamId id) const {
2273 auto stream = stream_map_.find(id);
2274 if (stream != stream_map_.end() && !stream->second->is_static()) {
2275 return stream->second.get();
2276 }
2277 return nullptr;
2278 }
2279
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)2280 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
2281 QuicTime::Delta ack_delay_time,
2282 QuicTime receive_timestamp) {
2283 if (frame.type == MESSAGE_FRAME) {
2284 OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
2285 return true;
2286 }
2287 if (frame.type == CRYPTO_FRAME) {
2288 return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
2289 ack_delay_time);
2290 }
2291 if (frame.type != STREAM_FRAME) {
2292 bool acked = control_frame_manager_.OnControlFrameAcked(frame);
2293 if (limit_sending_max_streams_ && acked &&
2294 frame.type == MAX_STREAMS_FRAME) {
2295 // Since there is a 2 frame limit on the number of outstanding max_streams
2296 // frames, when an outstanding max_streams frame is ack'd that frees up
2297 // room to potntially send another.
2298 QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 2, 3);
2299 ietf_streamid_manager_.MaybeSendMaxStreamsFrame();
2300 }
2301 return acked;
2302 }
2303 bool new_stream_data_acked = false;
2304 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2305 // Stream can already be reset when sent frame gets acked.
2306 if (stream != nullptr) {
2307 QuicByteCount newly_acked_length = 0;
2308 new_stream_data_acked = stream->OnStreamFrameAcked(
2309 frame.stream_frame.offset, frame.stream_frame.data_length,
2310 frame.stream_frame.fin, ack_delay_time, receive_timestamp,
2311 &newly_acked_length);
2312 if (!stream->HasPendingRetransmission()) {
2313 streams_with_pending_retransmission_.erase(stream->id());
2314 }
2315 }
2316 return new_stream_data_acked;
2317 }
2318
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)2319 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
2320 QuicStream* stream = GetStream(frame.stream_id);
2321 if (stream == nullptr) {
2322 QUIC_BUG(quic_bug_10866_12)
2323 << "Stream: " << frame.stream_id << " is closed when " << frame
2324 << " is retransmitted.";
2325 connection()->CloseConnection(
2326 QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
2327 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2328 return;
2329 }
2330 stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
2331 frame.fin);
2332 }
2333
OnFrameLost(const QuicFrame & frame)2334 void QuicSession::OnFrameLost(const QuicFrame& frame) {
2335 if (frame.type == MESSAGE_FRAME) {
2336 ++total_datagrams_lost_;
2337 OnMessageLost(frame.message_frame->message_id);
2338 return;
2339 }
2340 if (frame.type == CRYPTO_FRAME) {
2341 GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
2342 return;
2343 }
2344 if (frame.type != STREAM_FRAME) {
2345 control_frame_manager_.OnControlFrameLost(frame);
2346 return;
2347 }
2348 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2349 if (stream == nullptr) {
2350 return;
2351 }
2352 stream->OnStreamFrameLost(frame.stream_frame.offset,
2353 frame.stream_frame.data_length,
2354 frame.stream_frame.fin);
2355 if (stream->HasPendingRetransmission() &&
2356 !streams_with_pending_retransmission_.contains(
2357 frame.stream_frame.stream_id)) {
2358 streams_with_pending_retransmission_.insert(
2359 std::make_pair(frame.stream_frame.stream_id, true));
2360 }
2361 }
2362
RetransmitFrames(const QuicFrames & frames,TransmissionType type)2363 bool QuicSession::RetransmitFrames(const QuicFrames& frames,
2364 TransmissionType type) {
2365 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2366 for (const QuicFrame& frame : frames) {
2367 if (frame.type == MESSAGE_FRAME) {
2368 // Do not retransmit MESSAGE frames.
2369 continue;
2370 }
2371 if (frame.type == CRYPTO_FRAME) {
2372 if (!GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type)) {
2373 return false;
2374 }
2375 continue;
2376 }
2377 if (frame.type != STREAM_FRAME) {
2378 if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
2379 return false;
2380 }
2381 continue;
2382 }
2383 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2384 if (stream != nullptr &&
2385 !stream->RetransmitStreamData(frame.stream_frame.offset,
2386 frame.stream_frame.data_length,
2387 frame.stream_frame.fin, type)) {
2388 return false;
2389 }
2390 }
2391 return true;
2392 }
2393
IsFrameOutstanding(const QuicFrame & frame) const2394 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2395 if (frame.type == MESSAGE_FRAME) {
2396 return false;
2397 }
2398 if (frame.type == CRYPTO_FRAME) {
2399 return GetCryptoStream()->IsFrameOutstanding(
2400 frame.crypto_frame->level, frame.crypto_frame->offset,
2401 frame.crypto_frame->data_length);
2402 }
2403 if (frame.type != STREAM_FRAME) {
2404 return control_frame_manager_.IsControlFrameOutstanding(frame);
2405 }
2406 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2407 return stream != nullptr &&
2408 stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2409 frame.stream_frame.data_length,
2410 frame.stream_frame.fin);
2411 }
2412
HasUnackedCryptoData() const2413 bool QuicSession::HasUnackedCryptoData() const {
2414 const QuicCryptoStream* crypto_stream = GetCryptoStream();
2415 return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2416 }
2417
HasUnackedStreamData() const2418 bool QuicSession::HasUnackedStreamData() const {
2419 for (const auto& it : stream_map_) {
2420 if (it.second->IsWaitingForAcks()) {
2421 return true;
2422 }
2423 }
2424 return false;
2425 }
2426
GetHandshakeState() const2427 HandshakeState QuicSession::GetHandshakeState() const {
2428 return GetCryptoStream()->GetHandshakeState();
2429 }
2430
GetFlowControlSendWindowSize(QuicStreamId id)2431 QuicByteCount QuicSession::GetFlowControlSendWindowSize(QuicStreamId id) {
2432 QuicStream* stream = GetActiveStream(id);
2433 if (stream == nullptr) {
2434 // No flow control for invalid or inactive stream ids. Returning uint64max
2435 // allows QuicPacketCreator to write as much data as possible.
2436 return std::numeric_limits<QuicByteCount>::max();
2437 }
2438 return stream->CalculateSendWindowSize();
2439 }
2440
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2441 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2442 QuicStreamOffset offset,
2443 QuicByteCount data_length,
2444 QuicDataWriter* writer) {
2445 QuicStream* stream = GetStream(id);
2446 if (stream == nullptr) {
2447 // This causes the connection to be closed because of failed to serialize
2448 // packet.
2449 QUIC_BUG(quic_bug_10866_13)
2450 << "Stream " << id << " does not exist when trying to write data."
2451 << " version:" << transport_version();
2452 return STREAM_MISSING;
2453 }
2454 if (stream->WriteStreamData(offset, data_length, writer)) {
2455 return WRITE_SUCCESS;
2456 }
2457 return WRITE_FAILED;
2458 }
2459
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2460 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2461 QuicStreamOffset offset,
2462 QuicByteCount data_length,
2463 QuicDataWriter* writer) {
2464 return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2465 writer);
2466 }
2467
GetStatelessResetToken() const2468 StatelessResetToken QuicSession::GetStatelessResetToken() const {
2469 return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2470 }
2471
CanWriteStreamData() const2472 bool QuicSession::CanWriteStreamData() const {
2473 // Don't write stream data if there are queued data packets.
2474 if (connection_->HasQueuedPackets()) {
2475 return false;
2476 }
2477 // Immediately write handshake data.
2478 if (HasPendingHandshake()) {
2479 return true;
2480 }
2481 return connection_->CanWrite(HAS_RETRANSMITTABLE_DATA);
2482 }
2483
RetransmitLostData()2484 bool QuicSession::RetransmitLostData() {
2485 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2486 // Retransmit crypto data first.
2487 bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2488 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2489 if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2490 crypto_stream->WritePendingCryptoRetransmission();
2491 }
2492 // Retransmit crypto data in stream 1 frames (version < 47).
2493 if (!uses_crypto_frames &&
2494 streams_with_pending_retransmission_.contains(
2495 QuicUtils::GetCryptoStreamId(transport_version()))) {
2496 // Retransmit crypto data first.
2497 QuicStream* crypto_stream =
2498 GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2499 crypto_stream->OnCanWrite();
2500 QUICHE_DCHECK(CheckStreamWriteBlocked(crypto_stream));
2501 if (crypto_stream->HasPendingRetransmission()) {
2502 // Connection is write blocked.
2503 return false;
2504 } else {
2505 streams_with_pending_retransmission_.erase(
2506 QuicUtils::GetCryptoStreamId(transport_version()));
2507 }
2508 }
2509 if (control_frame_manager_.HasPendingRetransmission()) {
2510 control_frame_manager_.OnCanWrite();
2511 if (control_frame_manager_.HasPendingRetransmission()) {
2512 return false;
2513 }
2514 }
2515 while (!streams_with_pending_retransmission_.empty()) {
2516 if (!CanWriteStreamData()) {
2517 break;
2518 }
2519 // Retransmit lost data on headers and data streams.
2520 const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2521 QuicStream* stream = GetStream(id);
2522 if (stream != nullptr) {
2523 stream->OnCanWrite();
2524 QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
2525 if (stream->HasPendingRetransmission()) {
2526 // Connection is write blocked.
2527 break;
2528 } else if (!streams_with_pending_retransmission_.empty() &&
2529 streams_with_pending_retransmission_.begin()->first == id) {
2530 // Retransmit lost data may cause connection close. If this stream
2531 // has not yet sent fin, a RST_STREAM will be sent and it will be
2532 // removed from streams_with_pending_retransmission_.
2533 streams_with_pending_retransmission_.pop_front();
2534 }
2535 } else {
2536 QUIC_BUG(quic_bug_10866_14)
2537 << "Try to retransmit data of a closed stream";
2538 streams_with_pending_retransmission_.pop_front();
2539 }
2540 }
2541
2542 return streams_with_pending_retransmission_.empty();
2543 }
2544
NeuterUnencryptedData()2545 void QuicSession::NeuterUnencryptedData() {
2546 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2547 crypto_stream->NeuterUnencryptedStreamData();
2548 if (!crypto_stream->HasPendingRetransmission() &&
2549 !QuicVersionUsesCryptoFrames(transport_version())) {
2550 streams_with_pending_retransmission_.erase(
2551 QuicUtils::GetCryptoStreamId(transport_version()));
2552 }
2553 connection_->NeuterUnencryptedPackets();
2554 }
2555
SetTransmissionType(TransmissionType type)2556 void QuicSession::SetTransmissionType(TransmissionType type) {
2557 connection_->SetTransmissionType(type);
2558 }
2559
SendMessage(absl::Span<quiche::QuicheMemSlice> message)2560 MessageResult QuicSession::SendMessage(
2561 absl::Span<quiche::QuicheMemSlice> message) {
2562 return SendMessage(message, /*flush=*/false);
2563 }
2564
SendMessage(quiche::QuicheMemSlice message)2565 MessageResult QuicSession::SendMessage(quiche::QuicheMemSlice message) {
2566 return SendMessage(absl::MakeSpan(&message, 1), /*flush=*/false);
2567 }
2568
SendMessage(absl::Span<quiche::QuicheMemSlice> message,bool flush)2569 MessageResult QuicSession::SendMessage(
2570 absl::Span<quiche::QuicheMemSlice> message, bool flush) {
2571 QUICHE_DCHECK(connection_->connected())
2572 << ENDPOINT << "Try to write messages when connection is closed.";
2573 if (!IsEncryptionEstablished()) {
2574 return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2575 }
2576 QuicConnection::ScopedEncryptionLevelContext context(
2577 connection(), GetEncryptionLevelToSendApplicationData());
2578 MessageStatus result =
2579 connection_->SendMessage(last_message_id_ + 1, message, flush);
2580 if (result == MESSAGE_STATUS_SUCCESS) {
2581 return {result, ++last_message_id_};
2582 }
2583 return {result, 0};
2584 }
2585
OnMessageAcked(QuicMessageId message_id,QuicTime)2586 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2587 QuicTime /*receive_timestamp*/) {
2588 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2589 }
2590
OnMessageLost(QuicMessageId message_id)2591 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2592 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2593 << " is considered lost";
2594 }
2595
CleanUpClosedStreams()2596 void QuicSession::CleanUpClosedStreams() { closed_streams_.clear(); }
2597
GetCurrentLargestMessagePayload() const2598 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2599 return connection_->GetCurrentLargestMessagePayload();
2600 }
2601
GetGuaranteedLargestMessagePayload() const2602 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2603 return connection_->GetGuaranteedLargestMessagePayload();
2604 }
2605
next_outgoing_bidirectional_stream_id() const2606 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2607 if (VersionHasIetfQuicFrames(transport_version())) {
2608 return ietf_streamid_manager_.next_outgoing_bidirectional_stream_id();
2609 }
2610 return stream_id_manager_.next_outgoing_stream_id();
2611 }
2612
next_outgoing_unidirectional_stream_id() const2613 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2614 if (VersionHasIetfQuicFrames(transport_version())) {
2615 return ietf_streamid_manager_.next_outgoing_unidirectional_stream_id();
2616 }
2617 return stream_id_manager_.next_outgoing_stream_id();
2618 }
2619
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2620 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2621 const bool allow_new_streams =
2622 frame.unidirectional
2623 ? ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2624 frame.stream_count)
2625 : ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2626 frame.stream_count);
2627 if (allow_new_streams) {
2628 OnCanCreateNewOutgoingStream(frame.unidirectional);
2629 }
2630
2631 return true;
2632 }
2633
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2634 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2635 std::string error_details;
2636 if (ietf_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2637 return true;
2638 }
2639 connection_->CloseConnection(
2640 QUIC_STREAMS_BLOCKED_ERROR, error_details,
2641 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2642 return false;
2643 }
2644
max_open_incoming_bidirectional_streams() const2645 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2646 if (VersionHasIetfQuicFrames(transport_version())) {
2647 return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2648 }
2649 return stream_id_manager_.max_open_incoming_streams();
2650 }
2651
max_open_incoming_unidirectional_streams() const2652 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2653 if (VersionHasIetfQuicFrames(transport_version())) {
2654 return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2655 }
2656 return stream_id_manager_.max_open_incoming_streams();
2657 }
2658
SelectAlpn(const std::vector<absl::string_view> & alpns) const2659 std::vector<absl::string_view>::const_iterator QuicSession::SelectAlpn(
2660 const std::vector<absl::string_view>& alpns) const {
2661 const std::string alpn = AlpnForVersion(connection()->version());
2662 return std::find(alpns.cbegin(), alpns.cend(), alpn);
2663 }
2664
OnAlpnSelected(absl::string_view alpn)2665 void QuicSession::OnAlpnSelected(absl::string_view alpn) {
2666 QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2667 : "Client: ")
2668 << "ALPN selected: " << alpn;
2669 }
2670
NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level)2671 void QuicSession::NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level) {
2672 GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
2673 }
2674
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action)2675 void QuicSession::PerformActionOnActiveStreams(
2676 quiche::UnretainedCallback<bool(QuicStream*)> action) {
2677 std::vector<QuicStream*> active_streams;
2678 for (const auto& it : stream_map_) {
2679 if (!it.second->is_static() && !it.second->IsZombie()) {
2680 active_streams.push_back(it.second.get());
2681 }
2682 }
2683
2684 for (QuicStream* stream : active_streams) {
2685 if (!action(stream)) {
2686 return;
2687 }
2688 }
2689 }
2690
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action) const2691 void QuicSession::PerformActionOnActiveStreams(
2692 quiche::UnretainedCallback<bool(QuicStream*)> action) const {
2693 for (const auto& it : stream_map_) {
2694 if (!it.second->is_static() && !it.second->IsZombie() &&
2695 !action(it.second.get())) {
2696 return;
2697 }
2698 }
2699 }
2700
GetEncryptionLevelToSendApplicationData() const2701 EncryptionLevel QuicSession::GetEncryptionLevelToSendApplicationData() const {
2702 return connection_->framer().GetEncryptionLevelToSendApplicationData();
2703 }
2704
ProcessAllPendingStreams()2705 void QuicSession::ProcessAllPendingStreams() {
2706 std::vector<PendingStream*> pending_streams;
2707 pending_streams.reserve(pending_stream_map_.size());
2708 for (auto it = pending_stream_map_.cbegin(); it != pending_stream_map_.cend();
2709 ++it) {
2710 pending_streams.push_back(it->second.get());
2711 }
2712 for (auto* pending_stream : pending_streams) {
2713 MaybeProcessPendingStream(pending_stream);
2714 if (!connection()->connected()) {
2715 return;
2716 }
2717 }
2718 }
2719
ValidatePath(std::unique_ptr<QuicPathValidationContext> context,std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,PathValidationReason reason)2720 void QuicSession::ValidatePath(
2721 std::unique_ptr<QuicPathValidationContext> context,
2722 std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,
2723 PathValidationReason reason) {
2724 connection_->ValidatePath(std::move(context), std::move(result_delegate),
2725 reason);
2726 }
2727
HasPendingPathValidation() const2728 bool QuicSession::HasPendingPathValidation() const {
2729 return connection_->HasPendingPathValidation();
2730 }
2731
MigratePath(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,QuicPacketWriter * writer,bool owns_writer)2732 bool QuicSession::MigratePath(const QuicSocketAddress& self_address,
2733 const QuicSocketAddress& peer_address,
2734 QuicPacketWriter* writer, bool owns_writer) {
2735 return connection_->MigratePath(self_address, peer_address, writer,
2736 owns_writer);
2737 }
2738
ValidateToken(absl::string_view token)2739 bool QuicSession::ValidateToken(absl::string_view token) {
2740 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_SERVER);
2741 if (GetQuicFlag(quic_reject_retry_token_in_initial_packet)) {
2742 return false;
2743 }
2744 if (token.empty() || token[0] != kAddressTokenPrefix) {
2745 // Validate the prefix for token received in NEW_TOKEN frame.
2746 return false;
2747 }
2748 const bool valid = GetCryptoStream()->ValidateAddressToken(
2749 absl::string_view(token.data() + 1, token.length() - 1));
2750 if (valid) {
2751 const CachedNetworkParameters* cached_network_params =
2752 GetCryptoStream()->PreviousCachedNetworkParams();
2753 if (cached_network_params != nullptr &&
2754 cached_network_params->timestamp() > 0) {
2755 connection()->OnReceiveConnectionState(*cached_network_params);
2756 }
2757 }
2758 return valid;
2759 }
2760
OnServerPreferredAddressAvailable(const QuicSocketAddress & server_preferred_address)2761 void QuicSession::OnServerPreferredAddressAvailable(
2762 const QuicSocketAddress& server_preferred_address) {
2763 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
2764 if (visitor_ != nullptr) {
2765 visitor_->OnServerPreferredAddressAvailable(server_preferred_address);
2766 }
2767 }
2768
ProcessPendingStream(PendingStream * pending)2769 QuicStream* QuicSession::ProcessPendingStream(PendingStream* pending) {
2770 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
2771 QUICHE_DCHECK(connection()->connected());
2772 QuicStreamId stream_id = pending->id();
2773 QUIC_BUG_IF(bad pending stream, !IsIncomingStream(stream_id))
2774 << "Pending stream " << stream_id << " is not an incoming stream.";
2775 // TODO(b/305051334) check if this stream is incoming stream before making it
2776 // pending. If not, connection should be closed.
2777 StreamType stream_type = QuicUtils::GetStreamType(
2778 stream_id, perspective(), /*peer_initiated=*/true, version());
2779 switch (stream_type) {
2780 case BIDIRECTIONAL: {
2781 return ProcessBidirectionalPendingStream(pending);
2782 }
2783 case READ_UNIDIRECTIONAL: {
2784 return ProcessReadUnidirectionalPendingStream(pending);
2785 }
2786 case WRITE_UNIDIRECTIONAL:
2787 ABSL_FALLTHROUGH_INTENDED;
2788 case CRYPTO:
2789 QUICHE_BUG(unexpected pending stream)
2790 << "Unexpected pending stream " << stream_id << " with type "
2791 << stream_type;
2792 return nullptr;
2793 }
2794 return nullptr; // Unreachable, unless the enum value is out-of-range
2795 // (potentially undefined behavior)
2796 }
2797
2798 #undef ENDPOINT // undef for jumbo builds
2799 } // namespace quic
2800