• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_session.h"
6 
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10 
11 #include "absl/memory/memory.h"
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
15 #include "quiche/quic/core/frames/quic_window_update_frame.h"
16 #include "quiche/quic/core/quic_connection.h"
17 #include "quiche/quic/core/quic_connection_context.h"
18 #include "quiche/quic/core/quic_error_codes.h"
19 #include "quiche/quic/core/quic_flow_controller.h"
20 #include "quiche/quic/core/quic_stream_priority.h"
21 #include "quiche/quic/core/quic_types.h"
22 #include "quiche/quic/core/quic_utils.h"
23 #include "quiche/quic/core/quic_versions.h"
24 #include "quiche/quic/core/quic_write_blocked_list.h"
25 #include "quiche/quic/platform/api/quic_bug_tracker.h"
26 #include "quiche/quic/platform/api/quic_flag_utils.h"
27 #include "quiche/quic/platform/api/quic_flags.h"
28 #include "quiche/quic/platform/api/quic_logging.h"
29 #include "quiche/quic/platform/api/quic_server_stats.h"
30 #include "quiche/quic/platform/api/quic_stack_trace.h"
31 #include "quiche/common/platform/api/quiche_logging.h"
32 #include "quiche/common/quiche_callbacks.h"
33 #include "quiche/common/quiche_text_utils.h"
34 
35 namespace quic {
36 
37 namespace {
38 
39 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
40  public:
ClosedStreamsCleanUpDelegate(QuicSession * session)41   explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
42       : session_(session) {}
43   ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
44   ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
45       delete;
46 
GetConnectionContext()47   QuicConnectionContext* GetConnectionContext() override {
48     return (session_->connection() == nullptr)
49                ? nullptr
50                : session_->connection()->context();
51   }
52 
OnAlarm()53   void OnAlarm() override { session_->CleanUpClosedStreams(); }
54 
55  private:
56   QuicSession* session_;
57 };
58 
59 }  // namespace
60 
61 #define ENDPOINT \
62   (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
63 
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)64 QuicSession::QuicSession(
65     QuicConnection* connection, Visitor* owner, const QuicConfig& config,
66     const ParsedQuicVersionVector& supported_versions,
67     QuicStreamCount num_expected_unidirectional_static_streams)
68     : QuicSession(connection, owner, config, supported_versions,
69                   num_expected_unidirectional_static_streams, nullptr) {}
70 
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams,std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)71 QuicSession::QuicSession(
72     QuicConnection* connection, Visitor* owner, const QuicConfig& config,
73     const ParsedQuicVersionVector& supported_versions,
74     QuicStreamCount num_expected_unidirectional_static_streams,
75     std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
76     : connection_(connection),
77       perspective_(connection->perspective()),
78       visitor_(owner),
79       write_blocked_streams_(std::make_unique<QuicWriteBlockedList>()),
80       config_(config),
81       stream_id_manager_(perspective(), connection->transport_version(),
82                          kDefaultMaxStreamsPerConnection,
83                          config_.GetMaxBidirectionalStreamsToSend()),
84       ietf_streamid_manager_(perspective(), connection->version(), this, 0,
85                              num_expected_unidirectional_static_streams,
86                              config_.GetMaxBidirectionalStreamsToSend(),
87                              config_.GetMaxUnidirectionalStreamsToSend() +
88                                  num_expected_unidirectional_static_streams),
89       num_draining_streams_(0),
90       num_outgoing_draining_streams_(0),
91       num_static_streams_(0),
92       num_zombie_streams_(0),
93       flow_controller_(
94           this, QuicUtils::GetInvalidStreamId(connection->transport_version()),
95           /*is_connection_flow_controller*/ true,
96           connection->version().AllowsLowFlowControlLimits()
97               ? 0
98               : kMinimumFlowControlSendWindow,
99           config_.GetInitialSessionFlowControlWindowToSend(),
100           kSessionReceiveWindowLimit, perspective() == Perspective::IS_SERVER,
101           nullptr),
102       currently_writing_stream_id_(0),
103       transport_goaway_sent_(false),
104       transport_goaway_received_(false),
105       control_frame_manager_(this),
106       last_message_id_(0),
107       datagram_queue_(this, std::move(datagram_observer)),
108       closed_streams_clean_up_alarm_(nullptr),
109       supported_versions_(supported_versions),
110       is_configured_(false),
111       was_zero_rtt_rejected_(false),
112       liveness_testing_in_progress_(false),
113       limit_sending_max_streams_(
114           GetQuicReloadableFlag(quic_limit_sending_max_streams2)) {
115   closed_streams_clean_up_alarm_ =
116       absl::WrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
117           new ClosedStreamsCleanUpDelegate(this)));
118   if (VersionHasIetfQuicFrames(transport_version())) {
119     config_.SetMaxUnidirectionalStreamsToSend(
120         config_.GetMaxUnidirectionalStreamsToSend() +
121         num_expected_unidirectional_static_streams);
122   }
123 }
124 
Initialize()125 void QuicSession::Initialize() {
126   connection_->set_visitor(this);
127   connection_->SetSessionNotifier(this);
128   connection_->SetDataProducer(this);
129   connection_->SetUnackedMapInitialCapacity();
130   connection_->SetFromConfig(config_);
131   if (perspective_ == Perspective::IS_CLIENT) {
132     if (config_.HasClientRequestedIndependentOption(kAFFE, perspective_) &&
133         version().HasIetfQuicFrames()) {
134       connection_->set_can_receive_ack_frequency_frame();
135       config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
136     }
137   }
138   if (perspective() == Perspective::IS_SERVER &&
139       connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
140     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
141   }
142 
143   connection_->CreateConnectionIdManager();
144 
145   // On the server side, version negotiation has been done by the dispatcher,
146   // and the server session is created with the right version.
147   if (perspective() == Perspective::IS_SERVER) {
148     connection_->OnSuccessfulVersionNegotiation();
149   }
150 
151   if (QuicVersionUsesCryptoFrames(transport_version())) {
152     return;
153   }
154 
155   QUICHE_DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
156                    GetMutableCryptoStream()->id());
157 }
158 
~QuicSession()159 QuicSession::~QuicSession() {
160   if (closed_streams_clean_up_alarm_ != nullptr) {
161     closed_streams_clean_up_alarm_->PermanentCancel();
162   }
163 }
164 
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)165 PendingStream* QuicSession::PendingStreamOnStreamFrame(
166     const QuicStreamFrame& frame) {
167   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
168   QuicStreamId stream_id = frame.stream_id;
169 
170   PendingStream* pending = GetOrCreatePendingStream(stream_id);
171 
172   if (!pending) {
173     if (frame.fin) {
174       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
175       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
176     }
177     return nullptr;
178   }
179 
180   pending->OnStreamFrame(frame);
181   if (!connection()->connected()) {
182     return nullptr;
183   }
184   return pending;
185 }
186 
MaybeProcessPendingStream(PendingStream * pending)187 void QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
188   QUICHE_DCHECK(pending != nullptr);
189   QuicStreamId stream_id = pending->id();
190   std::optional<QuicResetStreamError> stop_sending_error_code =
191       pending->GetStopSendingErrorCode();
192   QuicStream* stream = ProcessPendingStream(pending);
193   if (stream != nullptr) {
194     // The pending stream should now be in the scope of normal streams.
195     QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
196         << "Stream " << stream_id << " not created";
197     pending_stream_map_.erase(stream_id);
198     if (stop_sending_error_code) {
199       stream->OnStopSending(*stop_sending_error_code);
200       if (!connection()->connected()) {
201         return;
202       }
203     }
204     stream->OnStreamCreatedFromPendingStream();
205     return;
206   }
207   // At this point, none of the bytes has been successfully consumed by the
208   // application layer. We should close the pending stream even if it is
209   // bidirectionl as no application will be able to write in a bidirectional
210   // stream with zero byte as input.
211   if (pending->sequencer()->IsClosed()) {
212     ClosePendingStream(stream_id);
213   }
214 }
215 
PendingStreamOnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)216 void QuicSession::PendingStreamOnWindowUpdateFrame(
217     const QuicWindowUpdateFrame& frame) {
218   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
219   PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
220   if (pending) {
221     pending->OnWindowUpdateFrame(frame);
222   }
223 }
224 
PendingStreamOnStopSendingFrame(const QuicStopSendingFrame & frame)225 void QuicSession::PendingStreamOnStopSendingFrame(
226     const QuicStopSendingFrame& frame) {
227   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
228   PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
229   if (pending) {
230     pending->OnStopSending(frame.error());
231   }
232 }
233 
OnStreamFrame(const QuicStreamFrame & frame)234 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
235   QuicStreamId stream_id = frame.stream_id;
236   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
237     connection()->CloseConnection(
238         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
239         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
240     return;
241   }
242 
243   if (ShouldProcessFrameByPendingStream(STREAM_FRAME, stream_id)) {
244     PendingStream* pending = PendingStreamOnStreamFrame(frame);
245     if (pending != nullptr && IsEncryptionEstablished()) {
246       MaybeProcessPendingStream(pending);
247     }
248     return;
249   }
250 
251   QuicStream* stream = GetOrCreateStream(stream_id);
252 
253   if (!stream) {
254     // The stream no longer exists, but we may still be interested in the
255     // final stream byte offset sent by the peer. A frame with a FIN can give
256     // us this offset.
257     if (frame.fin) {
258       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
259       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
260     }
261     return;
262   }
263   stream->OnStreamFrame(frame);
264 }
265 
OnCryptoFrame(const QuicCryptoFrame & frame)266 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
267   GetMutableCryptoStream()->OnCryptoFrame(frame);
268 }
269 
OnStopSendingFrame(const QuicStopSendingFrame & frame)270 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
271   // STOP_SENDING is in IETF QUIC only.
272   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
273   QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
274 
275   QuicStreamId stream_id = frame.stream_id;
276   // If Stream ID is invalid then close the connection.
277   // TODO(ianswett): This check is redundant to checks for IsClosedStream,
278   // but removing it requires removing multiple QUICHE_DCHECKs.
279   // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
280   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
281     QUIC_DVLOG(1) << ENDPOINT
282                   << "Received STOP_SENDING with invalid stream_id: "
283                   << stream_id << " Closing connection";
284     connection()->CloseConnection(
285         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
286         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
287     return;
288   }
289 
290   // If stream_id is READ_UNIDIRECTIONAL, close the connection.
291   if (QuicUtils::GetStreamType(stream_id, perspective(),
292                                IsIncomingStream(stream_id),
293                                version()) == READ_UNIDIRECTIONAL) {
294     QUIC_DVLOG(1) << ENDPOINT
295                   << "Received STOP_SENDING for a read-only stream_id: "
296                   << stream_id << ".";
297     connection()->CloseConnection(
298         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
299         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
300     return;
301   }
302 
303   if (visitor_) {
304     visitor_->OnStopSendingReceived(frame);
305   }
306   if (ShouldProcessFrameByPendingStream(STOP_SENDING_FRAME, stream_id)) {
307     PendingStreamOnStopSendingFrame(frame);
308     return;
309   }
310 
311   QuicStream* stream = GetOrCreateStream(stream_id);
312   if (!stream) {
313     // Errors are handled by GetOrCreateStream.
314     return;
315   }
316 
317   stream->OnStopSending(frame.error());
318 }
319 
OnPacketDecrypted(EncryptionLevel level)320 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
321   GetMutableCryptoStream()->OnPacketDecrypted(level);
322   if (liveness_testing_in_progress_) {
323     liveness_testing_in_progress_ = false;
324     OnCanCreateNewOutgoingStream(/*unidirectional=*/false);
325   }
326 }
327 
OnOneRttPacketAcknowledged()328 void QuicSession::OnOneRttPacketAcknowledged() {
329   GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
330 }
331 
OnHandshakePacketSent()332 void QuicSession::OnHandshakePacketSent() {
333   GetMutableCryptoStream()->OnHandshakePacketSent();
334 }
335 
336 std::unique_ptr<QuicDecrypter>
AdvanceKeysAndCreateCurrentOneRttDecrypter()337 QuicSession::AdvanceKeysAndCreateCurrentOneRttDecrypter() {
338   return GetMutableCryptoStream()->AdvanceKeysAndCreateCurrentOneRttDecrypter();
339 }
340 
CreateCurrentOneRttEncrypter()341 std::unique_ptr<QuicEncrypter> QuicSession::CreateCurrentOneRttEncrypter() {
342   return GetMutableCryptoStream()->CreateCurrentOneRttEncrypter();
343 }
344 
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)345 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
346   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
347   QuicStreamId stream_id = frame.stream_id;
348 
349   PendingStream* pending = GetOrCreatePendingStream(stream_id);
350 
351   if (!pending) {
352     HandleRstOnValidNonexistentStream(frame);
353     return;
354   }
355 
356   pending->OnRstStreamFrame(frame);
357   // At this point, none of the bytes has been consumed by the application
358   // layer. It is safe to close the pending stream even if it is bidirectionl as
359   // no application will be able to write in a bidirectional stream with zero
360   // byte as input.
361   ClosePendingStream(stream_id);
362 }
363 
OnRstStream(const QuicRstStreamFrame & frame)364 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
365   QuicStreamId stream_id = frame.stream_id;
366   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
367     connection()->CloseConnection(
368         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
369         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
370     return;
371   }
372 
373   if (VersionHasIetfQuicFrames(transport_version()) &&
374       QuicUtils::GetStreamType(stream_id, perspective(),
375                                IsIncomingStream(stream_id),
376                                version()) == WRITE_UNIDIRECTIONAL) {
377     connection()->CloseConnection(
378         QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
379         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
380     return;
381   }
382 
383   if (visitor_) {
384     visitor_->OnRstStreamReceived(frame);
385   }
386 
387   if (ShouldProcessFrameByPendingStream(RST_STREAM_FRAME, stream_id)) {
388     PendingStreamOnRstStream(frame);
389     return;
390   }
391 
392   QuicStream* stream = GetOrCreateStream(stream_id);
393 
394   if (!stream) {
395     HandleRstOnValidNonexistentStream(frame);
396     return;  // Errors are handled by GetOrCreateStream.
397   }
398   stream->OnStreamReset(frame);
399 }
400 
OnGoAway(const QuicGoAwayFrame &)401 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
402   QUIC_BUG_IF(quic_bug_12435_1, version().UsesHttp3())
403       << "gQUIC GOAWAY received on version " << version();
404 
405   transport_goaway_received_ = true;
406 }
407 
OnMessageReceived(absl::string_view message)408 void QuicSession::OnMessageReceived(absl::string_view message) {
409   QUIC_DVLOG(1) << ENDPOINT << "Received message of length "
410                 << message.length();
411   QUIC_DVLOG(2) << ENDPOINT << "Contents of message of length "
412                 << message.length() << ":" << std::endl
413                 << quiche::QuicheTextUtils::HexDump(message);
414 }
415 
OnHandshakeDoneReceived()416 void QuicSession::OnHandshakeDoneReceived() {
417   QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
418   GetMutableCryptoStream()->OnHandshakeDoneReceived();
419 }
420 
OnNewTokenReceived(absl::string_view token)421 void QuicSession::OnNewTokenReceived(absl::string_view token) {
422   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
423   GetMutableCryptoStream()->OnNewTokenReceived(token);
424 }
425 
426 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)427 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
428                                                 ConnectionCloseSource source) {
429   if (error != QUIC_NO_ERROR) {
430     if (source == ConnectionCloseSource::FROM_SELF) {
431       QUIC_SERVER_HISTOGRAM_ENUM(
432           "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
433           "QuicErrorCode for server-closed connections.");
434     } else {
435       QUIC_SERVER_HISTOGRAM_ENUM(
436           "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
437           "QuicErrorCode for client-closed connections.");
438     }
439   }
440 }
441 
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)442 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
443                                      ConnectionCloseSource source) {
444   QUICHE_DCHECK(!connection_->connected());
445   if (perspective() == Perspective::IS_SERVER) {
446     RecordConnectionCloseAtServer(frame.quic_error_code, source);
447   }
448 
449   if (on_closed_frame_.quic_error_code == QUIC_NO_ERROR) {
450     // Save all of the connection close information
451     on_closed_frame_ = frame;
452     source_ = source;
453   }
454 
455   GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
456 
457   PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
458     QuicStreamId id = stream->id();
459     stream->OnConnectionClosed(frame.quic_error_code, source);
460     auto it = stream_map_.find(id);
461     if (it != stream_map_.end()) {
462       QUIC_BUG_IF(quic_bug_12435_2, !it->second->IsZombie())
463           << ENDPOINT << "Non-zombie stream " << id
464           << " failed to close under OnConnectionClosed";
465     }
466     return true;
467   });
468 
469   closed_streams_clean_up_alarm_->Cancel();
470 
471   if (visitor_) {
472     visitor_->OnConnectionClosed(connection_->GetOneActiveServerConnectionId(),
473                                  frame.quic_error_code, frame.error_details,
474                                  source);
475   }
476 }
477 
OnWriteBlocked()478 void QuicSession::OnWriteBlocked() {
479   if (!connection_->connected()) {
480     return;
481   }
482   if (visitor_) {
483     visitor_->OnWriteBlocked(connection_);
484   }
485 }
486 
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)487 void QuicSession::OnSuccessfulVersionNegotiation(
488     const ParsedQuicVersion& /*version*/) {}
489 
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)490 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
491                                    const QuicSocketAddress& peer_address,
492                                    bool is_connectivity_probe) {
493   QUICHE_DCHECK(!connection_->ignore_gquic_probing());
494   if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
495     // Server only sends back a connectivity probe after received a
496     // connectivity probe from a new peer address.
497     connection_->SendConnectivityProbingPacket(nullptr, peer_address);
498   }
499 }
500 
OnPathDegrading()501 void QuicSession::OnPathDegrading() {}
502 
OnForwardProgressMadeAfterPathDegrading()503 void QuicSession::OnForwardProgressMadeAfterPathDegrading() {}
504 
AllowSelfAddressChange() const505 bool QuicSession::AllowSelfAddressChange() const { return false; }
506 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)507 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
508   // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
509   // assume that it still exists.
510   QuicStreamId stream_id = frame.stream_id;
511   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
512     // This is a window update that applies to the connection, rather than an
513     // individual stream.
514     QUIC_DVLOG(1) << ENDPOINT
515                   << "Received connection level flow control window "
516                      "update with max data: "
517                   << frame.max_data;
518     flow_controller_.UpdateSendWindowOffset(frame.max_data);
519     return;
520   }
521 
522   if (VersionHasIetfQuicFrames(transport_version()) &&
523       QuicUtils::GetStreamType(stream_id, perspective(),
524                                IsIncomingStream(stream_id),
525                                version()) == READ_UNIDIRECTIONAL) {
526     connection()->CloseConnection(
527         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
528         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
529         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
530     return;
531   }
532 
533   if (ShouldProcessFrameByPendingStream(WINDOW_UPDATE_FRAME, stream_id)) {
534     PendingStreamOnWindowUpdateFrame(frame);
535     return;
536   }
537 
538   QuicStream* stream = GetOrCreateStream(stream_id);
539   if (stream != nullptr) {
540     stream->OnWindowUpdateFrame(frame);
541   }
542 }
543 
OnBlockedFrame(const QuicBlockedFrame & frame)544 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
545   // TODO(rjshade): Compare our flow control receive windows for specified
546   //                streams: if we have a large window then maybe something
547   //                had gone wrong with the flow control accounting.
548   QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
549                   << frame.stream_id << ", offset: " << frame.offset;
550 }
551 
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)552 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
553                                             uint64_t previous_bytes_written,
554                                             bool previous_fin_sent) {
555   if (  // Stream should not be closed.
556       !stream->write_side_closed() &&
557       // Not connection flow control blocked.
558       !flow_controller_.IsBlocked() &&
559       // Detect lack of forward progress.
560       previous_bytes_written == stream->stream_bytes_written() &&
561       previous_fin_sent == stream->fin_sent()) {
562     stream->set_busy_counter(stream->busy_counter() + 1);
563     QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
564                   << stream->id() << " stream_bytes_written "
565                   << stream->stream_bytes_written() << " fin "
566                   << stream->fin_sent() << " count " << stream->busy_counter();
567     // Wait a few iterations before firing, the exact count is
568     // arbitrary, more than a few to cover a few test-only false
569     // positives.
570     if (stream->busy_counter() > 20) {
571       QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
572                       << stream->id() << " stream_bytes_written "
573                       << stream->stream_bytes_written() << " fin "
574                       << stream->fin_sent();
575       return false;
576     }
577   } else {
578     stream->set_busy_counter(0);
579   }
580   return true;
581 }
582 
CheckStreamWriteBlocked(QuicStream * stream) const583 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
584   if (!stream->write_side_closed() && stream->HasBufferedData() &&
585       !stream->IsFlowControlBlocked() &&
586       !write_blocked_streams_->IsStreamBlocked(stream->id())) {
587     QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
588                      << " has buffered " << stream->BufferedDataBytes()
589                      << " bytes, and is not flow control blocked, "
590                         "but it is not in the write block list.";
591     return false;
592   }
593   return true;
594 }
595 
OnCanWrite()596 void QuicSession::OnCanWrite() {
597   if (connection_->framer().is_processing_packet()) {
598     // Do not write data in the middle of packet processing because rest
599     // frames in the packet may change the data to write. For example, lost
600     // data could be acknowledged. Also, connection is going to emit
601     // OnCanWrite signal post packet processing.
602     QUIC_BUG(session_write_mid_packet_processing)
603         << ENDPOINT << "Try to write mid packet processing.";
604     return;
605   }
606   if (!RetransmitLostData()) {
607     // Cannot finish retransmitting lost data, connection is write blocked.
608     QUIC_DVLOG(1) << ENDPOINT
609                   << "Cannot finish retransmitting lost data, connection is "
610                      "write blocked.";
611     return;
612   }
613   // We limit the number of writes to the number of pending streams. If more
614   // streams become pending, WillingAndAbleToWrite will be true, which will
615   // cause the connection to request resumption before yielding to other
616   // connections.
617   // If we are connection level flow control blocked, then only allow the
618   // crypto and headers streams to try writing as all other streams will be
619   // blocked.
620   size_t num_writes = flow_controller_.IsBlocked()
621                           ? write_blocked_streams_->NumBlockedSpecialStreams()
622                           : write_blocked_streams_->NumBlockedStreams();
623   if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
624       datagram_queue_.empty() &&
625       (!QuicVersionUsesCryptoFrames(transport_version()) ||
626        !GetCryptoStream()->HasBufferedCryptoFrames())) {
627     return;
628   }
629 
630   QuicConnection::ScopedPacketFlusher flusher(connection_);
631   if (QuicVersionUsesCryptoFrames(transport_version())) {
632     QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
633     if (crypto_stream->HasBufferedCryptoFrames()) {
634       crypto_stream->WriteBufferedCryptoFrames();
635     }
636     if ((GetQuicReloadableFlag(
637              quic_no_write_control_frame_upon_connection_close) &&
638          !connection_->connected()) ||
639         crypto_stream->HasBufferedCryptoFrames()) {
640       // Cannot finish writing buffered crypto frames, connection is either
641       // write blocked or closed.
642       return;
643     }
644   }
645   if (control_frame_manager_.WillingToWrite()) {
646     control_frame_manager_.OnCanWrite();
647   }
648   if (version().UsesTls() && GetHandshakeState() != HANDSHAKE_CONFIRMED &&
649       connection_->in_probe_time_out()) {
650     QUIC_CODE_COUNT(quic_donot_pto_stream_data_before_handshake_confirmed);
651     // Do not PTO stream data before handshake gets confirmed.
652     return;
653   }
654   // TODO(b/147146815): this makes all datagrams go before stream data.  We
655   // should have a better priority scheme for this.
656   if (!datagram_queue_.empty()) {
657     size_t written = datagram_queue_.SendDatagrams();
658     QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
659     if (!datagram_queue_.empty()) {
660       return;
661     }
662   }
663   std::vector<QuicStreamId> last_writing_stream_ids;
664   for (size_t i = 0; i < num_writes; ++i) {
665     if (!(write_blocked_streams_->HasWriteBlockedSpecialStream() ||
666           write_blocked_streams_->HasWriteBlockedDataStreams())) {
667       // Writing one stream removed another!? Something's broken.
668       QUIC_BUG(quic_bug_10866_1)
669           << "WriteBlockedStream is missing, num_writes: " << num_writes
670           << ", finished_writes: " << i
671           << ", connected: " << connection_->connected()
672           << ", connection level flow control blocked: "
673           << flow_controller_.IsBlocked();
674       for (QuicStreamId id : last_writing_stream_ids) {
675         QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
676       }
677       connection_->CloseConnection(QUIC_INTERNAL_ERROR,
678                                    "WriteBlockedStream is missing",
679                                    ConnectionCloseBehavior::SILENT_CLOSE);
680       return;
681     }
682     if (!CanWriteStreamData()) {
683       return;
684     }
685     currently_writing_stream_id_ = write_blocked_streams_->PopFront();
686     last_writing_stream_ids.push_back(currently_writing_stream_id_);
687     QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
688                   << currently_writing_stream_id_ << " from write-blocked list";
689     QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
690     if (stream != nullptr && !stream->IsFlowControlBlocked()) {
691       // If the stream can't write all bytes it'll re-add itself to the blocked
692       // list.
693       uint64_t previous_bytes_written = stream->stream_bytes_written();
694       bool previous_fin_sent = stream->fin_sent();
695       QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
696                     << " bytes_written " << previous_bytes_written << " fin "
697                     << previous_fin_sent;
698       stream->OnCanWrite();
699       QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
700       QUICHE_DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
701                                               previous_fin_sent));
702     }
703     currently_writing_stream_id_ = 0;
704   }
705 }
706 
WillingAndAbleToWrite() const707 bool QuicSession::WillingAndAbleToWrite() const {
708   // Schedule a write when:
709   // 1) control frame manager has pending or new control frames, or
710   // 2) any stream has pending retransmissions, or
711   // 3) If the crypto or headers streams are blocked, or
712   // 4) connection is not flow control blocked and there are write blocked
713   // streams.
714   if (QuicVersionUsesCryptoFrames(transport_version())) {
715     if (HasPendingHandshake()) {
716       return true;
717     }
718     if (!IsEncryptionEstablished()) {
719       return false;
720     }
721   }
722   if (control_frame_manager_.WillingToWrite() ||
723       !streams_with_pending_retransmission_.empty()) {
724     return true;
725   }
726   if (flow_controller_.IsBlocked()) {
727     if (VersionUsesHttp3(transport_version())) {
728       return false;
729     }
730     // Crypto and headers streams are not blocked by connection level flow
731     // control.
732     return write_blocked_streams_->HasWriteBlockedSpecialStream();
733   }
734   return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
735          write_blocked_streams_->HasWriteBlockedDataStreams();
736 }
737 
GetStreamsInfoForLogging() const738 std::string QuicSession::GetStreamsInfoForLogging() const {
739   std::string info = absl::StrCat(
740       "num_active_streams: ", GetNumActiveStreams(),
741       ", num_pending_streams: ", pending_streams_size(),
742       ", num_outgoing_draining_streams: ", num_outgoing_draining_streams(),
743       " ");
744   // Log info for up to 5 streams.
745   size_t i = 5;
746   for (const auto& it : stream_map_) {
747     if (it.second->is_static()) {
748       continue;
749     }
750     // Calculate the stream creation delay.
751     const QuicTime::Delta delay =
752         connection_->clock()->ApproximateNow() - it.second->creation_time();
753     absl::StrAppend(
754         &info, "{", it.second->id(), ":", delay.ToDebuggingValue(), ";",
755         it.second->stream_bytes_written(), ",", it.second->fin_sent(), ",",
756         it.second->HasBufferedData(), ",", it.second->fin_buffered(), ";",
757         it.second->stream_bytes_read(), ",", it.second->fin_received(), "}");
758     --i;
759     if (i == 0) {
760       break;
761     }
762   }
763   return info;
764 }
765 
HasPendingHandshake() const766 bool QuicSession::HasPendingHandshake() const {
767   if (QuicVersionUsesCryptoFrames(transport_version())) {
768     return GetCryptoStream()->HasPendingCryptoRetransmission() ||
769            GetCryptoStream()->HasBufferedCryptoFrames();
770   }
771   return streams_with_pending_retransmission_.contains(
772              QuicUtils::GetCryptoStreamId(transport_version())) ||
773          write_blocked_streams_->IsStreamBlocked(
774              QuicUtils::GetCryptoStreamId(transport_version()));
775 }
776 
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)777 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
778                                    const QuicSocketAddress& peer_address,
779                                    const QuicReceivedPacket& packet) {
780   QuicConnectionContextSwitcher cs(connection_->context());
781   connection_->ProcessUdpPacket(self_address, peer_address, packet);
782 }
783 
on_closed_frame_string() const784 std::string QuicSession::on_closed_frame_string() const {
785   std::stringstream ss;
786   ss << on_closed_frame_;
787   if (source_.has_value()) {
788     ss << " " << ConnectionCloseSourceToString(*source_);
789   }
790   return ss.str();
791 }
792 
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,EncryptionLevel level)793 QuicConsumedData QuicSession::WritevData(QuicStreamId id, size_t write_length,
794                                          QuicStreamOffset offset,
795                                          StreamSendingState state,
796                                          TransmissionType type,
797                                          EncryptionLevel level) {
798   QUIC_BUG_IF(session writevdata when disconnected, !connection()->connected())
799       << ENDPOINT << "Try to write stream data when connection is closed: "
800       << on_closed_frame_string();
801   if (!IsEncryptionEstablished() &&
802       !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
803     // Do not let streams write without encryption. The calling stream will end
804     // up write blocked until OnCanWrite is next called.
805     if (was_zero_rtt_rejected_ && !OneRttKeysAvailable()) {
806       QUICHE_DCHECK(version().UsesTls() &&
807                     perspective() == Perspective::IS_CLIENT);
808       QUIC_DLOG(INFO) << ENDPOINT
809                       << "Suppress the write while 0-RTT gets rejected and "
810                          "1-RTT keys are not available. Version: "
811                       << ParsedQuicVersionToString(version());
812     } else if (version().UsesTls() || perspective() == Perspective::IS_SERVER) {
813       QUIC_BUG(quic_bug_10866_2)
814           << ENDPOINT << "Try to send data of stream " << id
815           << " before encryption is established. Version: "
816           << ParsedQuicVersionToString(version());
817     } else {
818       // In QUIC crypto, this could happen when the client sends full CHLO and
819       // 0-RTT request, then receives an inchoate REJ and sends an inchoate
820       // CHLO. The client then gets the ACK of the inchoate CHLO or the client
821       // gets the full REJ and needs to verify the proof (before it sends the
822       // full CHLO), such that there is no outstanding crypto data.
823       // Retransmission alarm fires in TLP mode which tries to retransmit the
824       // 0-RTT request (without encryption).
825       QUIC_DLOG(INFO) << ENDPOINT << "Try to send data of stream " << id
826                       << " before encryption is established.";
827     }
828     return QuicConsumedData(0, false);
829   }
830 
831   SetTransmissionType(type);
832   QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
833 
834   QuicConsumedData data =
835       connection_->SendStreamData(id, write_length, offset, state);
836   if (type == NOT_RETRANSMISSION) {
837     // This is new stream data.
838     write_blocked_streams_->UpdateBytesForStream(id, data.bytes_consumed);
839   }
840 
841   return data;
842 }
843 
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)844 size_t QuicSession::SendCryptoData(EncryptionLevel level, size_t write_length,
845                                    QuicStreamOffset offset,
846                                    TransmissionType type) {
847   QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
848   if (!connection()->framer().HasEncrypterOfEncryptionLevel(level)) {
849     const std::string error_details = absl::StrCat(
850         "Try to send crypto data with missing keys of encryption level: ",
851         EncryptionLevelToString(level));
852     QUIC_BUG(quic_bug_10866_3) << ENDPOINT << error_details;
853     connection()->CloseConnection(
854         QUIC_MISSING_WRITE_KEYS, error_details,
855         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
856     return 0;
857   }
858   SetTransmissionType(type);
859   QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
860   const auto bytes_consumed =
861       connection_->SendCryptoData(level, write_length, offset);
862   return bytes_consumed;
863 }
864 
OnControlFrameManagerError(QuicErrorCode error_code,std::string error_details)865 void QuicSession::OnControlFrameManagerError(QuicErrorCode error_code,
866                                              std::string error_details) {
867   connection_->CloseConnection(
868       error_code, error_details,
869       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
870 }
871 
WriteControlFrame(const QuicFrame & frame,TransmissionType type)872 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
873                                     TransmissionType type) {
874   QUIC_BUG_IF(quic_bug_12435_11, !connection()->connected())
875       << ENDPOINT
876       << absl::StrCat("Try to write control frame: ", QuicFrameToString(frame),
877                       " when connection is closed: ")
878       << on_closed_frame_string();
879   if (!IsEncryptionEstablished()) {
880     // Suppress the write before encryption gets established.
881     return false;
882   }
883   if (limit_sending_max_streams_ &&
884       connection_->framer().is_processing_packet()) {
885     QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 3, 3);
886     // The frame will be sent when OnCanWrite() is called.
887     return false;
888   }
889   SetTransmissionType(type);
890   QuicConnection::ScopedEncryptionLevelContext context(
891       connection(), GetEncryptionLevelToSendApplicationData());
892   return connection_->SendControlFrame(frame);
893 }
894 
ResetStream(QuicStreamId id,QuicRstStreamErrorCode error)895 void QuicSession::ResetStream(QuicStreamId id, QuicRstStreamErrorCode error) {
896   QuicStream* stream = GetStream(id);
897   if (stream != nullptr && stream->is_static()) {
898     connection()->CloseConnection(
899         QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
900         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
901     return;
902   }
903 
904   if (stream != nullptr) {
905     stream->Reset(error);
906     return;
907   }
908 
909   QuicConnection::ScopedPacketFlusher flusher(connection());
910   MaybeSendStopSendingFrame(id, QuicResetStreamError::FromInternal(error));
911   MaybeSendRstStreamFrame(id, QuicResetStreamError::FromInternal(error), 0);
912 }
913 
MaybeSendRstStreamFrame(QuicStreamId id,QuicResetStreamError error,QuicStreamOffset bytes_written)914 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
915                                           QuicResetStreamError error,
916                                           QuicStreamOffset bytes_written) {
917   if (!connection()->connected()) {
918     return;
919   }
920   if (!VersionHasIetfQuicFrames(transport_version()) ||
921       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
922                                version()) != READ_UNIDIRECTIONAL) {
923     control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
924   }
925 
926   connection_->OnStreamReset(id, error.internal_code());
927 }
928 
MaybeSendStopSendingFrame(QuicStreamId id,QuicResetStreamError error)929 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
930                                             QuicResetStreamError error) {
931   if (!connection()->connected()) {
932     return;
933   }
934   if (VersionHasIetfQuicFrames(transport_version()) &&
935       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
936                                version()) != WRITE_UNIDIRECTIONAL) {
937     control_frame_manager_.WriteOrBufferStopSending(error, id);
938   }
939 }
940 
SendGoAway(QuicErrorCode error_code,const std::string & reason)941 void QuicSession::SendGoAway(QuicErrorCode error_code,
942                              const std::string& reason) {
943   // GOAWAY frame is not supported in IETF QUIC.
944   QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
945   if (!IsEncryptionEstablished()) {
946     QUIC_CODE_COUNT(quic_goaway_before_encryption_established);
947     connection_->CloseConnection(
948         error_code, reason,
949         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
950     return;
951   }
952   if (transport_goaway_sent_) {
953     return;
954   }
955   transport_goaway_sent_ = true;
956 
957   QUICHE_DCHECK_EQ(perspective(), Perspective::IS_SERVER);
958   control_frame_manager_.WriteOrBufferGoAway(
959       error_code,
960       QuicUtils::GetMaxClientInitiatedBidirectionalStreamId(
961           transport_version()),
962       reason);
963 }
964 
SendBlocked(QuicStreamId id,QuicStreamOffset byte_offset)965 void QuicSession::SendBlocked(QuicStreamId id, QuicStreamOffset byte_offset) {
966   control_frame_manager_.WriteOrBufferBlocked(id, byte_offset);
967 }
968 
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)969 void QuicSession::SendWindowUpdate(QuicStreamId id,
970                                    QuicStreamOffset byte_offset) {
971   control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
972 }
973 
OnStreamError(QuicErrorCode error_code,std::string error_details)974 void QuicSession::OnStreamError(QuicErrorCode error_code,
975                                 std::string error_details) {
976   connection_->CloseConnection(
977       error_code, error_details,
978       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
979 }
980 
OnStreamError(QuicErrorCode error_code,QuicIetfTransportErrorCodes ietf_error,std::string error_details)981 void QuicSession::OnStreamError(QuicErrorCode error_code,
982                                 QuicIetfTransportErrorCodes ietf_error,
983                                 std::string error_details) {
984   connection_->CloseConnection(
985       error_code, ietf_error, error_details,
986       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
987 }
988 
CanSendMaxStreams()989 bool QuicSession::CanSendMaxStreams() {
990   if (!limit_sending_max_streams_) {
991     return true;
992   }
993   QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 1, 3);
994   return control_frame_manager_.NumBufferedMaxStreams() < 2;
995 }
996 
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)997 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
998                                  bool unidirectional) {
999   if (!is_configured_) {
1000     QUIC_BUG(quic_bug_10866_5)
1001         << "Try to send max streams before config negotiated.";
1002     return;
1003   }
1004   control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
1005 }
1006 
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)1007 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
1008     const QuicStreamId id, QuicStreamOffset offset) {
1009   locally_closed_streams_highest_offset_[id] = offset;
1010 }
1011 
OnStreamClosed(QuicStreamId stream_id)1012 void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
1013   QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
1014   StreamMap::iterator it = stream_map_.find(stream_id);
1015   if (it == stream_map_.end()) {
1016     QUIC_BUG(quic_bug_10866_6)
1017         << ENDPOINT << "Stream is already closed: " << stream_id;
1018     return;
1019   }
1020   QuicStream* stream = it->second.get();
1021   StreamType type = stream->type();
1022 
1023   const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
1024   if (stream_waiting_for_acks) {
1025     // The stream needs to be kept alive because it's waiting for acks.
1026     ++num_zombie_streams_;
1027   } else {
1028     closed_streams_.push_back(std::move(it->second));
1029     stream_map_.erase(it);
1030     // Do not retransmit data of a closed stream.
1031     streams_with_pending_retransmission_.erase(stream_id);
1032     if (!closed_streams_clean_up_alarm_->IsSet()) {
1033       closed_streams_clean_up_alarm_->Set(
1034           connection_->clock()->ApproximateNow());
1035     }
1036     connection_->QuicBugIfHasPendingFrames(stream_id);
1037   }
1038 
1039   if (!stream->HasReceivedFinalOffset()) {
1040     // If we haven't received a FIN or RST for this stream, we need to keep
1041     // track of the how many bytes the stream's flow controller believes it has
1042     // received, for accurate connection level flow control accounting.
1043     // If this is an outgoing stream, it is technically open from peer's
1044     // perspective. Do not inform stream Id manager yet.
1045     QUICHE_DCHECK(!stream->was_draining());
1046     InsertLocallyClosedStreamsHighestOffset(
1047         stream_id, stream->highest_received_byte_offset());
1048     return;
1049   }
1050 
1051   const bool stream_was_draining = stream->was_draining();
1052   QUIC_DVLOG_IF(1, stream_was_draining)
1053       << ENDPOINT << "Stream " << stream_id << " was draining";
1054   if (stream_was_draining) {
1055     QUIC_BUG_IF(quic_bug_12435_4, num_draining_streams_ == 0);
1056     --num_draining_streams_;
1057     if (!IsIncomingStream(stream_id)) {
1058       QUIC_BUG_IF(quic_bug_12435_5, num_outgoing_draining_streams_ == 0);
1059       --num_outgoing_draining_streams_;
1060     }
1061     // Stream Id manager has been informed with draining streams.
1062     return;
1063   }
1064   if (!VersionHasIetfQuicFrames(transport_version())) {
1065     stream_id_manager_.OnStreamClosed(
1066         /*is_incoming=*/IsIncomingStream(stream_id));
1067   }
1068   if (!connection_->connected()) {
1069     return;
1070   }
1071   if (IsIncomingStream(stream_id)) {
1072     // Stream Id manager is only interested in peer initiated stream IDs.
1073     if (VersionHasIetfQuicFrames(transport_version())) {
1074       ietf_streamid_manager_.OnStreamClosed(stream_id);
1075     }
1076     return;
1077   }
1078   if (!VersionHasIetfQuicFrames(transport_version())) {
1079     OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
1080   }
1081 }
1082 
ClosePendingStream(QuicStreamId stream_id)1083 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
1084   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
1085   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1086   pending_stream_map_.erase(stream_id);
1087   if (connection_->connected()) {
1088     ietf_streamid_manager_.OnStreamClosed(stream_id);
1089   }
1090 }
1091 
ShouldProcessFrameByPendingStream(QuicFrameType type,QuicStreamId id) const1092 bool QuicSession::ShouldProcessFrameByPendingStream(QuicFrameType type,
1093                                                     QuicStreamId id) const {
1094   return UsesPendingStreamForFrame(type, id) &&
1095          stream_map_.find(id) == stream_map_.end();
1096 }
1097 
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)1098 void QuicSession::OnFinalByteOffsetReceived(
1099     QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
1100   auto it = locally_closed_streams_highest_offset_.find(stream_id);
1101   if (it == locally_closed_streams_highest_offset_.end()) {
1102     return;
1103   }
1104 
1105   QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
1106                 << final_byte_offset << " for stream " << stream_id;
1107   QuicByteCount offset_diff = final_byte_offset - it->second;
1108   if (flow_controller_.UpdateHighestReceivedOffset(
1109           flow_controller_.highest_received_byte_offset() + offset_diff)) {
1110     // If the final offset violates flow control, close the connection now.
1111     if (flow_controller_.FlowControlViolation()) {
1112       connection_->CloseConnection(
1113           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
1114           "Connection level flow control violation",
1115           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1116       return;
1117     }
1118   }
1119 
1120   flow_controller_.AddBytesConsumed(offset_diff);
1121   locally_closed_streams_highest_offset_.erase(it);
1122   if (!VersionHasIetfQuicFrames(transport_version())) {
1123     stream_id_manager_.OnStreamClosed(
1124         /*is_incoming=*/IsIncomingStream(stream_id));
1125   }
1126   if (IsIncomingStream(stream_id)) {
1127     if (VersionHasIetfQuicFrames(transport_version())) {
1128       ietf_streamid_manager_.OnStreamClosed(stream_id);
1129     }
1130   } else if (!VersionHasIetfQuicFrames(transport_version())) {
1131     OnCanCreateNewOutgoingStream(false);
1132   }
1133 }
1134 
IsEncryptionEstablished() const1135 bool QuicSession::IsEncryptionEstablished() const {
1136   if (GetCryptoStream() == nullptr) {
1137     return false;
1138   }
1139   return GetCryptoStream()->encryption_established();
1140 }
1141 
OneRttKeysAvailable() const1142 bool QuicSession::OneRttKeysAvailable() const {
1143   if (GetCryptoStream() == nullptr) {
1144     return false;
1145   }
1146   return GetCryptoStream()->one_rtt_keys_available();
1147 }
1148 
OnConfigNegotiated()1149 void QuicSession::OnConfigNegotiated() {
1150   // In versions with TLS, the configs will be set twice if 0-RTT is available.
1151   // In the second config setting, 1-RTT keys are guaranteed to be available.
1152   if (version().UsesTls() && is_configured_ &&
1153       connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
1154     QUIC_BUG(quic_bug_12435_6)
1155         << ENDPOINT
1156         << "1-RTT keys missing when config is negotiated for the second time.";
1157     connection_->CloseConnection(
1158         QUIC_INTERNAL_ERROR,
1159         "1-RTT keys missing when config is negotiated for the second time.",
1160         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1161     return;
1162   }
1163 
1164   QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
1165   connection_->SetFromConfig(config_);
1166 
1167   if (VersionHasIetfQuicFrames(transport_version())) {
1168     uint32_t max_streams = 0;
1169     if (config_.HasReceivedMaxBidirectionalStreams()) {
1170       max_streams = config_.ReceivedMaxBidirectionalStreams();
1171     }
1172     if (was_zero_rtt_rejected_ &&
1173         max_streams <
1174             ietf_streamid_manager_.outgoing_bidirectional_stream_count()) {
1175       connection_->CloseConnection(
1176           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1177           absl::StrCat(
1178               "Server rejected 0-RTT, aborting because new bidirectional "
1179               "initial stream limit ",
1180               max_streams, " is less than current open streams: ",
1181               ietf_streamid_manager_.outgoing_bidirectional_stream_count()),
1182           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1183       return;
1184     }
1185     QUIC_DVLOG(1) << ENDPOINT
1186                   << "Setting Bidirectional outgoing_max_streams_ to "
1187                   << max_streams;
1188     if (perspective_ == Perspective::IS_CLIENT &&
1189         max_streams <
1190             ietf_streamid_manager_.max_outgoing_bidirectional_streams()) {
1191       connection_->CloseConnection(
1192           was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1193                                  : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1194           absl::StrCat(
1195               was_zero_rtt_rejected_
1196                   ? "Server rejected 0-RTT, aborting because "
1197                   : "",
1198               "new bidirectional limit ", max_streams,
1199               " decreases the current limit: ",
1200               ietf_streamid_manager_.max_outgoing_bidirectional_streams()),
1201           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1202       return;
1203     }
1204     if (ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1205             max_streams)) {
1206       OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1207     }
1208 
1209     max_streams = 0;
1210     if (config_.HasReceivedMaxUnidirectionalStreams()) {
1211       max_streams = config_.ReceivedMaxUnidirectionalStreams();
1212     }
1213 
1214     if (was_zero_rtt_rejected_ &&
1215         max_streams <
1216             ietf_streamid_manager_.outgoing_unidirectional_stream_count()) {
1217       connection_->CloseConnection(
1218           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1219           absl::StrCat(
1220               "Server rejected 0-RTT, aborting because new unidirectional "
1221               "initial stream limit ",
1222               max_streams, " is less than current open streams: ",
1223               ietf_streamid_manager_.outgoing_unidirectional_stream_count()),
1224           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1225       return;
1226     }
1227 
1228     if (max_streams <
1229         ietf_streamid_manager_.max_outgoing_unidirectional_streams()) {
1230       connection_->CloseConnection(
1231           was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1232                                  : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1233           absl::StrCat(
1234               was_zero_rtt_rejected_
1235                   ? "Server rejected 0-RTT, aborting because "
1236                   : "",
1237               "new unidirectional limit ", max_streams,
1238               " decreases the current limit: ",
1239               ietf_streamid_manager_.max_outgoing_unidirectional_streams()),
1240           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1241       return;
1242     }
1243     QUIC_DVLOG(1) << ENDPOINT
1244                   << "Setting Unidirectional outgoing_max_streams_ to "
1245                   << max_streams;
1246     if (ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1247             max_streams)) {
1248       OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1249     }
1250   } else {
1251     uint32_t max_streams = 0;
1252     if (config_.HasReceivedMaxBidirectionalStreams()) {
1253       max_streams = config_.ReceivedMaxBidirectionalStreams();
1254     }
1255     QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1256                   << max_streams;
1257     if (was_zero_rtt_rejected_ &&
1258         max_streams < stream_id_manager_.num_open_outgoing_streams()) {
1259       connection_->CloseConnection(
1260           QUIC_INTERNAL_ERROR,
1261           absl::StrCat(
1262               "Server rejected 0-RTT, aborting because new stream limit ",
1263               max_streams, " is less than current open streams: ",
1264               stream_id_manager_.num_open_outgoing_streams()),
1265           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1266       return;
1267     }
1268     stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1269   }
1270 
1271   if (perspective() == Perspective::IS_SERVER) {
1272     if (config_.HasReceivedConnectionOptions()) {
1273       // The following variations change the initial receive flow control
1274       // window sizes.
1275       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1276         AdjustInitialFlowControlWindows(64 * 1024);
1277       }
1278       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1279         AdjustInitialFlowControlWindows(128 * 1024);
1280       }
1281       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1282         AdjustInitialFlowControlWindows(256 * 1024);
1283       }
1284       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1285         AdjustInitialFlowControlWindows(512 * 1024);
1286       }
1287       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1288         AdjustInitialFlowControlWindows(1024 * 1024);
1289       }
1290     }
1291 
1292     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1293   }
1294 
1295   if (VersionHasIetfQuicFrames(transport_version())) {
1296     ietf_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1297         config_.GetMaxBidirectionalStreamsToSend());
1298     ietf_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1299         config_.GetMaxUnidirectionalStreamsToSend());
1300   } else {
1301     // A small number of additional incoming streams beyond the limit should be
1302     // allowed. This helps avoid early connection termination when FIN/RSTs for
1303     // old streams are lost or arrive out of order.
1304     // Use a minimum number of additional streams, or a percentage increase,
1305     // whichever is larger.
1306     uint32_t max_incoming_streams_to_send =
1307         config_.GetMaxBidirectionalStreamsToSend();
1308     uint32_t max_incoming_streams =
1309         std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1310                  static_cast<uint32_t>(max_incoming_streams_to_send *
1311                                        kMaxStreamsMultiplier));
1312     stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1313   }
1314 
1315   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1316     // When using IETF-style TLS transport parameters, inform existing streams
1317     // of new flow-control limits.
1318     if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1319       OnNewStreamOutgoingBidirectionalFlowControlWindow(
1320           config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1321     }
1322     if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1323       OnNewStreamIncomingBidirectionalFlowControlWindow(
1324           config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1325     }
1326     if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1327       OnNewStreamUnidirectionalFlowControlWindow(
1328           config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1329     }
1330   } else {  // The version uses Google QUIC Crypto.
1331     if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1332       // Streams which were created before the SHLO was received (0-RTT
1333       // requests) are now informed of the peer's initial flow control window.
1334       OnNewStreamFlowControlWindow(
1335           config_.ReceivedInitialStreamFlowControlWindowBytes());
1336     }
1337   }
1338 
1339   if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1340     OnNewSessionFlowControlWindow(
1341         config_.ReceivedInitialSessionFlowControlWindowBytes());
1342   }
1343 
1344   if (perspective_ == Perspective::IS_SERVER && version().HasIetfQuicFrames() &&
1345       connection_->effective_peer_address().IsInitialized()) {
1346     if (config_.HasClientSentConnectionOption(kSPAD, perspective_)) {
1347       quiche::IpAddressFamily address_family =
1348           connection_->effective_peer_address()
1349               .Normalized()
1350               .host()
1351               .address_family();
1352       std::optional<QuicSocketAddress> preferred_address =
1353           config_.GetPreferredAddressToSend(address_family);
1354       if (preferred_address.has_value()) {
1355         // Set connection ID and token if SPAD has received and a preferred
1356         // address of the same address family is configured.
1357         std::optional<QuicNewConnectionIdFrame> frame =
1358             connection_->MaybeIssueNewConnectionIdForPreferredAddress();
1359         if (frame.has_value()) {
1360           config_.SetPreferredAddressConnectionIdAndTokenToSend(
1361               frame->connection_id, frame->stateless_reset_token);
1362         }
1363         connection_->set_sent_server_preferred_address(*preferred_address);
1364       }
1365       // Clear the alternative address of the other address family in the
1366       // config.
1367       config_.ClearAlternateServerAddressToSend(
1368           address_family == quiche::IpAddressFamily::IP_V4
1369               ? quiche::IpAddressFamily::IP_V6
1370               : quiche::IpAddressFamily::IP_V4);
1371     } else {
1372       // Clear alternative IPv(4|6) addresses in config if the server hasn't
1373       // received 'SPAD' connection option.
1374       config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V4);
1375       config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V6);
1376     }
1377   }
1378 
1379   is_configured_ = true;
1380   connection()->OnConfigNegotiated();
1381 
1382   // Ask flow controllers to try again since the config could have unblocked us.
1383   // Or if this session is configured on TLS enabled QUIC versions,
1384   // attempt to retransmit 0-RTT data if there's any.
1385   // TODO(fayang): consider removing this OnCanWrite call.
1386   if (!connection_->framer().is_processing_packet() &&
1387       (connection_->version().AllowsLowFlowControlLimits() ||
1388        version().UsesTls())) {
1389     QUIC_CODE_COUNT(quic_session_on_can_write_on_config_negotiated);
1390     OnCanWrite();
1391   }
1392 }
1393 
OnAlpsData(const uint8_t *,size_t)1394 std::optional<std::string> QuicSession::OnAlpsData(const uint8_t* /*alps_data*/,
1395                                                    size_t /*alps_length*/) {
1396   return std::nullopt;
1397 }
1398 
AdjustInitialFlowControlWindows(size_t stream_window)1399 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1400   const float session_window_multiplier =
1401       config_.GetInitialStreamFlowControlWindowToSend()
1402           ? static_cast<float>(
1403                 config_.GetInitialSessionFlowControlWindowToSend()) /
1404                 config_.GetInitialStreamFlowControlWindowToSend()
1405           : 1.5;
1406 
1407   QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1408   config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1409 
1410   size_t session_window = session_window_multiplier * stream_window;
1411   QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1412                 << session_window;
1413   config_.SetInitialSessionFlowControlWindowToSend(session_window);
1414   flow_controller_.UpdateReceiveWindowSize(session_window);
1415   // Inform all existing streams about the new window.
1416   for (auto const& kv : stream_map_) {
1417     kv.second->UpdateReceiveWindowSize(stream_window);
1418   }
1419   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1420     GetMutableCryptoStream()->UpdateReceiveWindowSize(stream_window);
1421   }
1422 }
1423 
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1424 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1425     QuicStreamId stream_id) {
1426   QUICHE_DCHECK(!IsClosedStream(stream_id));
1427   // Received a frame for a locally-created stream that is not currently
1428   // active. This is an error.
1429   if (VersionHasIetfQuicFrames(transport_version())) {
1430     connection()->CloseConnection(
1431         QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1432         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1433     return;
1434   }
1435   connection()->CloseConnection(
1436       QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1437       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1438 }
1439 
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1440 void QuicSession::HandleRstOnValidNonexistentStream(
1441     const QuicRstStreamFrame& frame) {
1442   // If the stream is neither originally in active streams nor created in
1443   // GetOrCreateStream(), it could be a closed stream in which case its
1444   // final received byte offset need to be updated.
1445   if (IsClosedStream(frame.stream_id)) {
1446     // The RST frame contains the final byte offset for the stream: we can now
1447     // update the connection level flow controller if needed.
1448     OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1449   }
1450 }
1451 
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1452 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1453   QUICHE_DCHECK(version().UsesQuicCrypto());
1454   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1455   if (new_window < kMinimumFlowControlSendWindow) {
1456     QUIC_LOG_FIRST_N(ERROR, 1)
1457         << "Peer sent us an invalid stream flow control send window: "
1458         << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1459     connection_->CloseConnection(
1460         QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1461         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1462     return;
1463   }
1464 
1465   // Inform all existing streams about the new window.
1466   for (auto const& kv : stream_map_) {
1467     QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1468                   << " of new stream flow control window " << new_window;
1469     if (!kv.second->MaybeConfigSendWindowOffset(
1470             new_window, /* was_zero_rtt_rejected = */ false)) {
1471       return;
1472     }
1473   }
1474   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1475     QUIC_DVLOG(1)
1476         << ENDPOINT
1477         << "Informing crypto stream of new stream flow control window "
1478         << new_window;
1479     GetMutableCryptoStream()->MaybeConfigSendWindowOffset(
1480         new_window, /* was_zero_rtt_rejected = */ false);
1481   }
1482 }
1483 
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1484 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1485     QuicStreamOffset new_window) {
1486   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1487   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1488                 << new_window;
1489   // Inform all existing outgoing unidirectional streams about the new window.
1490   for (auto const& kv : stream_map_) {
1491     const QuicStreamId id = kv.first;
1492     if (!version().HasIetfQuicFrames()) {
1493       if (kv.second->type() == BIDIRECTIONAL) {
1494         continue;
1495       }
1496     } else {
1497       if (QuicUtils::IsBidirectionalStreamId(id, version())) {
1498         continue;
1499       }
1500     }
1501     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1502                                        perspective())) {
1503       continue;
1504     }
1505     QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1506                   << " of new stream flow control window " << new_window;
1507     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1508                                                 was_zero_rtt_rejected_)) {
1509       return;
1510     }
1511   }
1512 }
1513 
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1514 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1515     QuicStreamOffset new_window) {
1516   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1517   QUIC_DVLOG(1) << ENDPOINT
1518                 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1519                 << new_window;
1520   // Inform all existing outgoing bidirectional streams about the new window.
1521   for (auto const& kv : stream_map_) {
1522     const QuicStreamId id = kv.first;
1523     if (!version().HasIetfQuicFrames()) {
1524       if (kv.second->type() != BIDIRECTIONAL) {
1525         continue;
1526       }
1527     } else {
1528       if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1529         continue;
1530       }
1531     }
1532     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1533                                        perspective())) {
1534       continue;
1535     }
1536     QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1537                   << id << " of new stream flow control window " << new_window;
1538     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1539                                                 was_zero_rtt_rejected_)) {
1540       return;
1541     }
1542   }
1543 }
1544 
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1545 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1546     QuicStreamOffset new_window) {
1547   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1548   QUIC_DVLOG(1) << ENDPOINT
1549                 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1550                 << new_window;
1551   // Inform all existing incoming bidirectional streams about the new window.
1552   for (auto const& kv : stream_map_) {
1553     const QuicStreamId id = kv.first;
1554     if (!version().HasIetfQuicFrames()) {
1555       if (kv.second->type() != BIDIRECTIONAL) {
1556         continue;
1557       }
1558     } else {
1559       if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1560         continue;
1561       }
1562     }
1563     if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1564                                       perspective())) {
1565       continue;
1566     }
1567     QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1568                   << id << " of new stream flow control window " << new_window;
1569     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1570                                                 was_zero_rtt_rejected_)) {
1571       return;
1572     }
1573   }
1574 }
1575 
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1576 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1577   QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1578 
1579   if (was_zero_rtt_rejected_ && new_window < flow_controller_.bytes_sent()) {
1580     std::string error_details = absl::StrCat(
1581         "Server rejected 0-RTT. Aborting because the client received session "
1582         "flow control send window: ",
1583         new_window,
1584         ", which is below currently used: ", flow_controller_.bytes_sent());
1585     QUIC_LOG(ERROR) << error_details;
1586     connection_->CloseConnection(
1587         QUIC_ZERO_RTT_UNRETRANSMITTABLE, error_details,
1588         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1589     return;
1590   }
1591   if (!connection_->version().AllowsLowFlowControlLimits() &&
1592       new_window < kMinimumFlowControlSendWindow) {
1593     std::string error_details = absl::StrCat(
1594         "Peer sent us an invalid session flow control send window: ",
1595         new_window, ", below minimum: ", kMinimumFlowControlSendWindow);
1596     QUIC_LOG_FIRST_N(ERROR, 1) << error_details;
1597     connection_->CloseConnection(
1598         QUIC_FLOW_CONTROL_INVALID_WINDOW, error_details,
1599         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1600     return;
1601   }
1602   if (perspective_ == Perspective::IS_CLIENT &&
1603       new_window < flow_controller_.send_window_offset()) {
1604     // The client receives a lower limit than remembered, violating
1605     // https://tools.ietf.org/html/draft-ietf-quic-transport-27#section-7.3.1
1606     std::string error_details = absl::StrCat(
1607         was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because "
1608                                : "",
1609         "new session max data ", new_window,
1610         " decreases current limit: ", flow_controller_.send_window_offset());
1611     QUIC_LOG(ERROR) << error_details;
1612     connection_->CloseConnection(
1613         was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1614                                : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1615         error_details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1616     return;
1617   }
1618 
1619   flow_controller_.UpdateSendWindowOffset(new_window);
1620 }
1621 
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1622 bool QuicSession::OnNewDecryptionKeyAvailable(
1623     EncryptionLevel level, std::unique_ptr<QuicDecrypter> decrypter,
1624     bool set_alternative_decrypter, bool latch_once_used) {
1625   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1626       !connection()->framer().HasEncrypterOfEncryptionLevel(
1627           QuicUtils::GetEncryptionLevelToSendAckofSpace(
1628               QuicUtils::GetPacketNumberSpace(level)))) {
1629     // This should never happen because connection should never decrypt a packet
1630     // while an ACK for it cannot be encrypted.
1631     return false;
1632   }
1633   if (connection()->version().KnowsWhichDecrypterToUse()) {
1634     connection()->InstallDecrypter(level, std::move(decrypter));
1635     return true;
1636   }
1637   if (set_alternative_decrypter) {
1638     connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1639                                           latch_once_used);
1640     return true;
1641   }
1642   connection()->SetDecrypter(level, std::move(decrypter));
1643   return true;
1644 }
1645 
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1646 void QuicSession::OnNewEncryptionKeyAvailable(
1647     EncryptionLevel level, std::unique_ptr<QuicEncrypter> encrypter) {
1648   connection()->SetEncrypter(level, std::move(encrypter));
1649   if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
1650     return;
1651   }
1652 
1653   bool reset_encryption_level = false;
1654   if (IsEncryptionEstablished() && level == ENCRYPTION_HANDSHAKE) {
1655     // ENCRYPTION_HANDSHAKE keys are only used for the handshake. If
1656     // ENCRYPTION_ZERO_RTT keys exist, it is possible for a client to send
1657     // stream data, which must not be sent at the ENCRYPTION_HANDSHAKE level.
1658     // Therefore, we avoid setting the default encryption level to
1659     // ENCRYPTION_HANDSHAKE.
1660     reset_encryption_level = true;
1661   }
1662   QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1663   connection()->SetDefaultEncryptionLevel(level);
1664   if (reset_encryption_level) {
1665     connection()->SetDefaultEncryptionLevel(ENCRYPTION_ZERO_RTT);
1666   }
1667   QUIC_BUG_IF(quic_bug_12435_7,
1668               IsEncryptionEstablished() &&
1669                   (connection()->encryption_level() == ENCRYPTION_INITIAL ||
1670                    connection()->encryption_level() == ENCRYPTION_HANDSHAKE))
1671       << "Encryption is established, but the encryption level " << level
1672       << " does not support sending stream data";
1673 }
1674 
SetDefaultEncryptionLevel(EncryptionLevel level)1675 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1676   QUICHE_DCHECK_EQ(PROTOCOL_QUIC_CRYPTO,
1677                    connection_->version().handshake_protocol);
1678   QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1679   connection()->SetDefaultEncryptionLevel(level);
1680 
1681   switch (level) {
1682     case ENCRYPTION_INITIAL:
1683       break;
1684     case ENCRYPTION_ZERO_RTT:
1685       if (perspective() == Perspective::IS_CLIENT) {
1686         // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1687         // they can't be decrypted by the server.
1688         connection_->MarkZeroRttPacketsForRetransmission(0);
1689         if (!connection_->framer().is_processing_packet()) {
1690           // TODO(fayang): consider removing this OnCanWrite call.
1691           // Given any streams blocked by encryption a chance to write.
1692           QUIC_CODE_COUNT(
1693               quic_session_on_can_write_set_default_encryption_level);
1694           OnCanWrite();
1695         }
1696       }
1697       break;
1698     case ENCRYPTION_HANDSHAKE:
1699       break;
1700     case ENCRYPTION_FORWARD_SECURE:
1701       QUIC_BUG_IF(quic_bug_12435_8, !config_.negotiated())
1702           << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1703       connection()->mutable_stats().handshake_completion_time =
1704           connection()->clock()->ApproximateNow();
1705       break;
1706     default:
1707       QUIC_BUG(quic_bug_10866_7) << "Unknown encryption level: " << level;
1708   }
1709 }
1710 
OnTlsHandshakeComplete()1711 void QuicSession::OnTlsHandshakeComplete() {
1712   QUICHE_DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1713   QUIC_BUG_IF(quic_bug_12435_9,
1714               !GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1715       << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1716   QUIC_BUG_IF(quic_bug_12435_10, !config_.negotiated())
1717       << ENDPOINT << "Handshake completes without parameter negotiation.";
1718   connection()->mutable_stats().handshake_completion_time =
1719       connection()->clock()->ApproximateNow();
1720   if (connection()->version().UsesTls() &&
1721       perspective_ == Perspective::IS_SERVER) {
1722     // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1723     // to the client.
1724     control_frame_manager_.WriteOrBufferHandshakeDone();
1725     if (connection()->version().HasIetfQuicFrames()) {
1726       MaybeSendAddressToken();
1727     }
1728   }
1729 }
1730 
MaybeSendAddressToken()1731 bool QuicSession::MaybeSendAddressToken() {
1732   QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER &&
1733                 connection()->version().HasIetfQuicFrames());
1734   std::optional<CachedNetworkParameters> cached_network_params =
1735       GenerateCachedNetworkParameters();
1736 
1737   std::string address_token = GetCryptoStream()->GetAddressToken(
1738       cached_network_params.has_value() ? &*cached_network_params : nullptr);
1739   if (address_token.empty()) {
1740     return false;
1741   }
1742   const size_t buf_len = address_token.length() + 1;
1743   auto buffer = std::make_unique<char[]>(buf_len);
1744   QuicDataWriter writer(buf_len, buffer.get());
1745   // Add |kAddressTokenPrefix| for token sent in NEW_TOKEN frame.
1746   writer.WriteUInt8(kAddressTokenPrefix);
1747   writer.WriteBytes(address_token.data(), address_token.length());
1748   control_frame_manager_.WriteOrBufferNewToken(
1749       absl::string_view(buffer.get(), buf_len));
1750   if (cached_network_params.has_value()) {
1751     connection()->OnSendConnectionState(*cached_network_params);
1752   }
1753   return true;
1754 }
1755 
DiscardOldDecryptionKey(EncryptionLevel level)1756 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1757   if (!connection()->version().KnowsWhichDecrypterToUse()) {
1758     return;
1759   }
1760   connection()->RemoveDecrypter(level);
1761 }
1762 
DiscardOldEncryptionKey(EncryptionLevel level)1763 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1764   QUIC_DLOG(INFO) << ENDPOINT << "Discarding " << level << " keys";
1765   if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1766     connection()->RemoveEncrypter(level);
1767   }
1768   switch (level) {
1769     case ENCRYPTION_INITIAL:
1770       NeuterUnencryptedData();
1771       break;
1772     case ENCRYPTION_HANDSHAKE:
1773       NeuterHandshakeData();
1774       break;
1775     case ENCRYPTION_ZERO_RTT:
1776       break;
1777     case ENCRYPTION_FORWARD_SECURE:
1778       QUIC_BUG(quic_bug_10866_8)
1779           << ENDPOINT << "Discarding 1-RTT keys is not allowed";
1780       break;
1781     default:
1782       QUIC_BUG(quic_bug_10866_9)
1783           << ENDPOINT
1784           << "Cannot discard keys for unknown encryption level: " << level;
1785   }
1786 }
1787 
NeuterHandshakeData()1788 void QuicSession::NeuterHandshakeData() {
1789   GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(
1790       ENCRYPTION_HANDSHAKE);
1791   connection()->OnHandshakeComplete();
1792 }
1793 
OnZeroRttRejected(int reason)1794 void QuicSession::OnZeroRttRejected(int reason) {
1795   was_zero_rtt_rejected_ = true;
1796   connection_->MarkZeroRttPacketsForRetransmission(reason);
1797   if (connection_->encryption_level() == ENCRYPTION_FORWARD_SECURE) {
1798     QUIC_BUG(quic_bug_10866_10)
1799         << "1-RTT keys already available when 0-RTT is rejected.";
1800     connection_->CloseConnection(
1801         QUIC_INTERNAL_ERROR,
1802         "1-RTT keys already available when 0-RTT is rejected.",
1803         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1804   }
1805 }
1806 
FillTransportParameters(TransportParameters * params)1807 bool QuicSession::FillTransportParameters(TransportParameters* params) {
1808   if (version().UsesTls()) {
1809     if (perspective() == Perspective::IS_SERVER) {
1810       config_.SetOriginalConnectionIdToSend(
1811           connection_->GetOriginalDestinationConnectionId());
1812       config_.SetInitialSourceConnectionIdToSend(connection_->connection_id());
1813     } else {
1814       config_.SetInitialSourceConnectionIdToSend(
1815           connection_->client_connection_id());
1816     }
1817   }
1818   return config_.FillTransportParameters(params);
1819 }
1820 
ProcessTransportParameters(const TransportParameters & params,bool is_resumption,std::string * error_details)1821 QuicErrorCode QuicSession::ProcessTransportParameters(
1822     const TransportParameters& params, bool is_resumption,
1823     std::string* error_details) {
1824   return config_.ProcessTransportParameters(params, is_resumption,
1825                                             error_details);
1826 }
1827 
OnHandshakeCallbackDone()1828 void QuicSession::OnHandshakeCallbackDone() {
1829   if (!connection_->connected()) {
1830     return;
1831   }
1832 
1833   if (!connection()->is_processing_packet()) {
1834     connection()->MaybeProcessUndecryptablePackets();
1835   }
1836 }
1837 
PacketFlusherAttached() const1838 bool QuicSession::PacketFlusherAttached() const {
1839   QUICHE_DCHECK(connection_->connected());
1840   return connection()->packet_creator().PacketFlusherAttached();
1841 }
1842 
OnEncryptedClientHelloSent(absl::string_view client_hello) const1843 void QuicSession::OnEncryptedClientHelloSent(
1844     absl::string_view client_hello) const {
1845   connection()->OnEncryptedClientHelloSent(client_hello);
1846 }
1847 
OnEncryptedClientHelloReceived(absl::string_view client_hello) const1848 void QuicSession::OnEncryptedClientHelloReceived(
1849     absl::string_view client_hello) const {
1850   connection()->OnEncryptedClientHelloReceived(client_hello);
1851 }
1852 
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1853 void QuicSession::OnCryptoHandshakeMessageSent(
1854     const CryptoHandshakeMessage& /*message*/) {}
1855 
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1856 void QuicSession::OnCryptoHandshakeMessageReceived(
1857     const CryptoHandshakeMessage& /*message*/) {}
1858 
RegisterStreamPriority(QuicStreamId id,bool is_static,const QuicStreamPriority & priority)1859 void QuicSession::RegisterStreamPriority(QuicStreamId id, bool is_static,
1860                                          const QuicStreamPriority& priority) {
1861   write_blocked_streams()->RegisterStream(id, is_static, priority);
1862 }
1863 
UnregisterStreamPriority(QuicStreamId id)1864 void QuicSession::UnregisterStreamPriority(QuicStreamId id) {
1865   write_blocked_streams()->UnregisterStream(id);
1866 }
1867 
UpdateStreamPriority(QuicStreamId id,const QuicStreamPriority & new_priority)1868 void QuicSession::UpdateStreamPriority(QuicStreamId id,
1869                                        const QuicStreamPriority& new_priority) {
1870   write_blocked_streams()->UpdateStreamPriority(id, new_priority);
1871 }
1872 
ActivateStream(std::unique_ptr<QuicStream> stream)1873 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1874   const bool should_keep_alive = ShouldKeepConnectionAlive();
1875   QuicStreamId stream_id = stream->id();
1876   bool is_static = stream->is_static();
1877   QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1878                 << ". activating stream " << stream_id;
1879   QUICHE_DCHECK(!stream_map_.contains(stream_id));
1880   stream_map_[stream_id] = std::move(stream);
1881   if (is_static) {
1882     ++num_static_streams_;
1883     return;
1884   }
1885   if (!VersionHasIetfQuicFrames(transport_version())) {
1886     // Do not inform stream ID manager of static streams.
1887     stream_id_manager_.ActivateStream(
1888         /*is_incoming=*/IsIncomingStream(stream_id));
1889   }
1890   if (perspective() == Perspective::IS_CLIENT &&
1891       connection()->multi_port_stats() != nullptr && !should_keep_alive &&
1892       ShouldKeepConnectionAlive()) {
1893     connection()->MaybeProbeMultiPortPath();
1894   }
1895 }
1896 
GetNextOutgoingBidirectionalStreamId()1897 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1898   if (VersionHasIetfQuicFrames(transport_version())) {
1899     return ietf_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1900   }
1901   return stream_id_manager_.GetNextOutgoingStreamId();
1902 }
1903 
GetNextOutgoingUnidirectionalStreamId()1904 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1905   if (VersionHasIetfQuicFrames(transport_version())) {
1906     return ietf_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1907   }
1908   return stream_id_manager_.GetNextOutgoingStreamId();
1909 }
1910 
CanOpenNextOutgoingBidirectionalStream()1911 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1912   if (liveness_testing_in_progress_) {
1913     QUICHE_DCHECK_EQ(Perspective::IS_CLIENT, perspective());
1914     QUIC_CODE_COUNT(
1915         quic_client_fails_to_create_stream_liveness_testing_in_progress);
1916     return false;
1917   }
1918   if (!VersionHasIetfQuicFrames(transport_version())) {
1919     if (!stream_id_manager_.CanOpenNextOutgoingStream()) {
1920       return false;
1921     }
1922   } else {
1923     if (!ietf_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1924       QUIC_CODE_COUNT(
1925           quic_fails_to_create_stream_close_too_many_streams_created);
1926       if (is_configured_) {
1927         // Send STREAM_BLOCKED after config negotiated.
1928         control_frame_manager_.WriteOrBufferStreamsBlocked(
1929             ietf_streamid_manager_.max_outgoing_bidirectional_streams(),
1930             /*unidirectional=*/false);
1931       }
1932       return false;
1933     }
1934   }
1935   if (perspective() == Perspective::IS_CLIENT &&
1936       connection_->MaybeTestLiveness()) {
1937     // Now is relatively close to the idle timeout having the risk that requests
1938     // could be discarded at the server.
1939     liveness_testing_in_progress_ = true;
1940     QUIC_CODE_COUNT(quic_client_fails_to_create_stream_close_to_idle_timeout);
1941     return false;
1942   }
1943   return true;
1944 }
1945 
CanOpenNextOutgoingUnidirectionalStream()1946 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1947   if (!VersionHasIetfQuicFrames(transport_version())) {
1948     return stream_id_manager_.CanOpenNextOutgoingStream();
1949   }
1950   if (ietf_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
1951     return true;
1952   }
1953   if (is_configured_) {
1954     // Send STREAM_BLOCKED after config negotiated.
1955     control_frame_manager_.WriteOrBufferStreamsBlocked(
1956         ietf_streamid_manager_.max_outgoing_unidirectional_streams(),
1957         /*unidirectional=*/true);
1958   }
1959   return false;
1960 }
1961 
GetAdvertisedMaxIncomingBidirectionalStreams() const1962 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
1963     const {
1964   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1965   return ietf_streamid_manager_.advertised_max_incoming_bidirectional_streams();
1966 }
1967 
GetOrCreateStream(const QuicStreamId stream_id)1968 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1969   QUICHE_DCHECK(!pending_stream_map_.contains(stream_id));
1970   if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
1971     return GetMutableCryptoStream();
1972   }
1973 
1974   StreamMap::iterator it = stream_map_.find(stream_id);
1975   if (it != stream_map_.end()) {
1976     return it->second->IsZombie() ? nullptr : it->second.get();
1977   }
1978 
1979   if (IsClosedStream(stream_id)) {
1980     return nullptr;
1981   }
1982 
1983   if (!IsIncomingStream(stream_id)) {
1984     HandleFrameOnNonexistentOutgoingStream(stream_id);
1985     return nullptr;
1986   }
1987 
1988   // TODO(fkastenholz): If we are creating a new stream and we have sent a
1989   // goaway, we should ignore the stream creation. Need to add code to A) test
1990   // if goaway was sent ("if (transport_goaway_sent_)") and B) reject stream
1991   // creation ("return nullptr")
1992 
1993   if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1994     return nullptr;
1995   }
1996 
1997   if (!VersionHasIetfQuicFrames(transport_version()) &&
1998       !stream_id_manager_.CanOpenIncomingStream()) {
1999     // Refuse to open the stream.
2000     ResetStream(stream_id, QUIC_REFUSED_STREAM);
2001     return nullptr;
2002   }
2003 
2004   return CreateIncomingStream(stream_id);
2005 }
2006 
StreamDraining(QuicStreamId stream_id,bool unidirectional)2007 void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
2008   QUICHE_DCHECK(stream_map_.contains(stream_id));
2009   QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
2010   if (VersionHasIetfQuicFrames(transport_version())) {
2011     ietf_streamid_manager_.OnStreamClosed(stream_id);
2012   } else {
2013     stream_id_manager_.OnStreamClosed(
2014         /*is_incoming=*/IsIncomingStream(stream_id));
2015   }
2016   ++num_draining_streams_;
2017   if (!IsIncomingStream(stream_id)) {
2018     ++num_outgoing_draining_streams_;
2019     if (!VersionHasIetfQuicFrames(transport_version())) {
2020       OnCanCreateNewOutgoingStream(unidirectional);
2021     }
2022   }
2023 }
2024 
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)2025 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
2026     const QuicStreamId stream_id) {
2027   if (VersionHasIetfQuicFrames(transport_version())) {
2028     std::string error_details;
2029     if (ietf_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
2030             stream_id, &error_details)) {
2031       return true;
2032     }
2033     connection()->CloseConnection(
2034         QUIC_INVALID_STREAM_ID, error_details,
2035         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2036     return false;
2037   }
2038   if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
2039     connection()->CloseConnection(
2040         QUIC_TOO_MANY_AVAILABLE_STREAMS,
2041         absl::StrCat(stream_id, " exceeds available streams ",
2042                      stream_id_manager_.MaxAvailableStreams()),
2043         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2044     return false;
2045   }
2046   return true;
2047 }
2048 
ShouldYield(QuicStreamId stream_id)2049 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
2050   if (stream_id == currently_writing_stream_id_) {
2051     return false;
2052   }
2053   return write_blocked_streams()->ShouldYield(stream_id);
2054 }
2055 
GetOrCreatePendingStream(QuicStreamId stream_id)2056 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
2057   auto it = pending_stream_map_.find(stream_id);
2058   if (it != pending_stream_map_.end()) {
2059     return it->second.get();
2060   }
2061 
2062   if (IsClosedStream(stream_id) ||
2063       !MaybeIncreaseLargestPeerStreamId(stream_id)) {
2064     return nullptr;
2065   }
2066 
2067   auto pending = std::make_unique<PendingStream>(stream_id, this);
2068   PendingStream* unowned_pending = pending.get();
2069   pending_stream_map_[stream_id] = std::move(pending);
2070   return unowned_pending;
2071 }
2072 
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)2073 void QuicSession::set_largest_peer_created_stream_id(
2074     QuicStreamId largest_peer_created_stream_id) {
2075   QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
2076   stream_id_manager_.set_largest_peer_created_stream_id(
2077       largest_peer_created_stream_id);
2078 }
2079 
GetLargestPeerCreatedStreamId(bool unidirectional) const2080 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
2081     bool unidirectional) const {
2082   // This method is only used in IETF QUIC.
2083   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2084   return ietf_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
2085 }
2086 
DeleteConnection()2087 void QuicSession::DeleteConnection() {
2088   if (connection_) {
2089     delete connection_;
2090     connection_ = nullptr;
2091   }
2092 }
2093 
MaybeSetStreamPriority(QuicStreamId stream_id,const QuicStreamPriority & priority)2094 bool QuicSession::MaybeSetStreamPriority(QuicStreamId stream_id,
2095                                          const QuicStreamPriority& priority) {
2096   auto active_stream = stream_map_.find(stream_id);
2097   if (active_stream != stream_map_.end()) {
2098     active_stream->second->SetPriority(priority);
2099     return true;
2100   }
2101 
2102   return false;
2103 }
2104 
IsClosedStream(QuicStreamId id)2105 bool QuicSession::IsClosedStream(QuicStreamId id) {
2106   QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2107   if (IsOpenStream(id)) {
2108     // Stream is active
2109     return false;
2110   }
2111 
2112   if (VersionHasIetfQuicFrames(transport_version())) {
2113     return !ietf_streamid_manager_.IsAvailableStream(id);
2114   }
2115 
2116   return !stream_id_manager_.IsAvailableStream(id);
2117 }
2118 
IsOpenStream(QuicStreamId id)2119 bool QuicSession::IsOpenStream(QuicStreamId id) {
2120   QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2121   const StreamMap::iterator it = stream_map_.find(id);
2122   if (it != stream_map_.end()) {
2123     return !it->second->IsZombie();
2124   }
2125   if (pending_stream_map_.contains(id) ||
2126       QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2127     // Stream is active
2128     return true;
2129   }
2130   return false;
2131 }
2132 
IsStaticStream(QuicStreamId id) const2133 bool QuicSession::IsStaticStream(QuicStreamId id) const {
2134   auto it = stream_map_.find(id);
2135   if (it == stream_map_.end()) {
2136     return false;
2137   }
2138   return it->second->is_static();
2139 }
2140 
GetNumActiveStreams() const2141 size_t QuicSession::GetNumActiveStreams() const {
2142   QUICHE_DCHECK_GE(
2143       static_cast<QuicStreamCount>(stream_map_.size()),
2144       num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
2145   return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
2146          num_zombie_streams_;
2147 }
2148 
MarkConnectionLevelWriteBlocked(QuicStreamId id)2149 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
2150   if (GetOrCreateStream(id) == nullptr) {
2151     QUIC_BUG(quic_bug_10866_11)
2152         << "Marking unknown stream " << id << " blocked.";
2153     QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
2154   }
2155 
2156   QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
2157                 << " to write-blocked list";
2158 
2159   write_blocked_streams_->AddStream(id);
2160 }
2161 
HasDataToWrite() const2162 bool QuicSession::HasDataToWrite() const {
2163   return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
2164          write_blocked_streams_->HasWriteBlockedDataStreams() ||
2165          connection_->HasQueuedData() ||
2166          !streams_with_pending_retransmission_.empty() ||
2167          control_frame_manager_.WillingToWrite();
2168 }
2169 
OnAckNeedsRetransmittableFrame()2170 void QuicSession::OnAckNeedsRetransmittableFrame() {
2171   flow_controller_.SendWindowUpdate();
2172 }
2173 
SendAckFrequency(const QuicAckFrequencyFrame & frame)2174 void QuicSession::SendAckFrequency(const QuicAckFrequencyFrame& frame) {
2175   control_frame_manager_.WriteOrBufferAckFrequency(frame);
2176 }
2177 
SendNewConnectionId(const QuicNewConnectionIdFrame & frame)2178 void QuicSession::SendNewConnectionId(const QuicNewConnectionIdFrame& frame) {
2179   control_frame_manager_.WriteOrBufferNewConnectionId(
2180       frame.connection_id, frame.sequence_number, frame.retire_prior_to,
2181       frame.stateless_reset_token);
2182 }
2183 
SendRetireConnectionId(uint64_t sequence_number)2184 void QuicSession::SendRetireConnectionId(uint64_t sequence_number) {
2185   control_frame_manager_.WriteOrBufferRetireConnectionId(sequence_number);
2186 }
2187 
MaybeReserveConnectionId(const QuicConnectionId & server_connection_id)2188 bool QuicSession::MaybeReserveConnectionId(
2189     const QuicConnectionId& server_connection_id) {
2190   if (visitor_) {
2191     return visitor_->TryAddNewConnectionId(
2192         connection_->GetOneActiveServerConnectionId(), server_connection_id);
2193   }
2194   return true;
2195 }
2196 
OnServerConnectionIdRetired(const QuicConnectionId & server_connection_id)2197 void QuicSession::OnServerConnectionIdRetired(
2198     const QuicConnectionId& server_connection_id) {
2199   if (visitor_) {
2200     visitor_->OnConnectionIdRetired(server_connection_id);
2201   }
2202 }
2203 
IsConnectionFlowControlBlocked() const2204 bool QuicSession::IsConnectionFlowControlBlocked() const {
2205   return flow_controller_.IsBlocked();
2206 }
2207 
IsStreamFlowControlBlocked()2208 bool QuicSession::IsStreamFlowControlBlocked() {
2209   for (auto const& kv : stream_map_) {
2210     if (kv.second->IsFlowControlBlocked()) {
2211       return true;
2212     }
2213   }
2214   if (!QuicVersionUsesCryptoFrames(transport_version()) &&
2215       GetMutableCryptoStream()->IsFlowControlBlocked()) {
2216     return true;
2217   }
2218   return false;
2219 }
2220 
MaxAvailableBidirectionalStreams() const2221 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
2222   if (VersionHasIetfQuicFrames(transport_version())) {
2223     return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2224   }
2225   return stream_id_manager_.MaxAvailableStreams();
2226 }
2227 
MaxAvailableUnidirectionalStreams() const2228 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
2229   if (VersionHasIetfQuicFrames(transport_version())) {
2230     return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2231   }
2232   return stream_id_manager_.MaxAvailableStreams();
2233 }
2234 
IsIncomingStream(QuicStreamId id) const2235 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
2236   if (VersionHasIetfQuicFrames(transport_version())) {
2237     return !QuicUtils::IsOutgoingStreamId(version(), id, perspective_);
2238   }
2239   return stream_id_manager_.IsIncomingStream(id);
2240 }
2241 
MaybeCloseZombieStream(QuicStreamId id)2242 void QuicSession::MaybeCloseZombieStream(QuicStreamId id) {
2243   auto it = stream_map_.find(id);
2244   if (it == stream_map_.end()) {
2245     return;
2246   }
2247   --num_zombie_streams_;
2248   closed_streams_.push_back(std::move(it->second));
2249   stream_map_.erase(it);
2250 
2251   if (!closed_streams_clean_up_alarm_->IsSet()) {
2252     closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
2253   }
2254   // Do not retransmit data of a closed stream.
2255   streams_with_pending_retransmission_.erase(id);
2256   connection_->QuicBugIfHasPendingFrames(id);
2257 }
2258 
GetStream(QuicStreamId id) const2259 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
2260   auto active_stream = stream_map_.find(id);
2261   if (active_stream != stream_map_.end()) {
2262     return active_stream->second.get();
2263   }
2264 
2265   if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2266     return const_cast<QuicCryptoStream*>(GetCryptoStream());
2267   }
2268 
2269   return nullptr;
2270 }
2271 
GetActiveStream(QuicStreamId id) const2272 QuicStream* QuicSession::GetActiveStream(QuicStreamId id) const {
2273   auto stream = stream_map_.find(id);
2274   if (stream != stream_map_.end() && !stream->second->is_static()) {
2275     return stream->second.get();
2276   }
2277   return nullptr;
2278 }
2279 
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)2280 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
2281                                QuicTime::Delta ack_delay_time,
2282                                QuicTime receive_timestamp) {
2283   if (frame.type == MESSAGE_FRAME) {
2284     OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
2285     return true;
2286   }
2287   if (frame.type == CRYPTO_FRAME) {
2288     return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
2289                                                         ack_delay_time);
2290   }
2291   if (frame.type != STREAM_FRAME) {
2292     bool acked = control_frame_manager_.OnControlFrameAcked(frame);
2293     if (limit_sending_max_streams_ && acked &&
2294         frame.type == MAX_STREAMS_FRAME) {
2295       // Since there is a 2 frame limit on the number of outstanding max_streams
2296       // frames, when an outstanding max_streams frame is ack'd that frees up
2297       // room to potntially send another.
2298       QUIC_RELOADABLE_FLAG_COUNT_N(quic_limit_sending_max_streams2, 2, 3);
2299       ietf_streamid_manager_.MaybeSendMaxStreamsFrame();
2300     }
2301     return acked;
2302   }
2303   bool new_stream_data_acked = false;
2304   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2305   // Stream can already be reset when sent frame gets acked.
2306   if (stream != nullptr) {
2307     QuicByteCount newly_acked_length = 0;
2308     new_stream_data_acked = stream->OnStreamFrameAcked(
2309         frame.stream_frame.offset, frame.stream_frame.data_length,
2310         frame.stream_frame.fin, ack_delay_time, receive_timestamp,
2311         &newly_acked_length);
2312     if (!stream->HasPendingRetransmission()) {
2313       streams_with_pending_retransmission_.erase(stream->id());
2314     }
2315   }
2316   return new_stream_data_acked;
2317 }
2318 
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)2319 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
2320   QuicStream* stream = GetStream(frame.stream_id);
2321   if (stream == nullptr) {
2322     QUIC_BUG(quic_bug_10866_12)
2323         << "Stream: " << frame.stream_id << " is closed when " << frame
2324         << " is retransmitted.";
2325     connection()->CloseConnection(
2326         QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
2327         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2328     return;
2329   }
2330   stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
2331                                      frame.fin);
2332 }
2333 
OnFrameLost(const QuicFrame & frame)2334 void QuicSession::OnFrameLost(const QuicFrame& frame) {
2335   if (frame.type == MESSAGE_FRAME) {
2336     ++total_datagrams_lost_;
2337     OnMessageLost(frame.message_frame->message_id);
2338     return;
2339   }
2340   if (frame.type == CRYPTO_FRAME) {
2341     GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
2342     return;
2343   }
2344   if (frame.type != STREAM_FRAME) {
2345     control_frame_manager_.OnControlFrameLost(frame);
2346     return;
2347   }
2348   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2349   if (stream == nullptr) {
2350     return;
2351   }
2352   stream->OnStreamFrameLost(frame.stream_frame.offset,
2353                             frame.stream_frame.data_length,
2354                             frame.stream_frame.fin);
2355   if (stream->HasPendingRetransmission() &&
2356       !streams_with_pending_retransmission_.contains(
2357           frame.stream_frame.stream_id)) {
2358     streams_with_pending_retransmission_.insert(
2359         std::make_pair(frame.stream_frame.stream_id, true));
2360   }
2361 }
2362 
RetransmitFrames(const QuicFrames & frames,TransmissionType type)2363 bool QuicSession::RetransmitFrames(const QuicFrames& frames,
2364                                    TransmissionType type) {
2365   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2366   for (const QuicFrame& frame : frames) {
2367     if (frame.type == MESSAGE_FRAME) {
2368       // Do not retransmit MESSAGE frames.
2369       continue;
2370     }
2371     if (frame.type == CRYPTO_FRAME) {
2372       if (!GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type)) {
2373         return false;
2374       }
2375       continue;
2376     }
2377     if (frame.type != STREAM_FRAME) {
2378       if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
2379         return false;
2380       }
2381       continue;
2382     }
2383     QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2384     if (stream != nullptr &&
2385         !stream->RetransmitStreamData(frame.stream_frame.offset,
2386                                       frame.stream_frame.data_length,
2387                                       frame.stream_frame.fin, type)) {
2388       return false;
2389     }
2390   }
2391   return true;
2392 }
2393 
IsFrameOutstanding(const QuicFrame & frame) const2394 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2395   if (frame.type == MESSAGE_FRAME) {
2396     return false;
2397   }
2398   if (frame.type == CRYPTO_FRAME) {
2399     return GetCryptoStream()->IsFrameOutstanding(
2400         frame.crypto_frame->level, frame.crypto_frame->offset,
2401         frame.crypto_frame->data_length);
2402   }
2403   if (frame.type != STREAM_FRAME) {
2404     return control_frame_manager_.IsControlFrameOutstanding(frame);
2405   }
2406   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2407   return stream != nullptr &&
2408          stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2409                                           frame.stream_frame.data_length,
2410                                           frame.stream_frame.fin);
2411 }
2412 
HasUnackedCryptoData() const2413 bool QuicSession::HasUnackedCryptoData() const {
2414   const QuicCryptoStream* crypto_stream = GetCryptoStream();
2415   return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2416 }
2417 
HasUnackedStreamData() const2418 bool QuicSession::HasUnackedStreamData() const {
2419   for (const auto& it : stream_map_) {
2420     if (it.second->IsWaitingForAcks()) {
2421       return true;
2422     }
2423   }
2424   return false;
2425 }
2426 
GetHandshakeState() const2427 HandshakeState QuicSession::GetHandshakeState() const {
2428   return GetCryptoStream()->GetHandshakeState();
2429 }
2430 
GetFlowControlSendWindowSize(QuicStreamId id)2431 QuicByteCount QuicSession::GetFlowControlSendWindowSize(QuicStreamId id) {
2432   QuicStream* stream = GetActiveStream(id);
2433   if (stream == nullptr) {
2434     // No flow control for invalid or inactive stream ids. Returning uint64max
2435     // allows QuicPacketCreator to write as much data as possible.
2436     return std::numeric_limits<QuicByteCount>::max();
2437   }
2438   return stream->CalculateSendWindowSize();
2439 }
2440 
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2441 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2442                                                    QuicStreamOffset offset,
2443                                                    QuicByteCount data_length,
2444                                                    QuicDataWriter* writer) {
2445   QuicStream* stream = GetStream(id);
2446   if (stream == nullptr) {
2447     // This causes the connection to be closed because of failed to serialize
2448     // packet.
2449     QUIC_BUG(quic_bug_10866_13)
2450         << "Stream " << id << " does not exist when trying to write data."
2451         << " version:" << transport_version();
2452     return STREAM_MISSING;
2453   }
2454   if (stream->WriteStreamData(offset, data_length, writer)) {
2455     return WRITE_SUCCESS;
2456   }
2457   return WRITE_FAILED;
2458 }
2459 
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2460 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2461                                   QuicStreamOffset offset,
2462                                   QuicByteCount data_length,
2463                                   QuicDataWriter* writer) {
2464   return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2465                                                     writer);
2466 }
2467 
GetStatelessResetToken() const2468 StatelessResetToken QuicSession::GetStatelessResetToken() const {
2469   return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2470 }
2471 
CanWriteStreamData() const2472 bool QuicSession::CanWriteStreamData() const {
2473   // Don't write stream data if there are queued data packets.
2474   if (connection_->HasQueuedPackets()) {
2475     return false;
2476   }
2477   // Immediately write handshake data.
2478   if (HasPendingHandshake()) {
2479     return true;
2480   }
2481   return connection_->CanWrite(HAS_RETRANSMITTABLE_DATA);
2482 }
2483 
RetransmitLostData()2484 bool QuicSession::RetransmitLostData() {
2485   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2486   // Retransmit crypto data first.
2487   bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2488   QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2489   if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2490     crypto_stream->WritePendingCryptoRetransmission();
2491   }
2492   // Retransmit crypto data in stream 1 frames (version < 47).
2493   if (!uses_crypto_frames &&
2494       streams_with_pending_retransmission_.contains(
2495           QuicUtils::GetCryptoStreamId(transport_version()))) {
2496     // Retransmit crypto data first.
2497     QuicStream* crypto_stream =
2498         GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2499     crypto_stream->OnCanWrite();
2500     QUICHE_DCHECK(CheckStreamWriteBlocked(crypto_stream));
2501     if (crypto_stream->HasPendingRetransmission()) {
2502       // Connection is write blocked.
2503       return false;
2504     } else {
2505       streams_with_pending_retransmission_.erase(
2506           QuicUtils::GetCryptoStreamId(transport_version()));
2507     }
2508   }
2509   if (control_frame_manager_.HasPendingRetransmission()) {
2510     control_frame_manager_.OnCanWrite();
2511     if (control_frame_manager_.HasPendingRetransmission()) {
2512       return false;
2513     }
2514   }
2515   while (!streams_with_pending_retransmission_.empty()) {
2516     if (!CanWriteStreamData()) {
2517       break;
2518     }
2519     // Retransmit lost data on headers and data streams.
2520     const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2521     QuicStream* stream = GetStream(id);
2522     if (stream != nullptr) {
2523       stream->OnCanWrite();
2524       QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
2525       if (stream->HasPendingRetransmission()) {
2526         // Connection is write blocked.
2527         break;
2528       } else if (!streams_with_pending_retransmission_.empty() &&
2529                  streams_with_pending_retransmission_.begin()->first == id) {
2530         // Retransmit lost data may cause connection close. If this stream
2531         // has not yet sent fin, a RST_STREAM will be sent and it will be
2532         // removed from streams_with_pending_retransmission_.
2533         streams_with_pending_retransmission_.pop_front();
2534       }
2535     } else {
2536       QUIC_BUG(quic_bug_10866_14)
2537           << "Try to retransmit data of a closed stream";
2538       streams_with_pending_retransmission_.pop_front();
2539     }
2540   }
2541 
2542   return streams_with_pending_retransmission_.empty();
2543 }
2544 
NeuterUnencryptedData()2545 void QuicSession::NeuterUnencryptedData() {
2546   QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2547   crypto_stream->NeuterUnencryptedStreamData();
2548   if (!crypto_stream->HasPendingRetransmission() &&
2549       !QuicVersionUsesCryptoFrames(transport_version())) {
2550     streams_with_pending_retransmission_.erase(
2551         QuicUtils::GetCryptoStreamId(transport_version()));
2552   }
2553   connection_->NeuterUnencryptedPackets();
2554 }
2555 
SetTransmissionType(TransmissionType type)2556 void QuicSession::SetTransmissionType(TransmissionType type) {
2557   connection_->SetTransmissionType(type);
2558 }
2559 
SendMessage(absl::Span<quiche::QuicheMemSlice> message)2560 MessageResult QuicSession::SendMessage(
2561     absl::Span<quiche::QuicheMemSlice> message) {
2562   return SendMessage(message, /*flush=*/false);
2563 }
2564 
SendMessage(quiche::QuicheMemSlice message)2565 MessageResult QuicSession::SendMessage(quiche::QuicheMemSlice message) {
2566   return SendMessage(absl::MakeSpan(&message, 1), /*flush=*/false);
2567 }
2568 
SendMessage(absl::Span<quiche::QuicheMemSlice> message,bool flush)2569 MessageResult QuicSession::SendMessage(
2570     absl::Span<quiche::QuicheMemSlice> message, bool flush) {
2571   QUICHE_DCHECK(connection_->connected())
2572       << ENDPOINT << "Try to write messages when connection is closed.";
2573   if (!IsEncryptionEstablished()) {
2574     return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2575   }
2576   QuicConnection::ScopedEncryptionLevelContext context(
2577       connection(), GetEncryptionLevelToSendApplicationData());
2578   MessageStatus result =
2579       connection_->SendMessage(last_message_id_ + 1, message, flush);
2580   if (result == MESSAGE_STATUS_SUCCESS) {
2581     return {result, ++last_message_id_};
2582   }
2583   return {result, 0};
2584 }
2585 
OnMessageAcked(QuicMessageId message_id,QuicTime)2586 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2587                                  QuicTime /*receive_timestamp*/) {
2588   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2589 }
2590 
OnMessageLost(QuicMessageId message_id)2591 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2592   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2593                 << " is considered lost";
2594 }
2595 
CleanUpClosedStreams()2596 void QuicSession::CleanUpClosedStreams() { closed_streams_.clear(); }
2597 
GetCurrentLargestMessagePayload() const2598 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2599   return connection_->GetCurrentLargestMessagePayload();
2600 }
2601 
GetGuaranteedLargestMessagePayload() const2602 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2603   return connection_->GetGuaranteedLargestMessagePayload();
2604 }
2605 
next_outgoing_bidirectional_stream_id() const2606 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2607   if (VersionHasIetfQuicFrames(transport_version())) {
2608     return ietf_streamid_manager_.next_outgoing_bidirectional_stream_id();
2609   }
2610   return stream_id_manager_.next_outgoing_stream_id();
2611 }
2612 
next_outgoing_unidirectional_stream_id() const2613 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2614   if (VersionHasIetfQuicFrames(transport_version())) {
2615     return ietf_streamid_manager_.next_outgoing_unidirectional_stream_id();
2616   }
2617   return stream_id_manager_.next_outgoing_stream_id();
2618 }
2619 
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2620 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2621   const bool allow_new_streams =
2622       frame.unidirectional
2623           ? ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2624                 frame.stream_count)
2625           : ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2626                 frame.stream_count);
2627   if (allow_new_streams) {
2628     OnCanCreateNewOutgoingStream(frame.unidirectional);
2629   }
2630 
2631   return true;
2632 }
2633 
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2634 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2635   std::string error_details;
2636   if (ietf_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2637     return true;
2638   }
2639   connection_->CloseConnection(
2640       QUIC_STREAMS_BLOCKED_ERROR, error_details,
2641       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2642   return false;
2643 }
2644 
max_open_incoming_bidirectional_streams() const2645 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2646   if (VersionHasIetfQuicFrames(transport_version())) {
2647     return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2648   }
2649   return stream_id_manager_.max_open_incoming_streams();
2650 }
2651 
max_open_incoming_unidirectional_streams() const2652 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2653   if (VersionHasIetfQuicFrames(transport_version())) {
2654     return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2655   }
2656   return stream_id_manager_.max_open_incoming_streams();
2657 }
2658 
SelectAlpn(const std::vector<absl::string_view> & alpns) const2659 std::vector<absl::string_view>::const_iterator QuicSession::SelectAlpn(
2660     const std::vector<absl::string_view>& alpns) const {
2661   const std::string alpn = AlpnForVersion(connection()->version());
2662   return std::find(alpns.cbegin(), alpns.cend(), alpn);
2663 }
2664 
OnAlpnSelected(absl::string_view alpn)2665 void QuicSession::OnAlpnSelected(absl::string_view alpn) {
2666   QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2667                                                               : "Client: ")
2668                   << "ALPN selected: " << alpn;
2669 }
2670 
NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level)2671 void QuicSession::NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level) {
2672   GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
2673 }
2674 
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action)2675 void QuicSession::PerformActionOnActiveStreams(
2676     quiche::UnretainedCallback<bool(QuicStream*)> action) {
2677   std::vector<QuicStream*> active_streams;
2678   for (const auto& it : stream_map_) {
2679     if (!it.second->is_static() && !it.second->IsZombie()) {
2680       active_streams.push_back(it.second.get());
2681     }
2682   }
2683 
2684   for (QuicStream* stream : active_streams) {
2685     if (!action(stream)) {
2686       return;
2687     }
2688   }
2689 }
2690 
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action) const2691 void QuicSession::PerformActionOnActiveStreams(
2692     quiche::UnretainedCallback<bool(QuicStream*)> action) const {
2693   for (const auto& it : stream_map_) {
2694     if (!it.second->is_static() && !it.second->IsZombie() &&
2695         !action(it.second.get())) {
2696       return;
2697     }
2698   }
2699 }
2700 
GetEncryptionLevelToSendApplicationData() const2701 EncryptionLevel QuicSession::GetEncryptionLevelToSendApplicationData() const {
2702   return connection_->framer().GetEncryptionLevelToSendApplicationData();
2703 }
2704 
ProcessAllPendingStreams()2705 void QuicSession::ProcessAllPendingStreams() {
2706   std::vector<PendingStream*> pending_streams;
2707   pending_streams.reserve(pending_stream_map_.size());
2708   for (auto it = pending_stream_map_.cbegin(); it != pending_stream_map_.cend();
2709        ++it) {
2710     pending_streams.push_back(it->second.get());
2711   }
2712   for (auto* pending_stream : pending_streams) {
2713     MaybeProcessPendingStream(pending_stream);
2714     if (!connection()->connected()) {
2715       return;
2716     }
2717   }
2718 }
2719 
ValidatePath(std::unique_ptr<QuicPathValidationContext> context,std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,PathValidationReason reason)2720 void QuicSession::ValidatePath(
2721     std::unique_ptr<QuicPathValidationContext> context,
2722     std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,
2723     PathValidationReason reason) {
2724   connection_->ValidatePath(std::move(context), std::move(result_delegate),
2725                             reason);
2726 }
2727 
HasPendingPathValidation() const2728 bool QuicSession::HasPendingPathValidation() const {
2729   return connection_->HasPendingPathValidation();
2730 }
2731 
MigratePath(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,QuicPacketWriter * writer,bool owns_writer)2732 bool QuicSession::MigratePath(const QuicSocketAddress& self_address,
2733                               const QuicSocketAddress& peer_address,
2734                               QuicPacketWriter* writer, bool owns_writer) {
2735   return connection_->MigratePath(self_address, peer_address, writer,
2736                                   owns_writer);
2737 }
2738 
ValidateToken(absl::string_view token)2739 bool QuicSession::ValidateToken(absl::string_view token) {
2740   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_SERVER);
2741   if (GetQuicFlag(quic_reject_retry_token_in_initial_packet)) {
2742     return false;
2743   }
2744   if (token.empty() || token[0] != kAddressTokenPrefix) {
2745     // Validate the prefix for token received in NEW_TOKEN frame.
2746     return false;
2747   }
2748   const bool valid = GetCryptoStream()->ValidateAddressToken(
2749       absl::string_view(token.data() + 1, token.length() - 1));
2750   if (valid) {
2751     const CachedNetworkParameters* cached_network_params =
2752         GetCryptoStream()->PreviousCachedNetworkParams();
2753     if (cached_network_params != nullptr &&
2754         cached_network_params->timestamp() > 0) {
2755       connection()->OnReceiveConnectionState(*cached_network_params);
2756     }
2757   }
2758   return valid;
2759 }
2760 
OnServerPreferredAddressAvailable(const QuicSocketAddress & server_preferred_address)2761 void QuicSession::OnServerPreferredAddressAvailable(
2762     const QuicSocketAddress& server_preferred_address) {
2763   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
2764   if (visitor_ != nullptr) {
2765     visitor_->OnServerPreferredAddressAvailable(server_preferred_address);
2766   }
2767 }
2768 
ProcessPendingStream(PendingStream * pending)2769 QuicStream* QuicSession::ProcessPendingStream(PendingStream* pending) {
2770   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
2771   QUICHE_DCHECK(connection()->connected());
2772   QuicStreamId stream_id = pending->id();
2773   QUIC_BUG_IF(bad pending stream, !IsIncomingStream(stream_id))
2774       << "Pending stream " << stream_id << " is not an incoming stream.";
2775   // TODO(b/305051334) check if this stream is incoming stream before making it
2776   // pending. If not, connection should be closed.
2777   StreamType stream_type = QuicUtils::GetStreamType(
2778       stream_id, perspective(), /*peer_initiated=*/true, version());
2779   switch (stream_type) {
2780     case BIDIRECTIONAL: {
2781       return ProcessBidirectionalPendingStream(pending);
2782     }
2783     case READ_UNIDIRECTIONAL: {
2784       return ProcessReadUnidirectionalPendingStream(pending);
2785     }
2786     case WRITE_UNIDIRECTIONAL:
2787       ABSL_FALLTHROUGH_INTENDED;
2788     case CRYPTO:
2789       QUICHE_BUG(unexpected pending stream)
2790           << "Unexpected pending stream " << stream_id << " with type "
2791           << stream_type;
2792       return nullptr;
2793   }
2794   return nullptr;  // Unreachable, unless the enum value is out-of-range
2795                    // (potentially undefined behavior)
2796 }
2797 
2798 #undef ENDPOINT  // undef for jumbo builds
2799 }  // namespace quic
2800