• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_stream.h"
6 
7 #include <limits>
8 #include <string>
9 
10 #include "absl/strings/str_cat.h"
11 #include "absl/strings/string_view.h"
12 #include "absl/types/optional.h"
13 #include "quiche/quic/core/quic_error_codes.h"
14 #include "quiche/quic/core/quic_flow_controller.h"
15 #include "quiche/quic/core/quic_session.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/core/quic_utils.h"
18 #include "quiche/quic/core/quic_versions.h"
19 #include "quiche/quic/platform/api/quic_bug_tracker.h"
20 #include "quiche/quic/platform/api/quic_flag_utils.h"
21 #include "quiche/quic/platform/api/quic_flags.h"
22 #include "quiche/quic/platform/api/quic_logging.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/platform/api/quiche_mem_slice.h"
25 
26 using spdy::SpdyPriority;
27 
28 namespace quic {
29 
// Log-message prefix identifying which side of the connection is speaking.
// Relies on a |perspective_| member being visible where the macro expands.
#define ENDPOINT \
  (perspective_ != Perspective::IS_SERVER ? "Client: " : "Server: ")
32 
33 namespace {
34 
DefaultFlowControlWindow(ParsedQuicVersion version)35 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
36   if (!version.AllowsLowFlowControlLimits()) {
37     return kDefaultFlowControlSendWindow;
38   }
39   return 0;
40 }
41 
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)42 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
43                                                       QuicStreamId stream_id) {
44   ParsedQuicVersion version = session->connection()->version();
45   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
46     return session->config()->GetInitialStreamFlowControlWindowToSend();
47   }
48 
49   // Unidirectional streams (v99 only).
50   if (VersionHasIetfQuicFrames(version.transport_version) &&
51       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
52     return session->config()
53         ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
54   }
55 
56   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
57                                     session->perspective())) {
58     return session->config()
59         ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
60   }
61 
62   return session->config()
63       ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
64 }
65 
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)66 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
67                                            QuicStreamId stream_id) {
68   ParsedQuicVersion version = session->connection()->version();
69   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
70     if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
71       return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
72     }
73 
74     return DefaultFlowControlWindow(version);
75   }
76 
77   // Unidirectional streams (v99 only).
78   if (VersionHasIetfQuicFrames(version.transport_version) &&
79       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
80     if (session->config()
81             ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
82       return session->config()
83           ->ReceivedInitialMaxStreamDataBytesUnidirectional();
84     }
85 
86     return DefaultFlowControlWindow(version);
87   }
88 
89   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
90                                     session->perspective())) {
91     if (session->config()
92             ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
93       return session->config()
94           ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
95     }
96 
97     return DefaultFlowControlWindow(version);
98   }
99 
100   if (session->config()
101           ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
102     return session->config()
103         ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
104   }
105 
106   return DefaultFlowControlWindow(version);
107 }
108 
109 }  // namespace
110 
PendingStream(QuicStreamId id,QuicSession * session)111 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
112     : id_(id),
113       version_(session->version()),
114       stream_delegate_(session),
115       stream_bytes_read_(0),
116       fin_received_(false),
117       is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
118                                                  /*peer_initiated = */ true,
119                                                  session->version()) ==
120                         BIDIRECTIONAL),
121       connection_flow_controller_(session->flow_controller()),
122       flow_controller_(session, id,
123                        /*is_connection_flow_controller*/ false,
124                        GetReceivedFlowControlWindow(session, id),
125                        GetInitialStreamFlowControlWindowToSend(session, id),
126                        kStreamReceiveWindowLimit,
127                        session->flow_controller()->auto_tune_receive_window(),
128                        session->flow_controller()),
129       sequencer_(this) {}
130 
OnDataAvailable()131 void PendingStream::OnDataAvailable() {
132   // Data should be kept in the sequencer so that
133   // QuicSession::ProcessPendingStream() can read it.
134 }
135 
OnFinRead()136 void PendingStream::OnFinRead() { QUICHE_DCHECK(sequencer_.IsClosed()); }
137 
AddBytesConsumed(QuicByteCount bytes)138 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
139   // It will be called when the metadata of the stream is consumed.
140   flow_controller_.AddBytesConsumed(bytes);
141   connection_flow_controller_->AddBytesConsumed(bytes);
142 }
143 
ResetWithError(QuicResetStreamError)144 void PendingStream::ResetWithError(QuicResetStreamError /*error*/) {
145   // Currently PendingStream is only read-unidirectional. It shouldn't send
146   // Reset.
147   QUICHE_NOTREACHED();
148 }
149 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)150 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
151                                          const std::string& details) {
152   stream_delegate_->OnStreamError(error, details);
153 }
154 
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)155 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
156                                          QuicIetfTransportErrorCodes ietf_error,
157                                          const std::string& details) {
158   stream_delegate_->OnStreamError(error, ietf_error, details);
159 }
160 
id() const161 QuicStreamId PendingStream::id() const { return id_; }
162 
version() const163 ParsedQuicVersion PendingStream::version() const { return version_; }
164 
OnStreamFrame(const QuicStreamFrame & frame)165 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
166   QUICHE_DCHECK_EQ(frame.stream_id, id_);
167 
168   bool is_stream_too_long =
169       (frame.offset > kMaxStreamLength) ||
170       (kMaxStreamLength - frame.offset < frame.data_length);
171   if (is_stream_too_long) {
172     // Close connection if stream becomes too long.
173     QUIC_PEER_BUG(quic_peer_bug_12570_1)
174         << "Receive stream frame reaches max stream length. frame offset "
175         << frame.offset << " length " << frame.data_length;
176     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
177                          "Peer sends more data than allowed on this stream.");
178     return;
179   }
180 
181   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
182     OnUnrecoverableError(
183         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
184         absl::StrCat(
185             "Stream ", id_,
186             " received data with offset: ", frame.offset + frame.data_length,
187             ", which is beyond close offset: ", sequencer()->close_offset()));
188     return;
189   }
190 
191   if (frame.fin) {
192     fin_received_ = true;
193   }
194 
195   // This count includes duplicate data received.
196   QuicByteCount frame_payload_size = frame.data_length;
197   stream_bytes_read_ += frame_payload_size;
198 
199   // Flow control is interested in tracking highest received offset.
200   // Only interested in received frames that carry data.
201   if (frame_payload_size > 0 &&
202       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
203     // As the highest received offset has changed, check to see if this is a
204     // violation of flow control.
205     if (flow_controller_.FlowControlViolation() ||
206         connection_flow_controller_->FlowControlViolation()) {
207       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
208                            "Flow control violation after increasing offset");
209       return;
210     }
211   }
212 
213   sequencer_.OnStreamFrame(frame);
214 }
215 
OnRstStreamFrame(const QuicRstStreamFrame & frame)216 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
217   QUICHE_DCHECK_EQ(frame.stream_id, id_);
218 
219   if (frame.byte_offset > kMaxStreamLength) {
220     // Peer are not suppose to write bytes more than maxium allowed.
221     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
222                          "Reset frame stream offset overflow.");
223     return;
224   }
225 
226   const QuicStreamOffset kMaxOffset =
227       std::numeric_limits<QuicStreamOffset>::max();
228   if (sequencer()->close_offset() != kMaxOffset &&
229       frame.byte_offset != sequencer()->close_offset()) {
230     OnUnrecoverableError(
231         QUIC_STREAM_MULTIPLE_OFFSET,
232         absl::StrCat("Stream ", id_,
233                      " received new final offset: ", frame.byte_offset,
234                      ", which is different from close offset: ",
235                      sequencer()->close_offset()));
236     return;
237   }
238 
239   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
240   if (flow_controller_.FlowControlViolation() ||
241       connection_flow_controller_->FlowControlViolation()) {
242     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
243                          "Flow control violation after increasing offset");
244     return;
245   }
246 }
247 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)248 void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
249   QUICHE_DCHECK(is_bidirectional_);
250   flow_controller_.UpdateSendWindowOffset(frame.max_data);
251 }
252 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)253 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
254     QuicStreamOffset new_offset) {
255   uint64_t increment =
256       new_offset - flow_controller_.highest_received_byte_offset();
257   if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
258     return false;
259   }
260 
261   // If |new_offset| increased the stream flow controller's highest received
262   // offset, increase the connection flow controller's value by the incremental
263   // difference.
264   connection_flow_controller_->UpdateHighestReceivedOffset(
265       connection_flow_controller_->highest_received_byte_offset() + increment);
266   return true;
267 }
268 
OnStopSending(QuicResetStreamError stop_sending_error_code)269 void PendingStream::OnStopSending(
270     QuicResetStreamError stop_sending_error_code) {
271   if (!stop_sending_error_code_) {
272     stop_sending_error_code_ = stop_sending_error_code;
273   }
274 }
275 
MarkConsumed(QuicByteCount num_bytes)276 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
277   sequencer_.MarkConsumed(num_bytes);
278 }
279 
StopReading()280 void PendingStream::StopReading() {
281   QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
282   sequencer_.StopReading();
283 }
284 
QuicStream(PendingStream * pending,QuicSession * session,bool is_static)285 QuicStream::QuicStream(PendingStream* pending, QuicSession* session,
286                        bool is_static)
287     : QuicStream(pending->id_, session, std::move(pending->sequencer_),
288                  is_static,
289                  QuicUtils::GetStreamType(pending->id_, session->perspective(),
290                                           /*peer_initiated = */ true,
291                                           session->version()),
292                  pending->stream_bytes_read_, pending->fin_received_,
293                  std::move(pending->flow_controller_),
294                  pending->connection_flow_controller_) {
295   QUICHE_DCHECK(session->version().HasIetfQuicFrames());
296   sequencer_.set_stream(this);
297 }
298 
299 namespace {
300 
FlowController(QuicStreamId id,QuicSession * session,StreamType type)301 absl::optional<QuicFlowController> FlowController(QuicStreamId id,
302                                                   QuicSession* session,
303                                                   StreamType type) {
304   if (type == CRYPTO) {
305     // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
306     // it is using crypto frames instead of stream frames. The QuicCryptoStream
307     // doesn't have any flow control in that case, so we don't create a
308     // QuicFlowController for it.
309     return absl::nullopt;
310   }
311   return QuicFlowController(
312       session, id,
313       /*is_connection_flow_controller*/ false,
314       GetReceivedFlowControlWindow(session, id),
315       GetInitialStreamFlowControlWindowToSend(session, id),
316       kStreamReceiveWindowLimit,
317       session->flow_controller()->auto_tune_receive_window(),
318       session->flow_controller());
319 }
320 
321 }  // namespace
322 
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)323 QuicStream::QuicStream(QuicStreamId id, QuicSession* session, bool is_static,
324                        StreamType type)
325     : QuicStream(id, session, QuicStreamSequencer(this), is_static, type, 0,
326                  false, FlowController(id, session, type),
327                  session->flow_controller()) {}
328 
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,absl::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller)329 QuicStream::QuicStream(QuicStreamId id, QuicSession* session,
330                        QuicStreamSequencer sequencer, bool is_static,
331                        StreamType type, uint64_t stream_bytes_read,
332                        bool fin_received,
333                        absl::optional<QuicFlowController> flow_controller,
334                        QuicFlowController* connection_flow_controller)
335     : sequencer_(std::move(sequencer)),
336       id_(id),
337       session_(session),
338       stream_delegate_(session),
339       priority_(QuicStreamPriority::Default(session->priority_type())),
340       stream_bytes_read_(stream_bytes_read),
341       stream_error_(QuicResetStreamError::NoError()),
342       connection_error_(QUIC_NO_ERROR),
343       read_side_closed_(false),
344       write_side_closed_(false),
345       write_side_data_recvd_state_notified_(false),
346       fin_buffered_(false),
347       fin_sent_(false),
348       fin_outstanding_(false),
349       fin_lost_(false),
350       fin_received_(fin_received),
351       rst_sent_(false),
352       rst_received_(false),
353       stop_sending_sent_(false),
354       flow_controller_(std::move(flow_controller)),
355       connection_flow_controller_(connection_flow_controller),
356       stream_contributes_to_connection_flow_control_(true),
357       busy_counter_(0),
358       add_random_padding_after_fin_(false),
359       send_buffer_(
360           session->connection()->helper()->GetStreamSendBufferAllocator()),
361       buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
362       is_static_(is_static),
363       deadline_(QuicTime::Zero()),
364       was_draining_(false),
365       type_(VersionHasIetfQuicFrames(session->transport_version()) &&
366                     type != CRYPTO
367                 ? QuicUtils::GetStreamType(id_, session->perspective(),
368                                            session->IsIncomingStream(id_),
369                                            session->version())
370                 : type),
371       creation_time_(session->connection()->clock()->ApproximateNow()),
372       perspective_(session->perspective()) {
373   if (type_ == WRITE_UNIDIRECTIONAL) {
374     fin_received_ = true;
375     CloseReadSide();
376   } else if (type_ == READ_UNIDIRECTIONAL) {
377     fin_sent_ = true;
378     CloseWriteSide();
379   }
380   if (type_ != CRYPTO) {
381     stream_delegate_->RegisterStreamPriority(id, is_static_, priority_);
382   }
383 }
384 
~QuicStream()385 QuicStream::~QuicStream() {
386   if (session_ != nullptr && IsWaitingForAcks()) {
387     QUIC_DVLOG(1)
388         << ENDPOINT << "Stream " << id_
389         << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
390         << send_buffer_.stream_bytes_outstanding()
391         << ", fin_outstanding: " << fin_outstanding_;
392   }
393   if (stream_delegate_ != nullptr && type_ != CRYPTO) {
394     stream_delegate_->UnregisterStreamPriority(id());
395   }
396 }
397 
OnStreamFrame(const QuicStreamFrame & frame)398 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
399   QUICHE_DCHECK_EQ(frame.stream_id, id_);
400 
401   QUICHE_DCHECK(!(read_side_closed_ && write_side_closed_));
402 
403   if (frame.fin && is_static_) {
404     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
405                          "Attempt to close a static stream");
406     return;
407   }
408 
409   if (type_ == WRITE_UNIDIRECTIONAL) {
410     OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
411                          "Data received on write unidirectional stream");
412     return;
413   }
414 
415   bool is_stream_too_long =
416       (frame.offset > kMaxStreamLength) ||
417       (kMaxStreamLength - frame.offset < frame.data_length);
418   if (is_stream_too_long) {
419     // Close connection if stream becomes too long.
420     QUIC_PEER_BUG(quic_peer_bug_10586_1)
421         << "Receive stream frame on stream " << id_
422         << " reaches max stream length. frame offset " << frame.offset
423         << " length " << frame.data_length << ". " << sequencer_.DebugString();
424     OnUnrecoverableError(
425         QUIC_STREAM_LENGTH_OVERFLOW,
426         absl::StrCat("Peer sends more data than allowed on stream ", id_,
427                      ". frame: offset = ", frame.offset, ", length = ",
428                      frame.data_length, ". ", sequencer_.DebugString()));
429     return;
430   }
431 
432   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
433     OnUnrecoverableError(
434         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
435         absl::StrCat(
436             "Stream ", id_,
437             " received data with offset: ", frame.offset + frame.data_length,
438             ", which is beyond close offset: ", sequencer_.close_offset()));
439     return;
440   }
441 
442   if (frame.fin && !fin_received_) {
443     fin_received_ = true;
444     if (fin_sent_) {
445       QUICHE_DCHECK(!was_draining_);
446       session_->StreamDraining(id_,
447                                /*unidirectional=*/type_ != BIDIRECTIONAL);
448       was_draining_ = true;
449     }
450   }
451 
452   if (read_side_closed_) {
453     QUIC_DLOG(INFO)
454         << ENDPOINT << "Stream " << frame.stream_id
455         << " is closed for reading. Ignoring newly received stream data.";
456     // The subclass does not want to read data:  blackhole the data.
457     return;
458   }
459 
460   // This count includes duplicate data received.
461   QuicByteCount frame_payload_size = frame.data_length;
462   stream_bytes_read_ += frame_payload_size;
463 
464   // Flow control is interested in tracking highest received offset.
465   // Only interested in received frames that carry data.
466   if (frame_payload_size > 0 &&
467       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
468     // As the highest received offset has changed, check to see if this is a
469     // violation of flow control.
470     QUIC_BUG_IF(quic_bug_12570_2, !flow_controller_.has_value())
471         << ENDPOINT << "OnStreamFrame called on stream without flow control";
472     if ((flow_controller_.has_value() &&
473          flow_controller_->FlowControlViolation()) ||
474         connection_flow_controller_->FlowControlViolation()) {
475       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
476                            "Flow control violation after increasing offset");
477       return;
478     }
479   }
480 
481   sequencer_.OnStreamFrame(frame);
482 }
483 
OnStopSending(QuicResetStreamError error)484 bool QuicStream::OnStopSending(QuicResetStreamError error) {
485   // Do not reset the stream if all data has been sent and acknowledged.
486   if (write_side_closed() && !IsWaitingForAcks()) {
487     QUIC_DVLOG(1) << ENDPOINT
488                   << "Ignoring STOP_SENDING for a write closed stream, id: "
489                   << id_;
490     return false;
491   }
492 
493   if (is_static_) {
494     QUIC_DVLOG(1) << ENDPOINT
495                   << "Received STOP_SENDING for a static stream, id: " << id_
496                   << " Closing connection";
497     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
498                          "Received STOP_SENDING for a static stream");
499     return false;
500   }
501 
502   stream_error_ = error;
503   MaybeSendRstStream(error);
504   return true;
505 }
506 
num_frames_received() const507 int QuicStream::num_frames_received() const {
508   return sequencer_.num_frames_received();
509 }
510 
num_duplicate_frames_received() const511 int QuicStream::num_duplicate_frames_received() const {
512   return sequencer_.num_duplicate_frames_received();
513 }
514 
OnStreamReset(const QuicRstStreamFrame & frame)515 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
516   rst_received_ = true;
517   if (frame.byte_offset > kMaxStreamLength) {
518     // Peer are not suppose to write bytes more than maxium allowed.
519     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
520                          "Reset frame stream offset overflow.");
521     return;
522   }
523 
524   const QuicStreamOffset kMaxOffset =
525       std::numeric_limits<QuicStreamOffset>::max();
526   if (sequencer()->close_offset() != kMaxOffset &&
527       frame.byte_offset != sequencer()->close_offset()) {
528     OnUnrecoverableError(
529         QUIC_STREAM_MULTIPLE_OFFSET,
530         absl::StrCat("Stream ", id_,
531                      " received new final offset: ", frame.byte_offset,
532                      ", which is different from close offset: ",
533                      sequencer_.close_offset()));
534     return;
535   }
536 
537   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
538   QUIC_BUG_IF(quic_bug_12570_3, !flow_controller_.has_value())
539       << ENDPOINT << "OnStreamReset called on stream without flow control";
540   if ((flow_controller_.has_value() &&
541        flow_controller_->FlowControlViolation()) ||
542       connection_flow_controller_->FlowControlViolation()) {
543     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
544                          "Flow control violation after increasing offset");
545     return;
546   }
547 
548   stream_error_ = frame.error();
549   // Google QUIC closes both sides of the stream in response to a
550   // RESET_STREAM, IETF QUIC closes only the read side.
551   if (!VersionHasIetfQuicFrames(transport_version())) {
552     CloseWriteSide();
553   }
554   CloseReadSide();
555 }
556 
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)557 void QuicStream::OnConnectionClosed(QuicErrorCode error,
558                                     ConnectionCloseSource /*source*/) {
559   if (read_side_closed_ && write_side_closed_) {
560     return;
561   }
562   if (error != QUIC_NO_ERROR) {
563     stream_error_ =
564         QuicResetStreamError::FromInternal(QUIC_STREAM_CONNECTION_ERROR);
565     connection_error_ = error;
566   }
567 
568   CloseWriteSide();
569   CloseReadSide();
570 }
571 
OnFinRead()572 void QuicStream::OnFinRead() {
573   QUICHE_DCHECK(sequencer_.IsClosed());
574   // OnFinRead can be called due to a FIN flag in a headers block, so there may
575   // have been no OnStreamFrame call with a FIN in the frame.
576   fin_received_ = true;
577   // If fin_sent_ is true, then CloseWriteSide has already been called, and the
578   // stream will be destroyed by CloseReadSide, so don't need to call
579   // StreamDraining.
580   CloseReadSide();
581 }
582 
SetFinSent()583 void QuicStream::SetFinSent() {
584   QUICHE_DCHECK(!VersionUsesHttp3(transport_version()));
585   fin_sent_ = true;
586 }
587 
Reset(QuicRstStreamErrorCode error)588 void QuicStream::Reset(QuicRstStreamErrorCode error) {
589   ResetWithError(QuicResetStreamError::FromInternal(error));
590 }
591 
ResetWithError(QuicResetStreamError error)592 void QuicStream::ResetWithError(QuicResetStreamError error) {
593   stream_error_ = error;
594   QuicConnection::ScopedPacketFlusher flusher(session()->connection());
595   MaybeSendStopSending(error);
596   MaybeSendRstStream(error);
597 
598   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
599     session()->MaybeCloseZombieStream(id_);
600   }
601 }
602 
ResetWriteSide(QuicResetStreamError error)603 void QuicStream::ResetWriteSide(QuicResetStreamError error) {
604   stream_error_ = error;
605   MaybeSendRstStream(error);
606 
607   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
608     session()->MaybeCloseZombieStream(id_);
609   }
610 }
611 
SendStopSending(QuicResetStreamError error)612 void QuicStream::SendStopSending(QuicResetStreamError error) {
613   stream_error_ = error;
614   MaybeSendStopSending(error);
615 
616   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
617     session()->MaybeCloseZombieStream(id_);
618   }
619 }
620 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)621 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
622                                       const std::string& details) {
623   stream_delegate_->OnStreamError(error, details);
624 }
625 
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)626 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
627                                       QuicIetfTransportErrorCodes ietf_error,
628                                       const std::string& details) {
629   stream_delegate_->OnStreamError(error, ietf_error, details);
630 }
631 
priority() const632 const QuicStreamPriority& QuicStream::priority() const { return priority_; }
633 
SetPriority(const QuicStreamPriority & priority)634 void QuicStream::SetPriority(const QuicStreamPriority& priority) {
635   priority_ = priority;
636 
637   MaybeSendPriorityUpdateFrame();
638 
639   stream_delegate_->UpdateStreamPriority(id(), priority);
640 }
641 
WriteOrBufferData(absl::string_view data,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)642 void QuicStream::WriteOrBufferData(
643     absl::string_view data, bool fin,
644     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
645         ack_listener) {
646   QUIC_BUG_IF(quic_bug_12570_4,
647               QuicUtils::IsCryptoStreamId(transport_version(), id_))
648       << ENDPOINT
649       << "WriteOrBufferData is used to send application data, use "
650          "WriteOrBufferDataAtLevel to send crypto data.";
651   return WriteOrBufferDataAtLevel(
652       data, fin, session()->GetEncryptionLevelToSendApplicationData(),
653       ack_listener);
654 }
655 
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)656 void QuicStream::WriteOrBufferDataAtLevel(
657     absl::string_view data, bool fin, EncryptionLevel level,
658     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
659         ack_listener) {
660   if (data.empty() && !fin) {
661     QUIC_BUG(quic_bug_10586_2) << "data.empty() && !fin";
662     return;
663   }
664 
665   if (fin_buffered_) {
666     QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
667     return;
668   }
669   if (write_side_closed_) {
670     QUIC_DLOG(ERROR) << ENDPOINT
671                      << "Attempt to write when the write side is closed";
672     if (type_ == READ_UNIDIRECTIONAL) {
673       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
674                            "Try to send data on read unidirectional stream");
675     }
676     return;
677   }
678 
679   fin_buffered_ = fin;
680 
681   bool had_buffered_data = HasBufferedData();
682   // Do not respect buffered data upper limit as WriteOrBufferData guarantees
683   // all data to be consumed.
684   if (data.length() > 0) {
685     QuicStreamOffset offset = send_buffer_.stream_offset();
686     if (kMaxStreamLength - offset < data.length()) {
687       QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
688       OnUnrecoverableError(
689           QUIC_STREAM_LENGTH_OVERFLOW,
690           absl::StrCat("Write too many data via stream ", id_));
691       return;
692     }
693     send_buffer_.SaveStreamData(data);
694     OnDataBuffered(offset, data.length(), ack_listener);
695   }
696   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
697     // Write data if there is no buffered data before.
698     WriteBufferedData(level);
699   }
700 }
701 
OnCanWrite()702 void QuicStream::OnCanWrite() {
703   if (HasDeadlinePassed()) {
704     OnDeadlinePassed();
705     return;
706   }
707   if (HasPendingRetransmission()) {
708     WritePendingRetransmission();
709     // Exit early to allow other streams to write pending retransmissions if
710     // any.
711     return;
712   }
713 
714   if (write_side_closed_) {
715     QUIC_DLOG(ERROR)
716         << ENDPOINT << "Stream " << id()
717         << " attempting to write new data when the write side is closed";
718     return;
719   }
720   if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
721     WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
722   }
723   if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
724     // Notify upper layer to write new data when buffered data size is below
725     // low water mark.
726     OnCanWriteNewData();
727   }
728 }
729 
MaybeSendBlocked()730 void QuicStream::MaybeSendBlocked() {
731   if (!flow_controller_.has_value()) {
732     QUIC_BUG(quic_bug_10586_5)
733         << ENDPOINT << "MaybeSendBlocked called on stream without flow control";
734     return;
735   }
736   flow_controller_->MaybeSendBlocked();
737   if (!stream_contributes_to_connection_flow_control_) {
738     return;
739   }
740   connection_flow_controller_->MaybeSendBlocked();
741 
742   // If the stream is blocked by connection-level flow control but not by
743   // stream-level flow control, add the stream to the write blocked list so that
744   // the stream will be given a chance to write when a connection-level
745   // WINDOW_UPDATE arrives.
746   if (!write_side_closed_ && connection_flow_controller_->IsBlocked() &&
747       !flow_controller_->IsBlocked()) {
748     session_->MarkConnectionLevelWriteBlocked(id());
749   }
750 }
751 
WriteMemSlice(quiche::QuicheMemSlice span,bool fin)752 QuicConsumedData QuicStream::WriteMemSlice(quiche::QuicheMemSlice span,
753                                            bool fin) {
754   return WriteMemSlices(absl::MakeSpan(&span, 1), fin);
755 }
756 
WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span,bool fin)757 QuicConsumedData QuicStream::WriteMemSlices(
758     absl::Span<quiche::QuicheMemSlice> span, bool fin) {
759   QuicConsumedData consumed_data(0, false);
760   if (span.empty() && !fin) {
761     QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin";
762     return consumed_data;
763   }
764 
765   if (fin_buffered_) {
766     QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
767     return consumed_data;
768   }
769 
770   if (write_side_closed_) {
771     QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
772                      << " attempting to write when the write side is closed";
773     if (type_ == READ_UNIDIRECTIONAL) {
774       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
775                            "Try to send data on read unidirectional stream");
776     }
777     return consumed_data;
778   }
779 
780   bool had_buffered_data = HasBufferedData();
781   if (CanWriteNewData() || span.empty()) {
782     consumed_data.fin_consumed = fin;
783     if (!span.empty()) {
784       // Buffer all data if buffered data size is below limit.
785       QuicStreamOffset offset = send_buffer_.stream_offset();
786       consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
787       if (offset > send_buffer_.stream_offset() ||
788           kMaxStreamLength < send_buffer_.stream_offset()) {
789         QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
790         OnUnrecoverableError(
791             QUIC_STREAM_LENGTH_OVERFLOW,
792             absl::StrCat("Write too many data via stream ", id_));
793         return consumed_data;
794       }
795       OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
796     }
797   }
798   fin_buffered_ = consumed_data.fin_consumed;
799 
800   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
801     // Write data if there is no buffered data before.
802     WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
803   }
804 
805   return consumed_data;
806 }
807 
HasPendingRetransmission() const808 bool QuicStream::HasPendingRetransmission() const {
809   return send_buffer_.HasPendingRetransmission() || fin_lost_;
810 }
811 
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const812 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
813                                           QuicByteCount data_length,
814                                           bool fin) const {
815   return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
816          (fin && fin_outstanding_);
817 }
818 
CloseReadSide()819 void QuicStream::CloseReadSide() {
820   if (read_side_closed_) {
821     return;
822   }
823   QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
824 
825   read_side_closed_ = true;
826   sequencer_.ReleaseBuffer();
827 
828   if (write_side_closed_) {
829     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
830     session_->OnStreamClosed(id());
831     OnClose();
832   }
833 }
834 
CloseWriteSide()835 void QuicStream::CloseWriteSide() {
836   if (write_side_closed_) {
837     return;
838   }
839   QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
840 
841   write_side_closed_ = true;
842   if (read_side_closed_) {
843     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
844     session_->OnStreamClosed(id());
845     OnClose();
846   }
847 }
848 
MaybeSendStopSending(QuicResetStreamError error)849 void QuicStream::MaybeSendStopSending(QuicResetStreamError error) {
850   if (stop_sending_sent_) {
851     return;
852   }
853 
854   if (!session()->version().UsesHttp3() && !error.ok()) {
855     // In gQUIC, RST with error closes both read and write side.
856     return;
857   }
858 
859   if (session()->version().UsesHttp3()) {
860     session()->MaybeSendStopSendingFrame(id(), error);
861   } else {
862     QUICHE_DCHECK_EQ(QUIC_STREAM_NO_ERROR, error.internal_code());
863     session()->MaybeSendRstStreamFrame(id(), QuicResetStreamError::NoError(),
864                                        stream_bytes_written());
865   }
866   stop_sending_sent_ = true;
867   CloseReadSide();
868 }
869 
MaybeSendRstStream(QuicResetStreamError error)870 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
871   if (rst_sent_) {
872     return;
873   }
874 
875   if (!session()->version().UsesHttp3()) {
876     QUIC_BUG_IF(quic_bug_12570_5, error.ok());
877     stop_sending_sent_ = true;
878     CloseReadSide();
879   }
880   session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
881   rst_sent_ = true;
882   CloseWriteSide();
883 }
884 
HasBufferedData() const885 bool QuicStream::HasBufferedData() const {
886   QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
887   return send_buffer_.stream_offset() > stream_bytes_written();
888 }
889 
version() const890 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
891 
transport_version() const892 QuicTransportVersion QuicStream::transport_version() const {
893   return session_->transport_version();
894 }
895 
handshake_protocol() const896 HandshakeProtocol QuicStream::handshake_protocol() const {
897   return session_->connection()->version().handshake_protocol;
898 }
899 
StopReading()900 void QuicStream::StopReading() {
901   QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
902   sequencer_.StopReading();
903 }
904 
OnClose()905 void QuicStream::OnClose() {
906   QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
907 
908   if (!fin_sent_ && !rst_sent_) {
909     QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
910                                       session()->version().UsesHttp3())
911         << "The stream should've already sent RST in response to "
912            "STOP_SENDING";
913     // For flow control accounting, tell the peer how many bytes have been
914     // written on this stream before termination. Done here if needed, using a
915     // RST_STREAM frame.
916     MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
917     session_->MaybeCloseZombieStream(id_);
918   }
919 
920   if (!flow_controller_.has_value() ||
921       flow_controller_->FlowControlViolation() ||
922       connection_flow_controller_->FlowControlViolation()) {
923     return;
924   }
925   // The stream is being closed and will not process any further incoming bytes.
926   // As there may be more bytes in flight, to ensure that both endpoints have
927   // the same connection level flow control state, mark all unreceived or
928   // buffered bytes as consumed.
929   QuicByteCount bytes_to_consume =
930       flow_controller_->highest_received_byte_offset() -
931       flow_controller_->bytes_consumed();
932   AddBytesConsumed(bytes_to_consume);
933 }
934 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)935 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
936   if (type_ == READ_UNIDIRECTIONAL) {
937     OnUnrecoverableError(
938         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
939         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
940     return;
941   }
942 
943   if (!flow_controller_.has_value()) {
944     QUIC_BUG(quic_bug_10586_9)
945         << ENDPOINT
946         << "OnWindowUpdateFrame called on stream without flow control";
947     return;
948   }
949 
950   if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
951     // Let session unblock this stream.
952     session_->MarkConnectionLevelWriteBlocked(id_);
953   }
954 }
955 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)956 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
957     QuicStreamOffset new_offset) {
958   if (!flow_controller_.has_value()) {
959     QUIC_BUG(quic_bug_10586_10)
960         << ENDPOINT
961         << "MaybeIncreaseHighestReceivedOffset called on stream without "
962            "flow control";
963     return false;
964   }
965   uint64_t increment =
966       new_offset - flow_controller_->highest_received_byte_offset();
967   if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
968     return false;
969   }
970 
971   // If |new_offset| increased the stream flow controller's highest received
972   // offset, increase the connection flow controller's value by the incremental
973   // difference.
974   if (stream_contributes_to_connection_flow_control_) {
975     connection_flow_controller_->UpdateHighestReceivedOffset(
976         connection_flow_controller_->highest_received_byte_offset() +
977         increment);
978   }
979   return true;
980 }
981 
AddBytesSent(QuicByteCount bytes)982 void QuicStream::AddBytesSent(QuicByteCount bytes) {
983   if (!flow_controller_.has_value()) {
984     QUIC_BUG(quic_bug_10586_11)
985         << ENDPOINT << "AddBytesSent called on stream without flow control";
986     return;
987   }
988   flow_controller_->AddBytesSent(bytes);
989   if (stream_contributes_to_connection_flow_control_) {
990     connection_flow_controller_->AddBytesSent(bytes);
991   }
992 }
993 
AddBytesConsumed(QuicByteCount bytes)994 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
995   if (type_ == CRYPTO) {
996     // A stream with type CRYPTO has no flow control, so there's nothing this
997     // function needs to do. This function still gets called by the
998     // QuicStreamSequencers used by QuicCryptoStream.
999     return;
1000   }
1001   if (!flow_controller_.has_value()) {
1002     QUIC_BUG(quic_bug_12570_7)
1003         << ENDPOINT
1004         << "AddBytesConsumed called on non-crypto stream without flow control";
1005     return;
1006   }
1007   // Only adjust stream level flow controller if still reading.
1008   if (!read_side_closed_) {
1009     flow_controller_->AddBytesConsumed(bytes);
1010   }
1011 
1012   if (stream_contributes_to_connection_flow_control_) {
1013     connection_flow_controller_->AddBytesConsumed(bytes);
1014   }
1015 }
1016 
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1017 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1018                                              bool was_zero_rtt_rejected) {
1019   if (!flow_controller_.has_value()) {
1020     QUIC_BUG(quic_bug_10586_12)
1021         << ENDPOINT
1022         << "ConfigSendWindowOffset called on stream without flow control";
1023     return false;
1024   }
1025 
1026   // The validation code below is for QUIC with TLS only.
1027   if (new_offset < flow_controller_->send_window_offset()) {
1028     QUICHE_DCHECK(session()->version().UsesTls());
1029     if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1030       // The client is given flow control window lower than what's written in
1031       // 0-RTT. This QUIC implementation is unable to retransmit them.
1032       QUIC_BUG_IF(quic_bug_12570_8, perspective_ == Perspective::IS_SERVER)
1033           << "Server streams' flow control should never be configured twice.";
1034       OnUnrecoverableError(
1035           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1036           absl::StrCat(
1037               "Server rejected 0-RTT, aborting because new stream max data ",
1038               new_offset, " for stream ", id_, " is less than currently used: ",
1039               flow_controller_->bytes_sent()));
1040       return false;
1041     } else if (session()->version().AllowsLowFlowControlLimits()) {
1042       // In IETF QUIC, if the client receives flow control limit lower than what
1043       // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1044       // peer's fault or our implementation's fault.
1045       QUIC_BUG_IF(quic_bug_12570_9, perspective_ == Perspective::IS_SERVER)
1046           << "Server streams' flow control should never be configured twice.";
1047       OnUnrecoverableError(
1048           was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1049                                 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1050           absl::StrCat(
1051               was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1052                                     : "",
1053               "new stream max data ", new_offset, " decreases current limit: ",
1054               flow_controller_->send_window_offset()));
1055       return false;
1056     }
1057   }
1058 
1059   if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1060     // Let session unblock this stream.
1061     session_->MarkConnectionLevelWriteBlocked(id_);
1062   }
1063   return true;
1064 }
1065 
AddRandomPaddingAfterFin()1066 void QuicStream::AddRandomPaddingAfterFin() {
1067   add_random_padding_after_fin_ = true;
1068 }
1069 
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1070 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1071                                     QuicByteCount data_length, bool fin_acked,
1072                                     QuicTime::Delta /*ack_delay_time*/,
1073                                     QuicTime /*receive_timestamp*/,
1074                                     QuicByteCount* newly_acked_length) {
1075   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1076                 << "[" << offset << ", " << offset + data_length << "]"
1077                 << " fin = " << fin_acked;
1078   *newly_acked_length = 0;
1079   if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1080                                       newly_acked_length)) {
1081     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1082     return false;
1083   }
1084   if (!fin_sent_ && fin_acked) {
1085     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1086     return false;
1087   }
1088   // Indicates whether ack listener's OnPacketAcked should be called.
1089   const bool new_data_acked =
1090       *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1091   if (fin_acked) {
1092     fin_outstanding_ = false;
1093     fin_lost_ = false;
1094   }
1095   if (!IsWaitingForAcks() && write_side_closed_ &&
1096       !write_side_data_recvd_state_notified_) {
1097     OnWriteSideInDataRecvdState();
1098     write_side_data_recvd_state_notified_ = true;
1099   }
1100   if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1101     session_->MaybeCloseZombieStream(id_);
1102   }
1103   return new_data_acked;
1104 }
1105 
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1106 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1107                                             QuicByteCount data_length,
1108                                             bool fin_retransmitted) {
1109   send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1110   if (fin_retransmitted) {
1111     fin_lost_ = false;
1112   }
1113 }
1114 
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1115 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1116                                    QuicByteCount data_length, bool fin_lost) {
1117   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1118                 << "[" << offset << ", " << offset + data_length << "]"
1119                 << " fin = " << fin_lost;
1120   if (data_length > 0) {
1121     send_buffer_.OnStreamDataLost(offset, data_length);
1122   }
1123   if (fin_lost && fin_outstanding_) {
1124     fin_lost_ = true;
1125   }
1126 }
1127 
RetransmitStreamData(QuicStreamOffset offset,QuicByteCount data_length,bool fin,TransmissionType type)1128 bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1129                                       QuicByteCount data_length, bool fin,
1130                                       TransmissionType type) {
1131   QUICHE_DCHECK(type == PTO_RETRANSMISSION);
1132   if (HasDeadlinePassed()) {
1133     OnDeadlinePassed();
1134     return true;
1135   }
1136   QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1137                                                    offset + data_length);
1138   retransmission.Difference(bytes_acked());
1139   bool retransmit_fin = fin && fin_outstanding_;
1140   if (retransmission.Empty() && !retransmit_fin) {
1141     return true;
1142   }
1143   QuicConsumedData consumed(0, false);
1144   for (const auto& interval : retransmission) {
1145     QuicStreamOffset retransmission_offset = interval.min();
1146     QuicByteCount retransmission_length = interval.max() - interval.min();
1147     const bool can_bundle_fin =
1148         retransmit_fin && (retransmission_offset + retransmission_length ==
1149                            stream_bytes_written());
1150     consumed = stream_delegate_->WritevData(
1151         id_, retransmission_length, retransmission_offset,
1152         can_bundle_fin ? FIN : NO_FIN, type,
1153         session()->GetEncryptionLevelToSendApplicationData());
1154     QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1155                   << " is forced to retransmit stream data ["
1156                   << retransmission_offset << ", "
1157                   << retransmission_offset + retransmission_length
1158                   << ") and fin: " << can_bundle_fin
1159                   << ", consumed: " << consumed;
1160     OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1161                                consumed.fin_consumed);
1162     if (can_bundle_fin) {
1163       retransmit_fin = !consumed.fin_consumed;
1164     }
1165     if (consumed.bytes_consumed < retransmission_length ||
1166         (can_bundle_fin && !consumed.fin_consumed)) {
1167       // Connection is write blocked.
1168       return false;
1169     }
1170   }
1171   if (retransmit_fin) {
1172     QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1173                   << " retransmits fin only frame.";
1174     consumed = stream_delegate_->WritevData(
1175         id_, 0, stream_bytes_written(), FIN, type,
1176         session()->GetEncryptionLevelToSendApplicationData());
1177     if (!consumed.fin_consumed) {
1178       return false;
1179     }
1180   }
1181   return true;
1182 }
1183 
IsWaitingForAcks() const1184 bool QuicStream::IsWaitingForAcks() const {
1185   return (!rst_sent_ || stream_error_.ok()) &&
1186          (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1187 }
1188 
ReadableBytes() const1189 QuicByteCount QuicStream::ReadableBytes() const {
1190   return sequencer_.ReadableBytes();
1191 }
1192 
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1193 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1194                                  QuicByteCount data_length,
1195                                  QuicDataWriter* writer) {
1196   QUICHE_DCHECK_LT(0u, data_length);
1197   QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1198                 << offset << " length " << data_length;
1199   return send_buffer_.WriteStreamData(offset, data_length, writer);
1200 }
1201 
WriteBufferedData(EncryptionLevel level)1202 void QuicStream::WriteBufferedData(EncryptionLevel level) {
1203   QUICHE_DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1204 
1205   if (session_->ShouldYield(id())) {
1206     session_->MarkConnectionLevelWriteBlocked(id());
1207     return;
1208   }
1209 
1210   // Size of buffered data.
1211   QuicByteCount write_length = BufferedDataBytes();
1212 
1213   // A FIN with zero data payload should not be flow control blocked.
1214   bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1215 
1216   bool fin = fin_buffered_;
1217 
1218   // How much data flow control permits to be written.
1219   QuicByteCount send_window;
1220   if (flow_controller_.has_value()) {
1221     send_window = flow_controller_->SendWindowSize();
1222   } else {
1223     send_window = std::numeric_limits<QuicByteCount>::max();
1224     QUIC_BUG(quic_bug_10586_13)
1225         << ENDPOINT
1226         << "WriteBufferedData called on stream without flow control";
1227   }
1228   if (stream_contributes_to_connection_flow_control_) {
1229     send_window =
1230         std::min(send_window, connection_flow_controller_->SendWindowSize());
1231   }
1232 
1233   if (send_window == 0 && !fin_with_zero_data) {
1234     // Quick return if nothing can be sent.
1235     MaybeSendBlocked();
1236     return;
1237   }
1238 
1239   if (write_length > send_window) {
1240     // Don't send the FIN unless all the data will be sent.
1241     fin = false;
1242 
1243     // Writing more data would be a violation of flow control.
1244     write_length = send_window;
1245     QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1246                   << write_length << " due to flow control";
1247   }
1248 
1249   StreamSendingState state = fin ? FIN : NO_FIN;
1250   if (fin && add_random_padding_after_fin_) {
1251     state = FIN_AND_PADDING;
1252   }
1253   QuicConsumedData consumed_data =
1254       stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1255                                    state, NOT_RETRANSMISSION, level);
1256 
1257   OnStreamDataConsumed(consumed_data.bytes_consumed);
1258 
1259   AddBytesSent(consumed_data.bytes_consumed);
1260   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1261                 << stream_bytes_written() << " bytes "
1262                 << " and has buffered data " << BufferedDataBytes() << " bytes."
1263                 << " fin is sent: " << consumed_data.fin_consumed
1264                 << " fin is buffered: " << fin_buffered_;
1265 
1266   // The write may have generated a write error causing this stream to be
1267   // closed. If so, simply return without marking the stream write blocked.
1268   if (write_side_closed_) {
1269     return;
1270   }
1271 
1272   if (consumed_data.bytes_consumed == write_length) {
1273     if (!fin_with_zero_data) {
1274       MaybeSendBlocked();
1275     }
1276     if (fin && consumed_data.fin_consumed) {
1277       QUICHE_DCHECK(!fin_sent_);
1278       fin_sent_ = true;
1279       fin_outstanding_ = true;
1280       if (fin_received_) {
1281         QUICHE_DCHECK(!was_draining_);
1282         session_->StreamDraining(id_,
1283                                  /*unidirectional=*/type_ != BIDIRECTIONAL);
1284         was_draining_ = true;
1285       }
1286       CloseWriteSide();
1287     } else if (fin && !consumed_data.fin_consumed && !write_side_closed_) {
1288       session_->MarkConnectionLevelWriteBlocked(id());
1289     }
1290   } else {
1291     session_->MarkConnectionLevelWriteBlocked(id());
1292   }
1293   if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1294     busy_counter_ = 0;
1295   }
1296 }
1297 
BufferedDataBytes() const1298 uint64_t QuicStream::BufferedDataBytes() const {
1299   QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1300   return send_buffer_.stream_offset() - stream_bytes_written();
1301 }
1302 
CanWriteNewData() const1303 bool QuicStream::CanWriteNewData() const {
1304   return BufferedDataBytes() < buffered_data_threshold_;
1305 }
1306 
CanWriteNewDataAfterData(QuicByteCount length) const1307 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1308   return (BufferedDataBytes() + length) < buffered_data_threshold_;
1309 }
1310 
stream_bytes_written() const1311 uint64_t QuicStream::stream_bytes_written() const {
1312   return send_buffer_.stream_bytes_written();
1313 }
1314 
bytes_acked() const1315 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1316   return send_buffer_.bytes_acked();
1317 }
1318 
OnStreamDataConsumed(QuicByteCount bytes_consumed)1319 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1320   send_buffer_.OnStreamDataConsumed(bytes_consumed);
1321 }
1322 
WritePendingRetransmission()1323 void QuicStream::WritePendingRetransmission() {
1324   while (HasPendingRetransmission()) {
1325     QuicConsumedData consumed(0, false);
1326     if (!send_buffer_.HasPendingRetransmission()) {
1327       QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1328                     << " retransmits fin only frame.";
1329       consumed = stream_delegate_->WritevData(
1330           id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION,
1331           session()->GetEncryptionLevelToSendApplicationData());
1332       fin_lost_ = !consumed.fin_consumed;
1333       if (fin_lost_) {
1334         // Connection is write blocked.
1335         return;
1336       }
1337     } else {
1338       StreamPendingRetransmission pending =
1339           send_buffer_.NextPendingRetransmission();
1340       // Determine whether the lost fin can be bundled with the data.
1341       const bool can_bundle_fin =
1342           fin_lost_ &&
1343           (pending.offset + pending.length == stream_bytes_written());
1344       consumed = stream_delegate_->WritevData(
1345           id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
1346           LOSS_RETRANSMISSION,
1347           session()->GetEncryptionLevelToSendApplicationData());
1348       QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1349                     << " tries to retransmit stream data [" << pending.offset
1350                     << ", " << pending.offset + pending.length
1351                     << ") and fin: " << can_bundle_fin
1352                     << ", consumed: " << consumed;
1353       OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1354                                  consumed.fin_consumed);
1355       if (consumed.bytes_consumed < pending.length ||
1356           (can_bundle_fin && !consumed.fin_consumed)) {
1357         // Connection is write blocked.
1358         return;
1359       }
1360     }
1361   }
1362 }
1363 
MaybeSetTtl(QuicTime::Delta ttl)1364 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1365   if (is_static_) {
1366     QUIC_BUG(quic_bug_10586_14) << "Cannot set TTL of a static stream.";
1367     return false;
1368   }
1369   if (deadline_.IsInitialized()) {
1370     QUIC_DLOG(WARNING) << "Deadline has already been set.";
1371     return false;
1372   }
1373   QuicTime now = session()->connection()->clock()->ApproximateNow();
1374   deadline_ = now + ttl;
1375   return true;
1376 }
1377 
HasDeadlinePassed() const1378 bool QuicStream::HasDeadlinePassed() const {
1379   if (!deadline_.IsInitialized()) {
1380     // No deadline has been set.
1381     return false;
1382   }
1383   QuicTime now = session()->connection()->clock()->ApproximateNow();
1384   if (now < deadline_) {
1385     return false;
1386   }
1387   // TTL expired.
1388   QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1389   return true;
1390 }
1391 
OnDeadlinePassed()1392 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
1393 
IsFlowControlBlocked() const1394 bool QuicStream::IsFlowControlBlocked() const {
1395   if (!flow_controller_.has_value()) {
1396     QUIC_BUG(quic_bug_10586_15)
1397         << "Trying to access non-existent flow controller.";
1398     return false;
1399   }
1400   return flow_controller_->IsBlocked();
1401 }
1402 
highest_received_byte_offset() const1403 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1404   if (!flow_controller_.has_value()) {
1405     QUIC_BUG(quic_bug_10586_16)
1406         << "Trying to access non-existent flow controller.";
1407     return 0;
1408   }
1409   return flow_controller_->highest_received_byte_offset();
1410 }
1411 
UpdateReceiveWindowSize(QuicStreamOffset size)1412 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1413   if (!flow_controller_.has_value()) {
1414     QUIC_BUG(quic_bug_10586_17)
1415         << "Trying to access non-existent flow controller.";
1416     return;
1417   }
1418   flow_controller_->UpdateReceiveWindowSize(size);
1419 }
1420 
GetSendWindow() const1421 absl::optional<QuicByteCount> QuicStream::GetSendWindow() const {
1422   return flow_controller_.has_value()
1423              ? absl::optional<QuicByteCount>(flow_controller_->SendWindowSize())
1424              : absl::nullopt;
1425 }
1426 
GetReceiveWindow() const1427 absl::optional<QuicByteCount> QuicStream::GetReceiveWindow() const {
1428   return flow_controller_.has_value()
1429              ? absl::optional<QuicByteCount>(
1430                    flow_controller_->receive_window_size())
1431              : absl::nullopt;
1432 }
1433 
OnStreamCreatedFromPendingStream()1434 void QuicStream::OnStreamCreatedFromPendingStream() {
1435   sequencer()->SetUnblocked();
1436 }
1437 
1438 }  // namespace quic
1439