1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream.h"
6
7 #include <limits>
8 #include <optional>
9 #include <string>
10
11 #include "absl/strings/str_cat.h"
12 #include "absl/strings/string_view.h"
13 #include "quiche/quic/core/quic_error_codes.h"
14 #include "quiche/quic/core/quic_flow_controller.h"
15 #include "quiche/quic/core/quic_session.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/core/quic_utils.h"
18 #include "quiche/quic/core/quic_versions.h"
19 #include "quiche/quic/platform/api/quic_bug_tracker.h"
20 #include "quiche/quic/platform/api/quic_flag_utils.h"
21 #include "quiche/quic/platform/api/quic_flags.h"
22 #include "quiche/quic/platform/api/quic_logging.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/platform/api/quiche_mem_slice.h"
25
26 using spdy::SpdyPriority;
27
28 namespace quic {
29
// Log-line prefix naming the local endpoint's role. The expansion reads
// `perspective_`, so the macro is only usable where that name is in scope.
#define ENDPOINT \
  (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
32
33 namespace {
34
DefaultFlowControlWindow(ParsedQuicVersion version)35 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
36 if (!version.AllowsLowFlowControlLimits()) {
37 return kDefaultFlowControlSendWindow;
38 }
39 return 0;
40 }
41
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)42 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
43 QuicStreamId stream_id) {
44 ParsedQuicVersion version = session->connection()->version();
45 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
46 return session->config()->GetInitialStreamFlowControlWindowToSend();
47 }
48
49 // Unidirectional streams (v99 only).
50 if (VersionHasIetfQuicFrames(version.transport_version) &&
51 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
52 return session->config()
53 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
54 }
55
56 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
57 session->perspective())) {
58 return session->config()
59 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
60 }
61
62 return session->config()
63 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
64 }
65
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)66 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
67 QuicStreamId stream_id) {
68 ParsedQuicVersion version = session->connection()->version();
69 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
70 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
71 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
72 }
73
74 return DefaultFlowControlWindow(version);
75 }
76
77 // Unidirectional streams (v99 only).
78 if (VersionHasIetfQuicFrames(version.transport_version) &&
79 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
80 if (session->config()
81 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
82 return session->config()
83 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
84 }
85
86 return DefaultFlowControlWindow(version);
87 }
88
89 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
90 session->perspective())) {
91 if (session->config()
92 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
93 return session->config()
94 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
95 }
96
97 return DefaultFlowControlWindow(version);
98 }
99
100 if (session->config()
101 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
102 return session->config()
103 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
104 }
105
106 return DefaultFlowControlWindow(version);
107 }
108
109 } // namespace
110
// Builds a pending stream for |id| on |session|.
//
// The stream gets its own stream-level flow controller, whose windows come
// from GetReceivedFlowControlWindow() and
// GetInitialStreamFlowControlWindowToSend(), and it keeps a pointer to the
// session's connection-level flow controller. |session| is also stored as the
// stream delegate.
PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
    : id_(id),
      version_(session->version()),
      stream_delegate_(session),
      stream_bytes_read_(0),
      fin_received_(false),
      // The stream type is evaluated with peer_initiated set to true.
      is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
                                                 /*peer_initiated = */ true,
                                                 session->version()) ==
                        BIDIRECTIONAL),
      connection_flow_controller_(session->flow_controller()),
      flow_controller_(session, id,
                       /*is_connection_flow_controller*/ false,
                       GetReceivedFlowControlWindow(session, id),
                       GetInitialStreamFlowControlWindowToSend(session, id),
                       kStreamReceiveWindowLimit,
                       session->flow_controller()->auto_tune_receive_window(),
                       session->flow_controller()),
      sequencer_(this) {}
130
// Intentionally does nothing when data becomes available.
void PendingStream::OnDataAvailable() {
  // Data should be kept in the sequencer so that
  // QuicSession::ProcessPendingStream() can read it.
}
135
// Debug-asserts that the sequencer is closed; performs no other work.
void PendingStream::OnFinRead() { QUICHE_DCHECK(sequencer_.IsClosed()); }
137
// Reports |bytes| as consumed to the stream-level flow controller and then to
// the connection-level flow controller.
void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
  // It will be called when the metadata of the stream is consumed.
  flow_controller_.AddBytesConsumed(bytes);
  connection_flow_controller_->AddBytesConsumed(bytes);
}
143
// Not expected to be called: the body only hits QUICHE_NOTREACHED() and the
// error argument is ignored.
void PendingStream::ResetWithError(QuicResetStreamError /*error*/) {
  // Currently PendingStream is only read-unidirectional. It shouldn't send
  // Reset.
  QUICHE_NOTREACHED();
}
149
// Forwards |error| and its human-readable |details| to the stream delegate.
void PendingStream::OnUnrecoverableError(QuicErrorCode error,
                                         const std::string& details) {
  stream_delegate_->OnStreamError(error, details);
}
154
// Same as the two-argument overload, but also passes an explicit IETF
// transport error code (|ietf_error|) to the stream delegate.
void PendingStream::OnUnrecoverableError(QuicErrorCode error,
                                         QuicIetfTransportErrorCodes ietf_error,
                                         const std::string& details) {
  stream_delegate_->OnStreamError(error, ietf_error, details);
}
160
// Returns the stream ID stored in id_.
QuicStreamId PendingStream::id() const { return id_; }
162
// Returns the QUIC version stored in version_.
ParsedQuicVersion PendingStream::version() const { return version_; }
164
OnStreamFrame(const QuicStreamFrame & frame)165 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
166 QUICHE_DCHECK_EQ(frame.stream_id, id_);
167
168 bool is_stream_too_long =
169 (frame.offset > kMaxStreamLength) ||
170 (kMaxStreamLength - frame.offset < frame.data_length);
171 if (is_stream_too_long) {
172 // Close connection if stream becomes too long.
173 QUIC_PEER_BUG(quic_peer_bug_12570_1)
174 << "Receive stream frame reaches max stream length. frame offset "
175 << frame.offset << " length " << frame.data_length;
176 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
177 "Peer sends more data than allowed on this stream.");
178 return;
179 }
180
181 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
182 OnUnrecoverableError(
183 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
184 absl::StrCat(
185 "Stream ", id_,
186 " received data with offset: ", frame.offset + frame.data_length,
187 ", which is beyond close offset: ", sequencer()->close_offset()));
188 return;
189 }
190
191 if (frame.fin) {
192 fin_received_ = true;
193 }
194
195 // This count includes duplicate data received.
196 QuicByteCount frame_payload_size = frame.data_length;
197 stream_bytes_read_ += frame_payload_size;
198
199 // Flow control is interested in tracking highest received offset.
200 // Only interested in received frames that carry data.
201 if (frame_payload_size > 0 &&
202 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
203 // As the highest received offset has changed, check to see if this is a
204 // violation of flow control.
205 if (flow_controller_.FlowControlViolation() ||
206 connection_flow_controller_->FlowControlViolation()) {
207 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
208 "Flow control violation after increasing offset");
209 return;
210 }
211 }
212
213 sequencer_.OnStreamFrame(frame);
214 }
215
OnRstStreamFrame(const QuicRstStreamFrame & frame)216 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
217 QUICHE_DCHECK_EQ(frame.stream_id, id_);
218
219 if (frame.byte_offset > kMaxStreamLength) {
220 // Peer are not suppose to write bytes more than maxium allowed.
221 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
222 "Reset frame stream offset overflow.");
223 return;
224 }
225
226 const QuicStreamOffset kMaxOffset =
227 std::numeric_limits<QuicStreamOffset>::max();
228 if (sequencer()->close_offset() != kMaxOffset &&
229 frame.byte_offset != sequencer()->close_offset()) {
230 OnUnrecoverableError(
231 QUIC_STREAM_MULTIPLE_OFFSET,
232 absl::StrCat("Stream ", id_,
233 " received new final offset: ", frame.byte_offset,
234 ", which is different from close offset: ",
235 sequencer()->close_offset()));
236 return;
237 }
238
239 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
240 if (flow_controller_.FlowControlViolation() ||
241 connection_flow_controller_->FlowControlViolation()) {
242 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
243 "Flow control violation after increasing offset");
244 return;
245 }
246 }
247
// Passes the frame's max_data to the stream-level flow controller as the new
// send window offset. Debug-asserts that this stream is bidirectional.
void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
  QUICHE_DCHECK(is_bidirectional_);
  flow_controller_.UpdateSendWindowOffset(frame.max_data);
}
252
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)253 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
254 QuicStreamOffset new_offset) {
255 uint64_t increment =
256 new_offset - flow_controller_.highest_received_byte_offset();
257 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
258 return false;
259 }
260
261 // If |new_offset| increased the stream flow controller's highest received
262 // offset, increase the connection flow controller's value by the incremental
263 // difference.
264 connection_flow_controller_->UpdateHighestReceivedOffset(
265 connection_flow_controller_->highest_received_byte_offset() + increment);
266 return true;
267 }
268
OnStopSending(QuicResetStreamError stop_sending_error_code)269 void PendingStream::OnStopSending(
270 QuicResetStreamError stop_sending_error_code) {
271 if (!stop_sending_error_code_) {
272 stop_sending_error_code_ = stop_sending_error_code;
273 }
274 }
275
// Forwards to the sequencer's MarkConsumed() with |num_bytes|.
void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
  sequencer_.MarkConsumed(num_bytes);
}
279
// Logs at verbose level and tells the sequencer to stop reading.
void PendingStream::StopReading() {
  QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
  sequencer_.StopReading();
}
284
// Creates a stream from |pending|.
//
// The pending stream's sequencer and stream-level flow controller are moved
// into the new stream; its ID, bytes-read count, FIN flag and connection flow
// controller pointer are copied. The stream type is computed with
// peer_initiated set to true. Debug-asserts that the session's version uses
// IETF QUIC frames.
QuicStream::QuicStream(PendingStream* pending, QuicSession* session,
                       bool is_static)
    : QuicStream(pending->id_, session, std::move(pending->sequencer_),
                 is_static,
                 QuicUtils::GetStreamType(pending->id_, session->perspective(),
                                          /*peer_initiated = */ true,
                                          session->version()),
                 pending->stream_bytes_read_, pending->fin_received_,
                 std::move(pending->flow_controller_),
                 pending->connection_flow_controller_) {
  QUICHE_DCHECK(session->version().HasIetfQuicFrames());
  // Point the moved-in sequencer at this stream.
  sequencer_.set_stream(this);
}
298
299 namespace {
300
FlowController(QuicStreamId id,QuicSession * session,StreamType type)301 std::optional<QuicFlowController> FlowController(QuicStreamId id,
302 QuicSession* session,
303 StreamType type) {
304 if (type == CRYPTO) {
305 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
306 // it is using crypto frames instead of stream frames. The QuicCryptoStream
307 // doesn't have any flow control in that case, so we don't create a
308 // QuicFlowController for it.
309 return std::nullopt;
310 }
311 return QuicFlowController(
312 session, id,
313 /*is_connection_flow_controller*/ false,
314 GetReceivedFlowControlWindow(session, id),
315 GetInitialStreamFlowControlWindowToSend(session, id),
316 kStreamReceiveWindowLimit,
317 session->flow_controller()->auto_tune_receive_window(),
318 session->flow_controller());
319 }
320
321 } // namespace
322
// Creates a fresh stream: a new sequencer bound to this stream, zero bytes
// read, no FIN received, the flow controller produced by FlowController()
// (none for CRYPTO streams), and the session's connection flow controller.
QuicStream::QuicStream(QuicStreamId id, QuicSession* session, bool is_static,
                       StreamType type)
    : QuicStream(id, session, QuicStreamSequencer(this), is_static, type, 0,
                 false, FlowController(id, session, type),
                 session->flow_controller()) {}
328
// Constructor that the other QuicStream constructors delegate to.
//
// Takes ownership of |sequencer| and |flow_controller| by move, stores
// |session| both as the session and as the stream delegate, and seeds the
// read-side state from |stream_bytes_read| and |fin_received|. After member
// initialization it closes the unused direction of unidirectional streams and
// registers the stream's priority with the delegate for every type except
// CRYPTO.
QuicStream::QuicStream(QuicStreamId id, QuicSession* session,
                       QuicStreamSequencer sequencer, bool is_static,
                       StreamType type, uint64_t stream_bytes_read,
                       bool fin_received,
                       std::optional<QuicFlowController> flow_controller,
                       QuicFlowController* connection_flow_controller)
    : sequencer_(std::move(sequencer)),
      id_(id),
      session_(session),
      stream_delegate_(session),
      priority_(QuicStreamPriority::Default(session->priority_type())),
      stream_bytes_read_(stream_bytes_read),
      stream_error_(QuicResetStreamError::NoError()),
      connection_error_(QUIC_NO_ERROR),
      read_side_closed_(false),
      write_side_closed_(false),
      write_side_data_recvd_state_notified_(false),
      fin_buffered_(false),
      fin_sent_(false),
      fin_outstanding_(false),
      fin_lost_(false),
      fin_received_(fin_received),
      rst_sent_(false),
      rst_received_(false),
      stop_sending_sent_(false),
      flow_controller_(std::move(flow_controller)),
      connection_flow_controller_(connection_flow_controller),
      stream_contributes_to_connection_flow_control_(true),
      busy_counter_(0),
      add_random_padding_after_fin_(false),
      send_buffer_(
          session->connection()->helper()->GetStreamSendBufferAllocator()),
      buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
      is_static_(is_static),
      deadline_(QuicTime::Zero()),
      was_draining_(false),
      // With IETF QUIC frames and a non-CRYPTO |type|, the stream type is
      // derived from the stream ID; otherwise |type| is used as given.
      type_(VersionHasIetfQuicFrames(session->transport_version()) &&
                    type != CRYPTO
                ? QuicUtils::GetStreamType(id_, session->perspective(),
                                           session->IsIncomingStream(id_),
                                           session->version())
                : type),
      creation_time_(session->connection()->clock()->ApproximateNow()),
      perspective_(session->perspective()) {
  if (type_ == WRITE_UNIDIRECTIONAL) {
    // Write-only stream: treat the read direction as already finished.
    fin_received_ = true;
    CloseReadSide();
  } else if (type_ == READ_UNIDIRECTIONAL) {
    // Read-only stream: treat the write direction as already finished.
    fin_sent_ = true;
    CloseWriteSide();
  }
  if (type_ != CRYPTO) {
    stream_delegate_->RegisterStreamPriority(id, is_static_, priority_);
  }
}
384
// Logs (verbose only) when the stream is destroyed while still waiting for
// acks, then unregisters the stream's priority from the delegate for every
// stream type except CRYPTO. Both steps are skipped when the corresponding
// pointer is null.
QuicStream::~QuicStream() {
  if (session_ != nullptr && IsWaitingForAcks()) {
    QUIC_DVLOG(1)
        << ENDPOINT << "Stream " << id_
        << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
        << send_buffer_.stream_bytes_outstanding()
        << ", fin_outstanding: " << fin_outstanding_;
  }
  // Mirrors the RegisterStreamPriority() call in the constructor, which is
  // also skipped for CRYPTO streams.
  if (stream_delegate_ != nullptr && type_ != CRYPTO) {
    stream_delegate_->UnregisterStreamPriority(id());
  }
}
397
OnStreamFrame(const QuicStreamFrame & frame)398 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
399 QUICHE_DCHECK_EQ(frame.stream_id, id_);
400
401 QUICHE_DCHECK(!(read_side_closed_ && write_side_closed_));
402
403 if (frame.fin && is_static_) {
404 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
405 "Attempt to close a static stream");
406 return;
407 }
408
409 if (type_ == WRITE_UNIDIRECTIONAL) {
410 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
411 "Data received on write unidirectional stream");
412 return;
413 }
414
415 bool is_stream_too_long =
416 (frame.offset > kMaxStreamLength) ||
417 (kMaxStreamLength - frame.offset < frame.data_length);
418 if (is_stream_too_long) {
419 // Close connection if stream becomes too long.
420 QUIC_PEER_BUG(quic_peer_bug_10586_1)
421 << "Receive stream frame on stream " << id_
422 << " reaches max stream length. frame offset " << frame.offset
423 << " length " << frame.data_length << ". " << sequencer_.DebugString();
424 OnUnrecoverableError(
425 QUIC_STREAM_LENGTH_OVERFLOW,
426 absl::StrCat("Peer sends more data than allowed on stream ", id_,
427 ". frame: offset = ", frame.offset, ", length = ",
428 frame.data_length, ". ", sequencer_.DebugString()));
429 return;
430 }
431
432 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
433 OnUnrecoverableError(
434 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
435 absl::StrCat(
436 "Stream ", id_,
437 " received data with offset: ", frame.offset + frame.data_length,
438 ", which is beyond close offset: ", sequencer_.close_offset()));
439 return;
440 }
441
442 if (frame.fin && !fin_received_) {
443 fin_received_ = true;
444 if (fin_sent_) {
445 QUICHE_DCHECK(!was_draining_);
446 session_->StreamDraining(id_,
447 /*unidirectional=*/type_ != BIDIRECTIONAL);
448 was_draining_ = true;
449 }
450 }
451
452 if (read_side_closed_) {
453 QUIC_DLOG(INFO)
454 << ENDPOINT << "Stream " << frame.stream_id
455 << " is closed for reading. Ignoring newly received stream data.";
456 // The subclass does not want to read data: blackhole the data.
457 return;
458 }
459
460 // This count includes duplicate data received.
461 QuicByteCount frame_payload_size = frame.data_length;
462 stream_bytes_read_ += frame_payload_size;
463
464 // Flow control is interested in tracking highest received offset.
465 // Only interested in received frames that carry data.
466 if (frame_payload_size > 0 &&
467 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
468 // As the highest received offset has changed, check to see if this is a
469 // violation of flow control.
470 QUIC_BUG_IF(quic_bug_12570_2, !flow_controller_.has_value())
471 << ENDPOINT << "OnStreamFrame called on stream without flow control";
472 if ((flow_controller_.has_value() &&
473 flow_controller_->FlowControlViolation()) ||
474 connection_flow_controller_->FlowControlViolation()) {
475 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
476 "Flow control violation after increasing offset");
477 return;
478 }
479 }
480
481 sequencer_.OnStreamFrame(frame);
482 }
483
// Handles a STOP_SENDING from the peer carrying |error|.
//
// Returns false without resetting when the write side is already closed and
// nothing is waiting for acks, or when the stream is static (which is also
// reported as an unrecoverable error). Otherwise records |error| as the stream
// error, sends a RST_STREAM if one has not been sent, and returns true.
bool QuicStream::OnStopSending(QuicResetStreamError error) {
  // Do not reset the stream if all data has been sent and acknowledged.
  if (write_side_closed() && !IsWaitingForAcks()) {
    QUIC_DVLOG(1) << ENDPOINT
                  << "Ignoring STOP_SENDING for a write closed stream, id: "
                  << id_;
    return false;
  }

  if (is_static_) {
    QUIC_DVLOG(1) << ENDPOINT
                  << "Received STOP_SENDING for a static stream, id: " << id_
                  << " Closing connection";
    OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
                         "Received STOP_SENDING for a static stream");
    return false;
  }

  stream_error_ = error;
  MaybeSendRstStream(error);
  return true;
}
506
// Returns the sequencer's count of received frames.
int QuicStream::num_frames_received() const {
  return sequencer_.num_frames_received();
}
510
// Returns the sequencer's count of duplicate frames received.
int QuicStream::num_duplicate_frames_received() const {
  return sequencer_.num_duplicate_frames_received();
}
514
OnStreamReset(const QuicRstStreamFrame & frame)515 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
516 rst_received_ = true;
517 if (frame.byte_offset > kMaxStreamLength) {
518 // Peer are not suppose to write bytes more than maxium allowed.
519 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
520 "Reset frame stream offset overflow.");
521 return;
522 }
523
524 const QuicStreamOffset kMaxOffset =
525 std::numeric_limits<QuicStreamOffset>::max();
526 if (sequencer()->close_offset() != kMaxOffset &&
527 frame.byte_offset != sequencer()->close_offset()) {
528 OnUnrecoverableError(
529 QUIC_STREAM_MULTIPLE_OFFSET,
530 absl::StrCat("Stream ", id_,
531 " received new final offset: ", frame.byte_offset,
532 ", which is different from close offset: ",
533 sequencer_.close_offset()));
534 return;
535 }
536
537 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
538 QUIC_BUG_IF(quic_bug_12570_3, !flow_controller_.has_value())
539 << ENDPOINT << "OnStreamReset called on stream without flow control";
540 if ((flow_controller_.has_value() &&
541 flow_controller_->FlowControlViolation()) ||
542 connection_flow_controller_->FlowControlViolation()) {
543 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
544 "Flow control violation after increasing offset");
545 return;
546 }
547
548 stream_error_ = frame.error();
549 // Google QUIC closes both sides of the stream in response to a
550 // RESET_STREAM, IETF QUIC closes only the read side.
551 if (!VersionHasIetfQuicFrames(transport_version())) {
552 CloseWriteSide();
553 }
554 CloseReadSide();
555 }
556
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)557 void QuicStream::OnConnectionClosed(QuicErrorCode error,
558 ConnectionCloseSource /*source*/) {
559 if (read_side_closed_ && write_side_closed_) {
560 return;
561 }
562 if (error != QUIC_NO_ERROR) {
563 stream_error_ =
564 QuicResetStreamError::FromInternal(QUIC_STREAM_CONNECTION_ERROR);
565 connection_error_ = error;
566 }
567
568 CloseWriteSide();
569 CloseReadSide();
570 }
571
// Marks the FIN as received and closes the read side. Debug-asserts that the
// sequencer is closed.
void QuicStream::OnFinRead() {
  QUICHE_DCHECK(sequencer_.IsClosed());
  // OnFinRead can be called due to a FIN flag in a headers block, so there may
  // have been no OnStreamFrame call with a FIN in the frame.
  fin_received_ = true;
  // If fin_sent_ is true, then CloseWriteSide has already been called, and the
  // stream will be destroyed by CloseReadSide, so don't need to call
  // StreamDraining.
  CloseReadSide();
}
582
// Marks the FIN as sent. Debug-asserts that the transport version does not
// use HTTP/3.
void QuicStream::SetFinSent() {
  QUICHE_DCHECK(!VersionUsesHttp3(transport_version()));
  fin_sent_ = true;
}
587
// Convenience wrapper: converts the internal |error| code into a
// QuicResetStreamError and calls ResetWithError().
void QuicStream::Reset(QuicRstStreamErrorCode error) {
  ResetWithError(QuicResetStreamError::FromInternal(error));
}
591
ResetWithError(QuicResetStreamError error)592 void QuicStream::ResetWithError(QuicResetStreamError error) {
593 stream_error_ = error;
594 QuicConnection::ScopedPacketFlusher flusher(session()->connection());
595 MaybeSendStopSending(error);
596 MaybeSendRstStream(error);
597
598 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
599 session()->MaybeCloseZombieStream(id_);
600 }
601 }
602
ResetWriteSide(QuicResetStreamError error)603 void QuicStream::ResetWriteSide(QuicResetStreamError error) {
604 stream_error_ = error;
605 MaybeSendRstStream(error);
606
607 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
608 session()->MaybeCloseZombieStream(id_);
609 }
610 }
611
SendStopSending(QuicResetStreamError error)612 void QuicStream::SendStopSending(QuicResetStreamError error) {
613 stream_error_ = error;
614 MaybeSendStopSending(error);
615
616 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
617 session()->MaybeCloseZombieStream(id_);
618 }
619 }
620
// Forwards |error| and its human-readable |details| to the stream delegate.
void QuicStream::OnUnrecoverableError(QuicErrorCode error,
                                      const std::string& details) {
  stream_delegate_->OnStreamError(error, details);
}
625
// Same as the two-argument overload, but also passes an explicit IETF
// transport error code (|ietf_error|) to the stream delegate.
void QuicStream::OnUnrecoverableError(QuicErrorCode error,
                                      QuicIetfTransportErrorCodes ietf_error,
                                      const std::string& details) {
  stream_delegate_->OnStreamError(error, ietf_error, details);
}
631
// Returns a reference to the stream's current priority.
const QuicStreamPriority& QuicStream::priority() const { return priority_; }
633
// Stores |priority|, calls MaybeSendPriorityUpdateFrame(), and then informs
// the stream delegate of the new priority for this stream ID.
void QuicStream::SetPriority(const QuicStreamPriority& priority) {
  priority_ = priority;

  MaybeSendPriorityUpdateFrame();

  stream_delegate_->UpdateStreamPriority(id(), priority);
}
641
WriteOrBufferData(absl::string_view data,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)642 void QuicStream::WriteOrBufferData(
643 absl::string_view data, bool fin,
644 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
645 ack_listener) {
646 QUIC_BUG_IF(quic_bug_12570_4,
647 QuicUtils::IsCryptoStreamId(transport_version(), id_))
648 << ENDPOINT
649 << "WriteOrBufferData is used to send application data, use "
650 "WriteOrBufferDataAtLevel to send crypto data.";
651 return WriteOrBufferDataAtLevel(
652 data, fin, session()->GetEncryptionLevelToSendApplicationData(),
653 ack_listener);
654 }
655
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)656 void QuicStream::WriteOrBufferDataAtLevel(
657 absl::string_view data, bool fin, EncryptionLevel level,
658 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
659 ack_listener) {
660 if (data.empty() && !fin) {
661 QUIC_BUG(quic_bug_10586_2) << "data.empty() && !fin";
662 return;
663 }
664
665 if (fin_buffered_) {
666 QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
667 return;
668 }
669 if (write_side_closed_) {
670 QUIC_DLOG(ERROR) << ENDPOINT
671 << "Attempt to write when the write side is closed";
672 if (type_ == READ_UNIDIRECTIONAL) {
673 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
674 "Try to send data on read unidirectional stream");
675 }
676 return;
677 }
678
679 fin_buffered_ = fin;
680
681 bool had_buffered_data = HasBufferedData();
682 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
683 // all data to be consumed.
684 if (data.length() > 0) {
685 QuicStreamOffset offset = send_buffer_.stream_offset();
686 if (kMaxStreamLength - offset < data.length()) {
687 QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
688 OnUnrecoverableError(
689 QUIC_STREAM_LENGTH_OVERFLOW,
690 absl::StrCat("Write too many data via stream ", id_));
691 return;
692 }
693 send_buffer_.SaveStreamData(data);
694 OnDataBuffered(offset, data.length(), ack_listener);
695 }
696 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
697 // Write data if there is no buffered data before.
698 WriteBufferedData(level);
699 }
700 }
701
OnCanWrite()702 void QuicStream::OnCanWrite() {
703 if (HasDeadlinePassed()) {
704 OnDeadlinePassed();
705 return;
706 }
707 if (HasPendingRetransmission()) {
708 WritePendingRetransmission();
709 // Exit early to allow other streams to write pending retransmissions if
710 // any.
711 return;
712 }
713
714 if (write_side_closed_) {
715 QUIC_DLOG(ERROR)
716 << ENDPOINT << "Stream " << id()
717 << " attempting to write new data when the write side is closed";
718 return;
719 }
720 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
721 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
722 }
723 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
724 // Notify upper layer to write new data when buffered data size is below
725 // low water mark.
726 OnCanWriteNewData();
727 }
728 }
729
MaybeSendBlocked()730 void QuicStream::MaybeSendBlocked() {
731 if (!flow_controller_.has_value()) {
732 QUIC_BUG(quic_bug_10586_5)
733 << ENDPOINT << "MaybeSendBlocked called on stream without flow control";
734 return;
735 }
736 flow_controller_->MaybeSendBlocked();
737 if (!stream_contributes_to_connection_flow_control_) {
738 return;
739 }
740 connection_flow_controller_->MaybeSendBlocked();
741
742 // If the stream is blocked by connection-level flow control but not by
743 // stream-level flow control, add the stream to the write blocked list so that
744 // the stream will be given a chance to write when a connection-level
745 // WINDOW_UPDATE arrives.
746 if (!write_side_closed_ && connection_flow_controller_->IsBlocked() &&
747 !flow_controller_->IsBlocked()) {
748 session_->MarkConnectionLevelWriteBlocked(id());
749 }
750 }
751
// Convenience wrapper: writes the single slice |span| (and optionally a FIN)
// by passing a one-element span to WriteMemSlices().
QuicConsumedData QuicStream::WriteMemSlice(quiche::QuicheMemSlice span,
                                           bool fin) {
  return WriteMemSlices(absl::MakeSpan(&span, 1), fin);
}
756
WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span,bool fin)757 QuicConsumedData QuicStream::WriteMemSlices(
758 absl::Span<quiche::QuicheMemSlice> span, bool fin) {
759 QuicConsumedData consumed_data(0, false);
760 if (span.empty() && !fin) {
761 QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin";
762 return consumed_data;
763 }
764
765 if (fin_buffered_) {
766 QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
767 return consumed_data;
768 }
769
770 if (write_side_closed_) {
771 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
772 << " attempting to write when the write side is closed";
773 if (type_ == READ_UNIDIRECTIONAL) {
774 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
775 "Try to send data on read unidirectional stream");
776 }
777 return consumed_data;
778 }
779
780 bool had_buffered_data = HasBufferedData();
781 if (CanWriteNewData() || span.empty()) {
782 consumed_data.fin_consumed = fin;
783 if (!span.empty()) {
784 // Buffer all data if buffered data size is below limit.
785 QuicStreamOffset offset = send_buffer_.stream_offset();
786 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
787 if (offset > send_buffer_.stream_offset() ||
788 kMaxStreamLength < send_buffer_.stream_offset()) {
789 QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
790 OnUnrecoverableError(
791 QUIC_STREAM_LENGTH_OVERFLOW,
792 absl::StrCat("Write too many data via stream ", id_));
793 return consumed_data;
794 }
795 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
796 }
797 }
798 fin_buffered_ = consumed_data.fin_consumed;
799
800 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
801 // Write data if there is no buffered data before.
802 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
803 }
804
805 return consumed_data;
806 }
807
// True when the send buffer has data pending retransmission or the FIN is
// marked as lost.
bool QuicStream::HasPendingRetransmission() const {
  return send_buffer_.HasPendingRetransmission() || fin_lost_;
}
811
// True when the send buffer reports the range [offset, offset + data_length)
// as outstanding, or when |fin| is set and the FIN is outstanding.
bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
                                          QuicByteCount data_length,
                                          bool fin) const {
  return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
         (fin && fin_outstanding_);
}
818
CloseReadSide()819 void QuicStream::CloseReadSide() {
820 if (read_side_closed_) {
821 return;
822 }
823 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
824
825 read_side_closed_ = true;
826 sequencer_.ReleaseBuffer();
827
828 if (write_side_closed_) {
829 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
830 session_->OnStreamClosed(id());
831 OnClose();
832 }
833 }
834
CloseWriteSide()835 void QuicStream::CloseWriteSide() {
836 if (write_side_closed_) {
837 return;
838 }
839 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
840
841 write_side_closed_ = true;
842 if (read_side_closed_) {
843 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
844 session_->OnStreamClosed(id());
845 OnClose();
846 }
847 }
848
MaybeSendStopSending(QuicResetStreamError error)849 void QuicStream::MaybeSendStopSending(QuicResetStreamError error) {
850 if (stop_sending_sent_) {
851 return;
852 }
853
854 if (!session()->version().UsesHttp3() && !error.ok()) {
855 // In gQUIC, RST with error closes both read and write side.
856 return;
857 }
858
859 if (session()->version().UsesHttp3()) {
860 session()->MaybeSendStopSendingFrame(id(), error);
861 } else {
862 QUICHE_DCHECK_EQ(QUIC_STREAM_NO_ERROR, error.internal_code());
863 session()->MaybeSendRstStreamFrame(id(), QuicResetStreamError::NoError(),
864 stream_bytes_written());
865 }
866 stop_sending_sent_ = true;
867 CloseReadSide();
868 }
869
MaybeSendRstStream(QuicResetStreamError error)870 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
871 if (rst_sent_) {
872 return;
873 }
874
875 if (!session()->version().UsesHttp3()) {
876 QUIC_BUG_IF(quic_bug_12570_5, error.ok());
877 stop_sending_sent_ = true;
878 CloseReadSide();
879 }
880 session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
881 rst_sent_ = true;
882 CloseWriteSide();
883 }
884
HasBufferedData() const885 bool QuicStream::HasBufferedData() const {
886 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
887 return send_buffer_.stream_offset() > stream_bytes_written();
888 }
889
version() const890 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
891
transport_version() const892 QuicTransportVersion QuicStream::transport_version() const {
893 return session_->transport_version();
894 }
895
handshake_protocol() const896 HandshakeProtocol QuicStream::handshake_protocol() const {
897 return session_->connection()->version().handshake_protocol;
898 }
899
StopReading()900 void QuicStream::StopReading() {
901 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
902 sequencer_.StopReading();
903 }
904
OnClose()905 void QuicStream::OnClose() {
906 QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
907
908 if (!fin_sent_ && !rst_sent_) {
909 QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
910 session()->version().UsesHttp3())
911 << "The stream should've already sent RST in response to "
912 "STOP_SENDING";
913 // For flow control accounting, tell the peer how many bytes have been
914 // written on this stream before termination. Done here if needed, using a
915 // RST_STREAM frame.
916 MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
917 session_->MaybeCloseZombieStream(id_);
918 }
919
920 if (!flow_controller_.has_value() ||
921 flow_controller_->FlowControlViolation() ||
922 connection_flow_controller_->FlowControlViolation()) {
923 return;
924 }
925 // The stream is being closed and will not process any further incoming bytes.
926 // As there may be more bytes in flight, to ensure that both endpoints have
927 // the same connection level flow control state, mark all unreceived or
928 // buffered bytes as consumed.
929 QuicByteCount bytes_to_consume =
930 flow_controller_->highest_received_byte_offset() -
931 flow_controller_->bytes_consumed();
932 AddBytesConsumed(bytes_to_consume);
933 }
934
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)935 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
936 if (type_ == READ_UNIDIRECTIONAL) {
937 OnUnrecoverableError(
938 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
939 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
940 return;
941 }
942
943 if (!flow_controller_.has_value()) {
944 QUIC_BUG(quic_bug_10586_9)
945 << ENDPOINT
946 << "OnWindowUpdateFrame called on stream without flow control";
947 return;
948 }
949
950 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
951 // Let session unblock this stream.
952 session_->MarkConnectionLevelWriteBlocked(id_);
953 }
954 }
955
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)956 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
957 QuicStreamOffset new_offset) {
958 if (!flow_controller_.has_value()) {
959 QUIC_BUG(quic_bug_10586_10)
960 << ENDPOINT
961 << "MaybeIncreaseHighestReceivedOffset called on stream without "
962 "flow control";
963 return false;
964 }
965 uint64_t increment =
966 new_offset - flow_controller_->highest_received_byte_offset();
967 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
968 return false;
969 }
970
971 // If |new_offset| increased the stream flow controller's highest received
972 // offset, increase the connection flow controller's value by the incremental
973 // difference.
974 if (stream_contributes_to_connection_flow_control_) {
975 connection_flow_controller_->UpdateHighestReceivedOffset(
976 connection_flow_controller_->highest_received_byte_offset() +
977 increment);
978 }
979 return true;
980 }
981
AddBytesSent(QuicByteCount bytes)982 void QuicStream::AddBytesSent(QuicByteCount bytes) {
983 if (!flow_controller_.has_value()) {
984 QUIC_BUG(quic_bug_10586_11)
985 << ENDPOINT << "AddBytesSent called on stream without flow control";
986 return;
987 }
988 flow_controller_->AddBytesSent(bytes);
989 if (stream_contributes_to_connection_flow_control_) {
990 connection_flow_controller_->AddBytesSent(bytes);
991 }
992 }
993
AddBytesConsumed(QuicByteCount bytes)994 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
995 if (type_ == CRYPTO) {
996 // A stream with type CRYPTO has no flow control, so there's nothing this
997 // function needs to do. This function still gets called by the
998 // QuicStreamSequencers used by QuicCryptoStream.
999 return;
1000 }
1001 if (!flow_controller_.has_value()) {
1002 QUIC_BUG(quic_bug_12570_7)
1003 << ENDPOINT
1004 << "AddBytesConsumed called on non-crypto stream without flow control";
1005 return;
1006 }
1007 // Only adjust stream level flow controller if still reading.
1008 if (!read_side_closed_) {
1009 flow_controller_->AddBytesConsumed(bytes);
1010 }
1011
1012 if (stream_contributes_to_connection_flow_control_) {
1013 connection_flow_controller_->AddBytesConsumed(bytes);
1014 }
1015 }
1016
// Configures the stream's send window offset to |new_offset|.
//
// Returns false without updating the window when:
//   - the stream has no flow controller (also reported as a bug), or
//   - |new_offset| is below the current send window offset and either
//     (a) |was_zero_rtt_rejected| is true and |new_offset| is below the bytes
//         already sent, or
//     (b) the version allows low flow control limits.
//   In cases (a) and (b) OnUnrecoverableError() is invoked before returning.
// Otherwise |new_offset| is handed to the flow controller and true is
// returned; if the flow controller's update returns true, the stream is also
// marked connection-level write blocked.
bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
                                             bool was_zero_rtt_rejected) {
  if (!flow_controller_.has_value()) {
    QUIC_BUG(quic_bug_10586_12)
        << ENDPOINT
        << "ConfigSendWindowOffset called on stream without flow control";
    return false;
  }

  // The validation code below is for QUIC with TLS only.
  if (new_offset < flow_controller_->send_window_offset()) {
    QUICHE_DCHECK(session()->version().UsesTls());
    if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
      // The client is given flow control window lower than what's written in
      // 0-RTT. This QUIC implementation is unable to retransmit them.
      QUIC_BUG_IF(quic_bug_12570_8, perspective_ == Perspective::IS_SERVER)
          << "Server streams' flow control should never be configured twice.";
      OnUnrecoverableError(
          QUIC_ZERO_RTT_UNRETRANSMITTABLE,
          absl::StrCat(
              "Server rejected 0-RTT, aborting because new stream max data ",
              new_offset, " for stream ", id_, " is less than currently used: ",
              flow_controller_->bytes_sent()));
      return false;
    } else if (session()->version().AllowsLowFlowControlLimits()) {
      // In IETF QUIC, if the client receives flow control limit lower than what
      // was resumed from 0-RTT, depending on 0-RTT status, it's either the
      // peer's fault or our implementation's fault.
      QUIC_BUG_IF(quic_bug_12570_9, perspective_ == Perspective::IS_SERVER)
          << "Server streams' flow control should never be configured twice.";
      // The error code and the message prefix both depend on whether 0-RTT
      // was rejected.
      OnUnrecoverableError(
          was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
                                : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
          absl::StrCat(
              was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
                                    : "",
              "new stream max data ", new_offset, " decreases current limit: ",
              flow_controller_->send_window_offset()));
      return false;
    }
    // Neither error condition applies: fall through and attempt the update
    // below.
  }

  if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
    // Let session unblock this stream.
    session_->MarkConnectionLevelWriteBlocked(id_);
  }
  return true;
}
1065
AddRandomPaddingAfterFin()1066 void QuicStream::AddRandomPaddingAfterFin() {
1067 add_random_padding_after_fin_ = true;
1068 }
1069
// Processes an acknowledgement for the stream frame described by |offset|,
// |data_length| and |fin_acked|.
//
// |*newly_acked_length| is reset to 0 and then filled in by the send buffer.
// Returns true if any bytes were newly acked or if a still-outstanding FIN was
// acked. Returns false after raising QUIC_INTERNAL_ERROR when the send buffer
// rejects the ack or when a FIN is acked that was never sent.
//
// Once nothing is waiting for acks: if the write side is closed,
// OnWriteSideInDataRecvdState() is invoked (at most once per stream); if both
// sides are closed, the session is asked to close the stream if it is a
// zombie.
bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
                                    QuicByteCount data_length, bool fin_acked,
                                    QuicTime::Delta /*ack_delay_time*/,
                                    QuicTime /*receive_timestamp*/,
                                    QuicByteCount* newly_acked_length) {
  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
                << "[" << offset << ", " << offset + data_length << "]"
                << " fin = " << fin_acked;
  *newly_acked_length = 0;
  if (!send_buffer_.OnStreamDataAcked(offset, data_length,
                                      newly_acked_length)) {
    OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
    return false;
  }
  if (!fin_sent_ && fin_acked) {
    OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
    return false;
  }
  // Indicates whether ack listener's OnPacketAcked should be called.
  // Computed before |fin_outstanding_| is cleared below.
  const bool new_data_acked =
      *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
  if (fin_acked) {
    fin_outstanding_ = false;
    fin_lost_ = false;
  }
  if (!IsWaitingForAcks() && write_side_closed_ &&
      !write_side_data_recvd_state_notified_) {
    OnWriteSideInDataRecvdState();
    write_side_data_recvd_state_notified_ = true;
  }
  if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
    session_->MaybeCloseZombieStream(id_);
  }
  return new_data_acked;
}
1105
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1106 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1107 QuicByteCount data_length,
1108 bool fin_retransmitted) {
1109 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1110 if (fin_retransmitted) {
1111 fin_lost_ = false;
1112 }
1113 }
1114
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1115 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1116 QuicByteCount data_length, bool fin_lost) {
1117 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1118 << "[" << offset << ", " << offset + data_length << "]"
1119 << " fin = " << fin_lost;
1120 if (data_length > 0) {
1121 send_buffer_.OnStreamDataLost(offset, data_length);
1122 }
1123 if (fin_lost && fin_outstanding_) {
1124 fin_lost_ = true;
1125 }
1126 }
1127
// Retransmits the not-yet-acked portions of [offset, offset + data_length)
// and, if |fin| is set and the FIN is still outstanding, the FIN.
//
// |type| must be PTO_RETRANSMISSION (DCHECKed). If the stream's deadline has
// passed, OnDeadlinePassed() is invoked and true is returned without
// retransmitting anything.
//
// Returns true when there was nothing to retransmit or everything was
// consumed by the delegate; returns false as soon as a write consumes less
// than requested (data or FIN).
bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
                                      QuicByteCount data_length, bool fin,
                                      TransmissionType type) {
  QUICHE_DCHECK(type == PTO_RETRANSMISSION);
  if (HasDeadlinePassed()) {
    OnDeadlinePassed();
    return true;
  }
  // Start from the requested range and remove everything already acked.
  QuicIntervalSet<QuicStreamOffset> retransmission(offset,
                                                   offset + data_length);
  retransmission.Difference(bytes_acked());
  bool retransmit_fin = fin && fin_outstanding_;
  if (retransmission.Empty() && !retransmit_fin) {
    return true;
  }
  QuicConsumedData consumed(0, false);
  for (const auto& interval : retransmission) {
    QuicStreamOffset retransmission_offset = interval.min();
    QuicByteCount retransmission_length = interval.max() - interval.min();
    // The FIN can only ride along with the interval that ends exactly at the
    // number of stream bytes written.
    const bool can_bundle_fin =
        retransmit_fin && (retransmission_offset + retransmission_length ==
                           stream_bytes_written());
    consumed = stream_delegate_->WritevData(
        id_, retransmission_length, retransmission_offset,
        can_bundle_fin ? FIN : NO_FIN, type,
        session()->GetEncryptionLevelToSendApplicationData());
    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                  << " is forced to retransmit stream data ["
                  << retransmission_offset << ", "
                  << retransmission_offset + retransmission_length
                  << ") and fin: " << can_bundle_fin
                  << ", consumed: " << consumed;
    OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
                               consumed.fin_consumed);
    if (can_bundle_fin) {
      // A FIN that was consumed here no longer needs its own frame below.
      retransmit_fin = !consumed.fin_consumed;
    }
    if (consumed.bytes_consumed < retransmission_length ||
        (can_bundle_fin && !consumed.fin_consumed)) {
      // Connection is write blocked.
      return false;
    }
  }
  if (retransmit_fin) {
    // The FIN could not be bundled with any data interval; send it alone.
    QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                  << " retransmits fin only frame.";
    consumed = stream_delegate_->WritevData(
        id_, 0, stream_bytes_written(), FIN, type,
        session()->GetEncryptionLevelToSendApplicationData());
    if (!consumed.fin_consumed) {
      return false;
    }
  }
  return true;
}
1183
IsWaitingForAcks() const1184 bool QuicStream::IsWaitingForAcks() const {
1185 return (!rst_sent_ || stream_error_.ok()) &&
1186 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1187 }
1188
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1189 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1190 QuicByteCount data_length,
1191 QuicDataWriter* writer) {
1192 QUICHE_DCHECK_LT(0u, data_length);
1193 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1194 << offset << " length " << data_length;
1195 return send_buffer_.WriteStreamData(offset, data_length, writer);
1196 }
1197
// Tries to send the currently buffered data, and the buffered FIN if there is
// one, at encryption level |level|.
//
// Must only be called while the write side is open and there is buffered data
// or a buffered FIN (DCHECKed). The amount written is capped by the send
// window from CalculateSendWindowSize(); the FIN is only sent when all
// buffered data fits in that window. If the session asks this stream to yield,
// or the write does not consume everything requested, the stream is marked
// connection-level write blocked. When the FIN is consumed the write side is
// closed.
void QuicStream::WriteBufferedData(EncryptionLevel level) {
  QUICHE_DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));

  if (session_->ShouldYield(id())) {
    session_->MarkConnectionLevelWriteBlocked(id());
    return;
  }

  // Size of buffered data.
  QuicByteCount write_length = BufferedDataBytes();

  // A FIN with zero data payload should not be flow control blocked.
  bool fin_with_zero_data = (fin_buffered_ && write_length == 0);

  bool fin = fin_buffered_;

  QUIC_BUG_IF(quic_bug_10586_13, !flow_controller_.has_value())
      << ENDPOINT << "WriteBufferedData called on stream without flow control";

  // How much data flow control permits to be written.
  QuicByteCount send_window = CalculateSendWindowSize();

  if (send_window == 0 && !fin_with_zero_data) {
    // Quick return if nothing can be sent.
    MaybeSendBlocked();
    return;
  }

  if (write_length > send_window) {
    // Don't send the FIN unless all the data will be sent.
    fin = false;

    // Writing more data would be a violation of flow control.
    write_length = send_window;
    QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
                  << write_length << " due to flow control";
  }

  // FIN_AND_PADDING is used instead of FIN when AddRandomPaddingAfterFin()
  // has been called.
  StreamSendingState state = fin ? FIN : NO_FIN;
  if (fin && add_random_padding_after_fin_) {
    state = FIN_AND_PADDING;
  }
  QuicConsumedData consumed_data =
      stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
                                   state, NOT_RETRANSMISSION, level);

  OnStreamDataConsumed(consumed_data.bytes_consumed);

  AddBytesSent(consumed_data.bytes_consumed);
  QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
                << stream_bytes_written() << " bytes "
                << " and has buffered data " << BufferedDataBytes() << " bytes."
                << " fin is sent: " << consumed_data.fin_consumed
                << " fin is buffered: " << fin_buffered_;

  // The write may have generated a write error causing this stream to be
  // closed. If so, simply return without marking the stream write blocked.
  if (write_side_closed_) {
    return;
  }

  if (consumed_data.bytes_consumed == write_length) {
    // Everything requested was consumed.
    if (!fin_with_zero_data) {
      MaybeSendBlocked();
    }
    if (fin && consumed_data.fin_consumed) {
      QUICHE_DCHECK(!fin_sent_);
      fin_sent_ = true;
      fin_outstanding_ = true;
      if (fin_received_) {
        // FIN has now been both sent and received: report the stream as
        // draining to the session.
        QUICHE_DCHECK(!was_draining_);
        session_->StreamDraining(id_,
                                 /*unidirectional=*/type_ != BIDIRECTIONAL);
        was_draining_ = true;
      }
      CloseWriteSide();
    } else if (fin && !consumed_data.fin_consumed && !write_side_closed_) {
      // The data went out but the FIN did not; ask to be scheduled again.
      session_->MarkConnectionLevelWriteBlocked(id());
    }
  } else {
    // Only part of the requested data was consumed; ask to be scheduled again.
    session_->MarkConnectionLevelWriteBlocked(id());
  }
  // Any progress (data or FIN consumed) resets the busy counter.
  if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
    busy_counter_ = 0;
  }
}
1284
BufferedDataBytes() const1285 uint64_t QuicStream::BufferedDataBytes() const {
1286 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1287 return send_buffer_.stream_offset() - stream_bytes_written();
1288 }
1289
CanWriteNewData() const1290 bool QuicStream::CanWriteNewData() const {
1291 return BufferedDataBytes() < buffered_data_threshold_;
1292 }
1293
CanWriteNewDataAfterData(QuicByteCount length) const1294 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1295 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1296 }
1297
stream_bytes_written() const1298 uint64_t QuicStream::stream_bytes_written() const {
1299 return send_buffer_.stream_bytes_written();
1300 }
1301
bytes_acked() const1302 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1303 return send_buffer_.bytes_acked();
1304 }
1305
OnStreamDataConsumed(QuicByteCount bytes_consumed)1306 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1307 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1308 }
1309
// Retransmits lost data and/or a lost FIN until nothing is pending or a write
// consumes less than requested.
//
// Each iteration either sends a FIN-only frame (when the send buffer has no
// pending retransmission, so only the FIN is lost) or the send buffer's next
// pending range, bundling the lost FIN with it when that range ends at the
// number of stream bytes written.
void QuicStream::WritePendingRetransmission() {
  while (HasPendingRetransmission()) {
    QuicConsumedData consumed(0, false);
    if (!send_buffer_.HasPendingRetransmission()) {
      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                    << " retransmits fin only frame.";
      consumed = stream_delegate_->WritevData(
          id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION,
          session()->GetEncryptionLevelToSendApplicationData());
      // The FIN stays marked lost unless the delegate consumed it.
      fin_lost_ = !consumed.fin_consumed;
      if (fin_lost_) {
        // Connection is write blocked.
        return;
      }
    } else {
      StreamPendingRetransmission pending =
          send_buffer_.NextPendingRetransmission();
      // Determine whether the lost fin can be bundled with the data.
      const bool can_bundle_fin =
          fin_lost_ &&
          (pending.offset + pending.length == stream_bytes_written());
      consumed = stream_delegate_->WritevData(
          id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
          LOSS_RETRANSMISSION,
          session()->GetEncryptionLevelToSendApplicationData());
      QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
                    << " tries to retransmit stream data [" << pending.offset
                    << ", " << pending.offset + pending.length
                    << ") and fin: " << can_bundle_fin
                    << ", consumed: " << consumed;
      // Record whatever portion was actually consumed; this also clears the
      // lost-FIN flag if the FIN was consumed.
      OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
                                 consumed.fin_consumed);
      if (consumed.bytes_consumed < pending.length ||
          (can_bundle_fin && !consumed.fin_consumed)) {
        // Connection is write blocked.
        return;
      }
    }
  }
}
1350
MaybeSetTtl(QuicTime::Delta ttl)1351 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1352 if (is_static_) {
1353 QUIC_BUG(quic_bug_10586_14) << "Cannot set TTL of a static stream.";
1354 return false;
1355 }
1356 if (deadline_.IsInitialized()) {
1357 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1358 return false;
1359 }
1360 QuicTime now = session()->connection()->clock()->ApproximateNow();
1361 deadline_ = now + ttl;
1362 return true;
1363 }
1364
HasDeadlinePassed() const1365 bool QuicStream::HasDeadlinePassed() const {
1366 if (!deadline_.IsInitialized()) {
1367 // No deadline has been set.
1368 return false;
1369 }
1370 QuicTime now = session()->connection()->clock()->ApproximateNow();
1371 if (now < deadline_) {
1372 return false;
1373 }
1374 // TTL expired.
1375 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1376 return true;
1377 }
1378
OnDeadlinePassed()1379 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
1380
IsFlowControlBlocked() const1381 bool QuicStream::IsFlowControlBlocked() const {
1382 if (!flow_controller_.has_value()) {
1383 QUIC_BUG(quic_bug_10586_15)
1384 << "Trying to access non-existent flow controller.";
1385 return false;
1386 }
1387 return flow_controller_->IsBlocked();
1388 }
1389
highest_received_byte_offset() const1390 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1391 if (!flow_controller_.has_value()) {
1392 QUIC_BUG(quic_bug_10586_16)
1393 << "Trying to access non-existent flow controller.";
1394 return 0;
1395 }
1396 return flow_controller_->highest_received_byte_offset();
1397 }
1398
UpdateReceiveWindowSize(QuicStreamOffset size)1399 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1400 if (!flow_controller_.has_value()) {
1401 QUIC_BUG(quic_bug_10586_17)
1402 << "Trying to access non-existent flow controller.";
1403 return;
1404 }
1405 flow_controller_->UpdateReceiveWindowSize(size);
1406 }
1407
GetSendWindow() const1408 std::optional<QuicByteCount> QuicStream::GetSendWindow() const {
1409 return flow_controller_.has_value()
1410 ? std::optional<QuicByteCount>(flow_controller_->SendWindowSize())
1411 : std::nullopt;
1412 }
1413
GetReceiveWindow() const1414 std::optional<QuicByteCount> QuicStream::GetReceiveWindow() const {
1415 return flow_controller_.has_value()
1416 ? std::optional<QuicByteCount>(
1417 flow_controller_->receive_window_size())
1418 : std::nullopt;
1419 }
1420
OnStreamCreatedFromPendingStream()1421 void QuicStream::OnStreamCreatedFromPendingStream() {
1422 sequencer()->SetUnblocked();
1423 }
1424
CalculateSendWindowSize() const1425 QuicByteCount QuicStream::CalculateSendWindowSize() const {
1426 QuicByteCount send_window;
1427 if (flow_controller_.has_value()) {
1428 send_window = flow_controller_->SendWindowSize();
1429 } else {
1430 send_window = std::numeric_limits<QuicByteCount>::max();
1431 }
1432 if (stream_contributes_to_connection_flow_control_) {
1433 send_window =
1434 std::min(send_window, connection_flow_controller_->SendWindowSize());
1435 }
1436 return send_window;
1437 }
1438
1439 } // namespace quic
1440