1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream.h"
6
7 #include <limits>
8 #include <string>
9
10 #include "absl/strings/str_cat.h"
11 #include "absl/strings/string_view.h"
12 #include "absl/types/optional.h"
13 #include "quiche/quic/core/quic_error_codes.h"
14 #include "quiche/quic/core/quic_flow_controller.h"
15 #include "quiche/quic/core/quic_session.h"
16 #include "quiche/quic/core/quic_types.h"
17 #include "quiche/quic/core/quic_utils.h"
18 #include "quiche/quic/core/quic_versions.h"
19 #include "quiche/quic/platform/api/quic_bug_tracker.h"
20 #include "quiche/quic/platform/api/quic_flag_utils.h"
21 #include "quiche/quic/platform/api/quic_flags.h"
22 #include "quiche/quic/platform/api/quic_logging.h"
23 #include "quiche/common/platform/api/quiche_logging.h"
24 #include "quiche/common/platform/api/quiche_mem_slice.h"
25
26 using spdy::SpdyPriority;
27
28 namespace quic {
29
30 #define ENDPOINT \
31 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
32
33 namespace {
34
DefaultFlowControlWindow(ParsedQuicVersion version)35 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
36 if (!version.AllowsLowFlowControlLimits()) {
37 return kDefaultFlowControlSendWindow;
38 }
39 return 0;
40 }
41
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)42 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
43 QuicStreamId stream_id) {
44 ParsedQuicVersion version = session->connection()->version();
45 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
46 return session->config()->GetInitialStreamFlowControlWindowToSend();
47 }
48
49 // Unidirectional streams (v99 only).
50 if (VersionHasIetfQuicFrames(version.transport_version) &&
51 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
52 return session->config()
53 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
54 }
55
56 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
57 session->perspective())) {
58 return session->config()
59 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
60 }
61
62 return session->config()
63 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
64 }
65
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)66 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
67 QuicStreamId stream_id) {
68 ParsedQuicVersion version = session->connection()->version();
69 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
70 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
71 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
72 }
73
74 return DefaultFlowControlWindow(version);
75 }
76
77 // Unidirectional streams (v99 only).
78 if (VersionHasIetfQuicFrames(version.transport_version) &&
79 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
80 if (session->config()
81 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
82 return session->config()
83 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
84 }
85
86 return DefaultFlowControlWindow(version);
87 }
88
89 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
90 session->perspective())) {
91 if (session->config()
92 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
93 return session->config()
94 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
95 }
96
97 return DefaultFlowControlWindow(version);
98 }
99
100 if (session->config()
101 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
102 return session->config()
103 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
104 }
105
106 return DefaultFlowControlWindow(version);
107 }
108
109 } // namespace
110
PendingStream(QuicStreamId id,QuicSession * session)111 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
112 : id_(id),
113 version_(session->version()),
114 stream_delegate_(session),
115 stream_bytes_read_(0),
116 fin_received_(false),
117 is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
118 /*peer_initiated = */ true,
119 session->version()) ==
120 BIDIRECTIONAL),
121 connection_flow_controller_(session->flow_controller()),
122 flow_controller_(session, id,
123 /*is_connection_flow_controller*/ false,
124 GetReceivedFlowControlWindow(session, id),
125 GetInitialStreamFlowControlWindowToSend(session, id),
126 kStreamReceiveWindowLimit,
127 session->flow_controller()->auto_tune_receive_window(),
128 session->flow_controller()),
129 sequencer_(this) {}
130
OnDataAvailable()131 void PendingStream::OnDataAvailable() {
132 // Data should be kept in the sequencer so that
133 // QuicSession::ProcessPendingStream() can read it.
134 }
135
OnFinRead()136 void PendingStream::OnFinRead() { QUICHE_DCHECK(sequencer_.IsClosed()); }
137
AddBytesConsumed(QuicByteCount bytes)138 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
139 // It will be called when the metadata of the stream is consumed.
140 flow_controller_.AddBytesConsumed(bytes);
141 connection_flow_controller_->AddBytesConsumed(bytes);
142 }
143
ResetWithError(QuicResetStreamError)144 void PendingStream::ResetWithError(QuicResetStreamError /*error*/) {
145 // Currently PendingStream is only read-unidirectional. It shouldn't send
146 // Reset.
147 QUICHE_NOTREACHED();
148 }
149
OnUnrecoverableError(QuicErrorCode error,const std::string & details)150 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
151 const std::string& details) {
152 stream_delegate_->OnStreamError(error, details);
153 }
154
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)155 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
156 QuicIetfTransportErrorCodes ietf_error,
157 const std::string& details) {
158 stream_delegate_->OnStreamError(error, ietf_error, details);
159 }
160
id() const161 QuicStreamId PendingStream::id() const { return id_; }
162
version() const163 ParsedQuicVersion PendingStream::version() const { return version_; }
164
OnStreamFrame(const QuicStreamFrame & frame)165 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
166 QUICHE_DCHECK_EQ(frame.stream_id, id_);
167
168 bool is_stream_too_long =
169 (frame.offset > kMaxStreamLength) ||
170 (kMaxStreamLength - frame.offset < frame.data_length);
171 if (is_stream_too_long) {
172 // Close connection if stream becomes too long.
173 QUIC_PEER_BUG(quic_peer_bug_12570_1)
174 << "Receive stream frame reaches max stream length. frame offset "
175 << frame.offset << " length " << frame.data_length;
176 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
177 "Peer sends more data than allowed on this stream.");
178 return;
179 }
180
181 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
182 OnUnrecoverableError(
183 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
184 absl::StrCat(
185 "Stream ", id_,
186 " received data with offset: ", frame.offset + frame.data_length,
187 ", which is beyond close offset: ", sequencer()->close_offset()));
188 return;
189 }
190
191 if (frame.fin) {
192 fin_received_ = true;
193 }
194
195 // This count includes duplicate data received.
196 QuicByteCount frame_payload_size = frame.data_length;
197 stream_bytes_read_ += frame_payload_size;
198
199 // Flow control is interested in tracking highest received offset.
200 // Only interested in received frames that carry data.
201 if (frame_payload_size > 0 &&
202 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
203 // As the highest received offset has changed, check to see if this is a
204 // violation of flow control.
205 if (flow_controller_.FlowControlViolation() ||
206 connection_flow_controller_->FlowControlViolation()) {
207 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
208 "Flow control violation after increasing offset");
209 return;
210 }
211 }
212
213 sequencer_.OnStreamFrame(frame);
214 }
215
OnRstStreamFrame(const QuicRstStreamFrame & frame)216 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
217 QUICHE_DCHECK_EQ(frame.stream_id, id_);
218
219 if (frame.byte_offset > kMaxStreamLength) {
220 // Peer are not suppose to write bytes more than maxium allowed.
221 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
222 "Reset frame stream offset overflow.");
223 return;
224 }
225
226 const QuicStreamOffset kMaxOffset =
227 std::numeric_limits<QuicStreamOffset>::max();
228 if (sequencer()->close_offset() != kMaxOffset &&
229 frame.byte_offset != sequencer()->close_offset()) {
230 OnUnrecoverableError(
231 QUIC_STREAM_MULTIPLE_OFFSET,
232 absl::StrCat("Stream ", id_,
233 " received new final offset: ", frame.byte_offset,
234 ", which is different from close offset: ",
235 sequencer()->close_offset()));
236 return;
237 }
238
239 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
240 if (flow_controller_.FlowControlViolation() ||
241 connection_flow_controller_->FlowControlViolation()) {
242 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
243 "Flow control violation after increasing offset");
244 return;
245 }
246 }
247
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)248 void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
249 QUICHE_DCHECK(is_bidirectional_);
250 flow_controller_.UpdateSendWindowOffset(frame.max_data);
251 }
252
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)253 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
254 QuicStreamOffset new_offset) {
255 uint64_t increment =
256 new_offset - flow_controller_.highest_received_byte_offset();
257 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
258 return false;
259 }
260
261 // If |new_offset| increased the stream flow controller's highest received
262 // offset, increase the connection flow controller's value by the incremental
263 // difference.
264 connection_flow_controller_->UpdateHighestReceivedOffset(
265 connection_flow_controller_->highest_received_byte_offset() + increment);
266 return true;
267 }
268
OnStopSending(QuicResetStreamError stop_sending_error_code)269 void PendingStream::OnStopSending(
270 QuicResetStreamError stop_sending_error_code) {
271 if (!stop_sending_error_code_) {
272 stop_sending_error_code_ = stop_sending_error_code;
273 }
274 }
275
MarkConsumed(QuicByteCount num_bytes)276 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
277 sequencer_.MarkConsumed(num_bytes);
278 }
279
StopReading()280 void PendingStream::StopReading() {
281 QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
282 sequencer_.StopReading();
283 }
284
QuicStream(PendingStream * pending,QuicSession * session,bool is_static)285 QuicStream::QuicStream(PendingStream* pending, QuicSession* session,
286 bool is_static)
287 : QuicStream(pending->id_, session, std::move(pending->sequencer_),
288 is_static,
289 QuicUtils::GetStreamType(pending->id_, session->perspective(),
290 /*peer_initiated = */ true,
291 session->version()),
292 pending->stream_bytes_read_, pending->fin_received_,
293 std::move(pending->flow_controller_),
294 pending->connection_flow_controller_) {
295 QUICHE_DCHECK(session->version().HasIetfQuicFrames());
296 sequencer_.set_stream(this);
297 }
298
299 namespace {
300
FlowController(QuicStreamId id,QuicSession * session,StreamType type)301 absl::optional<QuicFlowController> FlowController(QuicStreamId id,
302 QuicSession* session,
303 StreamType type) {
304 if (type == CRYPTO) {
305 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
306 // it is using crypto frames instead of stream frames. The QuicCryptoStream
307 // doesn't have any flow control in that case, so we don't create a
308 // QuicFlowController for it.
309 return absl::nullopt;
310 }
311 return QuicFlowController(
312 session, id,
313 /*is_connection_flow_controller*/ false,
314 GetReceivedFlowControlWindow(session, id),
315 GetInitialStreamFlowControlWindowToSend(session, id),
316 kStreamReceiveWindowLimit,
317 session->flow_controller()->auto_tune_receive_window(),
318 session->flow_controller());
319 }
320
321 } // namespace
322
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)323 QuicStream::QuicStream(QuicStreamId id, QuicSession* session, bool is_static,
324 StreamType type)
325 : QuicStream(id, session, QuicStreamSequencer(this), is_static, type, 0,
326 false, FlowController(id, session, type),
327 session->flow_controller()) {}
328
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,absl::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller)329 QuicStream::QuicStream(QuicStreamId id, QuicSession* session,
330 QuicStreamSequencer sequencer, bool is_static,
331 StreamType type, uint64_t stream_bytes_read,
332 bool fin_received,
333 absl::optional<QuicFlowController> flow_controller,
334 QuicFlowController* connection_flow_controller)
335 : sequencer_(std::move(sequencer)),
336 id_(id),
337 session_(session),
338 stream_delegate_(session),
339 priority_(QuicStreamPriority::Default(session->priority_type())),
340 stream_bytes_read_(stream_bytes_read),
341 stream_error_(QuicResetStreamError::NoError()),
342 connection_error_(QUIC_NO_ERROR),
343 read_side_closed_(false),
344 write_side_closed_(false),
345 write_side_data_recvd_state_notified_(false),
346 fin_buffered_(false),
347 fin_sent_(false),
348 fin_outstanding_(false),
349 fin_lost_(false),
350 fin_received_(fin_received),
351 rst_sent_(false),
352 rst_received_(false),
353 stop_sending_sent_(false),
354 flow_controller_(std::move(flow_controller)),
355 connection_flow_controller_(connection_flow_controller),
356 stream_contributes_to_connection_flow_control_(true),
357 busy_counter_(0),
358 add_random_padding_after_fin_(false),
359 send_buffer_(
360 session->connection()->helper()->GetStreamSendBufferAllocator()),
361 buffered_data_threshold_(GetQuicFlag(quic_buffered_data_threshold)),
362 is_static_(is_static),
363 deadline_(QuicTime::Zero()),
364 was_draining_(false),
365 type_(VersionHasIetfQuicFrames(session->transport_version()) &&
366 type != CRYPTO
367 ? QuicUtils::GetStreamType(id_, session->perspective(),
368 session->IsIncomingStream(id_),
369 session->version())
370 : type),
371 creation_time_(session->connection()->clock()->ApproximateNow()),
372 perspective_(session->perspective()) {
373 if (type_ == WRITE_UNIDIRECTIONAL) {
374 fin_received_ = true;
375 CloseReadSide();
376 } else if (type_ == READ_UNIDIRECTIONAL) {
377 fin_sent_ = true;
378 CloseWriteSide();
379 }
380 if (type_ != CRYPTO) {
381 stream_delegate_->RegisterStreamPriority(id, is_static_, priority_);
382 }
383 }
384
~QuicStream()385 QuicStream::~QuicStream() {
386 if (session_ != nullptr && IsWaitingForAcks()) {
387 QUIC_DVLOG(1)
388 << ENDPOINT << "Stream " << id_
389 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
390 << send_buffer_.stream_bytes_outstanding()
391 << ", fin_outstanding: " << fin_outstanding_;
392 }
393 if (stream_delegate_ != nullptr && type_ != CRYPTO) {
394 stream_delegate_->UnregisterStreamPriority(id());
395 }
396 }
397
OnStreamFrame(const QuicStreamFrame & frame)398 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
399 QUICHE_DCHECK_EQ(frame.stream_id, id_);
400
401 QUICHE_DCHECK(!(read_side_closed_ && write_side_closed_));
402
403 if (frame.fin && is_static_) {
404 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
405 "Attempt to close a static stream");
406 return;
407 }
408
409 if (type_ == WRITE_UNIDIRECTIONAL) {
410 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
411 "Data received on write unidirectional stream");
412 return;
413 }
414
415 bool is_stream_too_long =
416 (frame.offset > kMaxStreamLength) ||
417 (kMaxStreamLength - frame.offset < frame.data_length);
418 if (is_stream_too_long) {
419 // Close connection if stream becomes too long.
420 QUIC_PEER_BUG(quic_peer_bug_10586_1)
421 << "Receive stream frame on stream " << id_
422 << " reaches max stream length. frame offset " << frame.offset
423 << " length " << frame.data_length << ". " << sequencer_.DebugString();
424 OnUnrecoverableError(
425 QUIC_STREAM_LENGTH_OVERFLOW,
426 absl::StrCat("Peer sends more data than allowed on stream ", id_,
427 ". frame: offset = ", frame.offset, ", length = ",
428 frame.data_length, ". ", sequencer_.DebugString()));
429 return;
430 }
431
432 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
433 OnUnrecoverableError(
434 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
435 absl::StrCat(
436 "Stream ", id_,
437 " received data with offset: ", frame.offset + frame.data_length,
438 ", which is beyond close offset: ", sequencer_.close_offset()));
439 return;
440 }
441
442 if (frame.fin && !fin_received_) {
443 fin_received_ = true;
444 if (fin_sent_) {
445 QUICHE_DCHECK(!was_draining_);
446 session_->StreamDraining(id_,
447 /*unidirectional=*/type_ != BIDIRECTIONAL);
448 was_draining_ = true;
449 }
450 }
451
452 if (read_side_closed_) {
453 QUIC_DLOG(INFO)
454 << ENDPOINT << "Stream " << frame.stream_id
455 << " is closed for reading. Ignoring newly received stream data.";
456 // The subclass does not want to read data: blackhole the data.
457 return;
458 }
459
460 // This count includes duplicate data received.
461 QuicByteCount frame_payload_size = frame.data_length;
462 stream_bytes_read_ += frame_payload_size;
463
464 // Flow control is interested in tracking highest received offset.
465 // Only interested in received frames that carry data.
466 if (frame_payload_size > 0 &&
467 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
468 // As the highest received offset has changed, check to see if this is a
469 // violation of flow control.
470 QUIC_BUG_IF(quic_bug_12570_2, !flow_controller_.has_value())
471 << ENDPOINT << "OnStreamFrame called on stream without flow control";
472 if ((flow_controller_.has_value() &&
473 flow_controller_->FlowControlViolation()) ||
474 connection_flow_controller_->FlowControlViolation()) {
475 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
476 "Flow control violation after increasing offset");
477 return;
478 }
479 }
480
481 sequencer_.OnStreamFrame(frame);
482 }
483
OnStopSending(QuicResetStreamError error)484 bool QuicStream::OnStopSending(QuicResetStreamError error) {
485 // Do not reset the stream if all data has been sent and acknowledged.
486 if (write_side_closed() && !IsWaitingForAcks()) {
487 QUIC_DVLOG(1) << ENDPOINT
488 << "Ignoring STOP_SENDING for a write closed stream, id: "
489 << id_;
490 return false;
491 }
492
493 if (is_static_) {
494 QUIC_DVLOG(1) << ENDPOINT
495 << "Received STOP_SENDING for a static stream, id: " << id_
496 << " Closing connection";
497 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
498 "Received STOP_SENDING for a static stream");
499 return false;
500 }
501
502 stream_error_ = error;
503 MaybeSendRstStream(error);
504 return true;
505 }
506
num_frames_received() const507 int QuicStream::num_frames_received() const {
508 return sequencer_.num_frames_received();
509 }
510
num_duplicate_frames_received() const511 int QuicStream::num_duplicate_frames_received() const {
512 return sequencer_.num_duplicate_frames_received();
513 }
514
OnStreamReset(const QuicRstStreamFrame & frame)515 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
516 rst_received_ = true;
517 if (frame.byte_offset > kMaxStreamLength) {
518 // Peer are not suppose to write bytes more than maxium allowed.
519 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
520 "Reset frame stream offset overflow.");
521 return;
522 }
523
524 const QuicStreamOffset kMaxOffset =
525 std::numeric_limits<QuicStreamOffset>::max();
526 if (sequencer()->close_offset() != kMaxOffset &&
527 frame.byte_offset != sequencer()->close_offset()) {
528 OnUnrecoverableError(
529 QUIC_STREAM_MULTIPLE_OFFSET,
530 absl::StrCat("Stream ", id_,
531 " received new final offset: ", frame.byte_offset,
532 ", which is different from close offset: ",
533 sequencer_.close_offset()));
534 return;
535 }
536
537 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
538 QUIC_BUG_IF(quic_bug_12570_3, !flow_controller_.has_value())
539 << ENDPOINT << "OnStreamReset called on stream without flow control";
540 if ((flow_controller_.has_value() &&
541 flow_controller_->FlowControlViolation()) ||
542 connection_flow_controller_->FlowControlViolation()) {
543 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
544 "Flow control violation after increasing offset");
545 return;
546 }
547
548 stream_error_ = frame.error();
549 // Google QUIC closes both sides of the stream in response to a
550 // RESET_STREAM, IETF QUIC closes only the read side.
551 if (!VersionHasIetfQuicFrames(transport_version())) {
552 CloseWriteSide();
553 }
554 CloseReadSide();
555 }
556
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)557 void QuicStream::OnConnectionClosed(QuicErrorCode error,
558 ConnectionCloseSource /*source*/) {
559 if (read_side_closed_ && write_side_closed_) {
560 return;
561 }
562 if (error != QUIC_NO_ERROR) {
563 stream_error_ =
564 QuicResetStreamError::FromInternal(QUIC_STREAM_CONNECTION_ERROR);
565 connection_error_ = error;
566 }
567
568 CloseWriteSide();
569 CloseReadSide();
570 }
571
OnFinRead()572 void QuicStream::OnFinRead() {
573 QUICHE_DCHECK(sequencer_.IsClosed());
574 // OnFinRead can be called due to a FIN flag in a headers block, so there may
575 // have been no OnStreamFrame call with a FIN in the frame.
576 fin_received_ = true;
577 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
578 // stream will be destroyed by CloseReadSide, so don't need to call
579 // StreamDraining.
580 CloseReadSide();
581 }
582
SetFinSent()583 void QuicStream::SetFinSent() {
584 QUICHE_DCHECK(!VersionUsesHttp3(transport_version()));
585 fin_sent_ = true;
586 }
587
Reset(QuicRstStreamErrorCode error)588 void QuicStream::Reset(QuicRstStreamErrorCode error) {
589 ResetWithError(QuicResetStreamError::FromInternal(error));
590 }
591
ResetWithError(QuicResetStreamError error)592 void QuicStream::ResetWithError(QuicResetStreamError error) {
593 stream_error_ = error;
594 QuicConnection::ScopedPacketFlusher flusher(session()->connection());
595 MaybeSendStopSending(error);
596 MaybeSendRstStream(error);
597
598 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
599 session()->MaybeCloseZombieStream(id_);
600 }
601 }
602
ResetWriteSide(QuicResetStreamError error)603 void QuicStream::ResetWriteSide(QuicResetStreamError error) {
604 stream_error_ = error;
605 MaybeSendRstStream(error);
606
607 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
608 session()->MaybeCloseZombieStream(id_);
609 }
610 }
611
SendStopSending(QuicResetStreamError error)612 void QuicStream::SendStopSending(QuicResetStreamError error) {
613 stream_error_ = error;
614 MaybeSendStopSending(error);
615
616 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
617 session()->MaybeCloseZombieStream(id_);
618 }
619 }
620
OnUnrecoverableError(QuicErrorCode error,const std::string & details)621 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
622 const std::string& details) {
623 stream_delegate_->OnStreamError(error, details);
624 }
625
OnUnrecoverableError(QuicErrorCode error,QuicIetfTransportErrorCodes ietf_error,const std::string & details)626 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
627 QuicIetfTransportErrorCodes ietf_error,
628 const std::string& details) {
629 stream_delegate_->OnStreamError(error, ietf_error, details);
630 }
631
priority() const632 const QuicStreamPriority& QuicStream::priority() const { return priority_; }
633
SetPriority(const QuicStreamPriority & priority)634 void QuicStream::SetPriority(const QuicStreamPriority& priority) {
635 priority_ = priority;
636
637 MaybeSendPriorityUpdateFrame();
638
639 stream_delegate_->UpdateStreamPriority(id(), priority);
640 }
641
WriteOrBufferData(absl::string_view data,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)642 void QuicStream::WriteOrBufferData(
643 absl::string_view data, bool fin,
644 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
645 ack_listener) {
646 QUIC_BUG_IF(quic_bug_12570_4,
647 QuicUtils::IsCryptoStreamId(transport_version(), id_))
648 << ENDPOINT
649 << "WriteOrBufferData is used to send application data, use "
650 "WriteOrBufferDataAtLevel to send crypto data.";
651 return WriteOrBufferDataAtLevel(
652 data, fin, session()->GetEncryptionLevelToSendApplicationData(),
653 ack_listener);
654 }
655
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)656 void QuicStream::WriteOrBufferDataAtLevel(
657 absl::string_view data, bool fin, EncryptionLevel level,
658 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
659 ack_listener) {
660 if (data.empty() && !fin) {
661 QUIC_BUG(quic_bug_10586_2) << "data.empty() && !fin";
662 return;
663 }
664
665 if (fin_buffered_) {
666 QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
667 return;
668 }
669 if (write_side_closed_) {
670 QUIC_DLOG(ERROR) << ENDPOINT
671 << "Attempt to write when the write side is closed";
672 if (type_ == READ_UNIDIRECTIONAL) {
673 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
674 "Try to send data on read unidirectional stream");
675 }
676 return;
677 }
678
679 fin_buffered_ = fin;
680
681 bool had_buffered_data = HasBufferedData();
682 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
683 // all data to be consumed.
684 if (data.length() > 0) {
685 QuicStreamOffset offset = send_buffer_.stream_offset();
686 if (kMaxStreamLength - offset < data.length()) {
687 QUIC_BUG(quic_bug_10586_4) << "Write too many data via stream " << id_;
688 OnUnrecoverableError(
689 QUIC_STREAM_LENGTH_OVERFLOW,
690 absl::StrCat("Write too many data via stream ", id_));
691 return;
692 }
693 send_buffer_.SaveStreamData(data);
694 OnDataBuffered(offset, data.length(), ack_listener);
695 }
696 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
697 // Write data if there is no buffered data before.
698 WriteBufferedData(level);
699 }
700 }
701
OnCanWrite()702 void QuicStream::OnCanWrite() {
703 if (HasDeadlinePassed()) {
704 OnDeadlinePassed();
705 return;
706 }
707 if (HasPendingRetransmission()) {
708 WritePendingRetransmission();
709 // Exit early to allow other streams to write pending retransmissions if
710 // any.
711 return;
712 }
713
714 if (write_side_closed_) {
715 QUIC_DLOG(ERROR)
716 << ENDPOINT << "Stream " << id()
717 << " attempting to write new data when the write side is closed";
718 return;
719 }
720 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
721 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
722 }
723 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
724 // Notify upper layer to write new data when buffered data size is below
725 // low water mark.
726 OnCanWriteNewData();
727 }
728 }
729
MaybeSendBlocked()730 void QuicStream::MaybeSendBlocked() {
731 if (!flow_controller_.has_value()) {
732 QUIC_BUG(quic_bug_10586_5)
733 << ENDPOINT << "MaybeSendBlocked called on stream without flow control";
734 return;
735 }
736 flow_controller_->MaybeSendBlocked();
737 if (!stream_contributes_to_connection_flow_control_) {
738 return;
739 }
740 connection_flow_controller_->MaybeSendBlocked();
741
742 // If the stream is blocked by connection-level flow control but not by
743 // stream-level flow control, add the stream to the write blocked list so that
744 // the stream will be given a chance to write when a connection-level
745 // WINDOW_UPDATE arrives.
746 if (!write_side_closed_ && connection_flow_controller_->IsBlocked() &&
747 !flow_controller_->IsBlocked()) {
748 session_->MarkConnectionLevelWriteBlocked(id());
749 }
750 }
751
WriteMemSlice(quiche::QuicheMemSlice span,bool fin)752 QuicConsumedData QuicStream::WriteMemSlice(quiche::QuicheMemSlice span,
753 bool fin) {
754 return WriteMemSlices(absl::MakeSpan(&span, 1), fin);
755 }
756
WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span,bool fin)757 QuicConsumedData QuicStream::WriteMemSlices(
758 absl::Span<quiche::QuicheMemSlice> span, bool fin) {
759 QuicConsumedData consumed_data(0, false);
760 if (span.empty() && !fin) {
761 QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin";
762 return consumed_data;
763 }
764
765 if (fin_buffered_) {
766 QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
767 return consumed_data;
768 }
769
770 if (write_side_closed_) {
771 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
772 << " attempting to write when the write side is closed";
773 if (type_ == READ_UNIDIRECTIONAL) {
774 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
775 "Try to send data on read unidirectional stream");
776 }
777 return consumed_data;
778 }
779
780 bool had_buffered_data = HasBufferedData();
781 if (CanWriteNewData() || span.empty()) {
782 consumed_data.fin_consumed = fin;
783 if (!span.empty()) {
784 // Buffer all data if buffered data size is below limit.
785 QuicStreamOffset offset = send_buffer_.stream_offset();
786 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
787 if (offset > send_buffer_.stream_offset() ||
788 kMaxStreamLength < send_buffer_.stream_offset()) {
789 QUIC_BUG(quic_bug_10586_8) << "Write too many data via stream " << id_;
790 OnUnrecoverableError(
791 QUIC_STREAM_LENGTH_OVERFLOW,
792 absl::StrCat("Write too many data via stream ", id_));
793 return consumed_data;
794 }
795 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
796 }
797 }
798 fin_buffered_ = consumed_data.fin_consumed;
799
800 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
801 // Write data if there is no buffered data before.
802 WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
803 }
804
805 return consumed_data;
806 }
807
HasPendingRetransmission() const808 bool QuicStream::HasPendingRetransmission() const {
809 return send_buffer_.HasPendingRetransmission() || fin_lost_;
810 }
811
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const812 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
813 QuicByteCount data_length,
814 bool fin) const {
815 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
816 (fin && fin_outstanding_);
817 }
818
CloseReadSide()819 void QuicStream::CloseReadSide() {
820 if (read_side_closed_) {
821 return;
822 }
823 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
824
825 read_side_closed_ = true;
826 sequencer_.ReleaseBuffer();
827
828 if (write_side_closed_) {
829 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
830 session_->OnStreamClosed(id());
831 OnClose();
832 }
833 }
834
CloseWriteSide()835 void QuicStream::CloseWriteSide() {
836 if (write_side_closed_) {
837 return;
838 }
839 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
840
841 write_side_closed_ = true;
842 if (read_side_closed_) {
843 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
844 session_->OnStreamClosed(id());
845 OnClose();
846 }
847 }
848
MaybeSendStopSending(QuicResetStreamError error)849 void QuicStream::MaybeSendStopSending(QuicResetStreamError error) {
850 if (stop_sending_sent_) {
851 return;
852 }
853
854 if (!session()->version().UsesHttp3() && !error.ok()) {
855 // In gQUIC, RST with error closes both read and write side.
856 return;
857 }
858
859 if (session()->version().UsesHttp3()) {
860 session()->MaybeSendStopSendingFrame(id(), error);
861 } else {
862 QUICHE_DCHECK_EQ(QUIC_STREAM_NO_ERROR, error.internal_code());
863 session()->MaybeSendRstStreamFrame(id(), QuicResetStreamError::NoError(),
864 stream_bytes_written());
865 }
866 stop_sending_sent_ = true;
867 CloseReadSide();
868 }
869
MaybeSendRstStream(QuicResetStreamError error)870 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
871 if (rst_sent_) {
872 return;
873 }
874
875 if (!session()->version().UsesHttp3()) {
876 QUIC_BUG_IF(quic_bug_12570_5, error.ok());
877 stop_sending_sent_ = true;
878 CloseReadSide();
879 }
880 session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
881 rst_sent_ = true;
882 CloseWriteSide();
883 }
884
HasBufferedData() const885 bool QuicStream::HasBufferedData() const {
886 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
887 return send_buffer_.stream_offset() > stream_bytes_written();
888 }
889
version() const890 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
891
transport_version() const892 QuicTransportVersion QuicStream::transport_version() const {
893 return session_->transport_version();
894 }
895
handshake_protocol() const896 HandshakeProtocol QuicStream::handshake_protocol() const {
897 return session_->connection()->version().handshake_protocol;
898 }
899
StopReading()900 void QuicStream::StopReading() {
901 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
902 sequencer_.StopReading();
903 }
904
OnClose()905 void QuicStream::OnClose() {
906 QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
907
908 if (!fin_sent_ && !rst_sent_) {
909 QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
910 session()->version().UsesHttp3())
911 << "The stream should've already sent RST in response to "
912 "STOP_SENDING";
913 // For flow control accounting, tell the peer how many bytes have been
914 // written on this stream before termination. Done here if needed, using a
915 // RST_STREAM frame.
916 MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
917 session_->MaybeCloseZombieStream(id_);
918 }
919
920 if (!flow_controller_.has_value() ||
921 flow_controller_->FlowControlViolation() ||
922 connection_flow_controller_->FlowControlViolation()) {
923 return;
924 }
925 // The stream is being closed and will not process any further incoming bytes.
926 // As there may be more bytes in flight, to ensure that both endpoints have
927 // the same connection level flow control state, mark all unreceived or
928 // buffered bytes as consumed.
929 QuicByteCount bytes_to_consume =
930 flow_controller_->highest_received_byte_offset() -
931 flow_controller_->bytes_consumed();
932 AddBytesConsumed(bytes_to_consume);
933 }
934
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)935 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
936 if (type_ == READ_UNIDIRECTIONAL) {
937 OnUnrecoverableError(
938 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
939 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
940 return;
941 }
942
943 if (!flow_controller_.has_value()) {
944 QUIC_BUG(quic_bug_10586_9)
945 << ENDPOINT
946 << "OnWindowUpdateFrame called on stream without flow control";
947 return;
948 }
949
950 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
951 // Let session unblock this stream.
952 session_->MarkConnectionLevelWriteBlocked(id_);
953 }
954 }
955
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)956 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
957 QuicStreamOffset new_offset) {
958 if (!flow_controller_.has_value()) {
959 QUIC_BUG(quic_bug_10586_10)
960 << ENDPOINT
961 << "MaybeIncreaseHighestReceivedOffset called on stream without "
962 "flow control";
963 return false;
964 }
965 uint64_t increment =
966 new_offset - flow_controller_->highest_received_byte_offset();
967 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
968 return false;
969 }
970
971 // If |new_offset| increased the stream flow controller's highest received
972 // offset, increase the connection flow controller's value by the incremental
973 // difference.
974 if (stream_contributes_to_connection_flow_control_) {
975 connection_flow_controller_->UpdateHighestReceivedOffset(
976 connection_flow_controller_->highest_received_byte_offset() +
977 increment);
978 }
979 return true;
980 }
981
AddBytesSent(QuicByteCount bytes)982 void QuicStream::AddBytesSent(QuicByteCount bytes) {
983 if (!flow_controller_.has_value()) {
984 QUIC_BUG(quic_bug_10586_11)
985 << ENDPOINT << "AddBytesSent called on stream without flow control";
986 return;
987 }
988 flow_controller_->AddBytesSent(bytes);
989 if (stream_contributes_to_connection_flow_control_) {
990 connection_flow_controller_->AddBytesSent(bytes);
991 }
992 }
993
AddBytesConsumed(QuicByteCount bytes)994 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
995 if (type_ == CRYPTO) {
996 // A stream with type CRYPTO has no flow control, so there's nothing this
997 // function needs to do. This function still gets called by the
998 // QuicStreamSequencers used by QuicCryptoStream.
999 return;
1000 }
1001 if (!flow_controller_.has_value()) {
1002 QUIC_BUG(quic_bug_12570_7)
1003 << ENDPOINT
1004 << "AddBytesConsumed called on non-crypto stream without flow control";
1005 return;
1006 }
1007 // Only adjust stream level flow controller if still reading.
1008 if (!read_side_closed_) {
1009 flow_controller_->AddBytesConsumed(bytes);
1010 }
1011
1012 if (stream_contributes_to_connection_flow_control_) {
1013 connection_flow_controller_->AddBytesConsumed(bytes);
1014 }
1015 }
1016
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1017 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1018 bool was_zero_rtt_rejected) {
1019 if (!flow_controller_.has_value()) {
1020 QUIC_BUG(quic_bug_10586_12)
1021 << ENDPOINT
1022 << "ConfigSendWindowOffset called on stream without flow control";
1023 return false;
1024 }
1025
1026 // The validation code below is for QUIC with TLS only.
1027 if (new_offset < flow_controller_->send_window_offset()) {
1028 QUICHE_DCHECK(session()->version().UsesTls());
1029 if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1030 // The client is given flow control window lower than what's written in
1031 // 0-RTT. This QUIC implementation is unable to retransmit them.
1032 QUIC_BUG_IF(quic_bug_12570_8, perspective_ == Perspective::IS_SERVER)
1033 << "Server streams' flow control should never be configured twice.";
1034 OnUnrecoverableError(
1035 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1036 absl::StrCat(
1037 "Server rejected 0-RTT, aborting because new stream max data ",
1038 new_offset, " for stream ", id_, " is less than currently used: ",
1039 flow_controller_->bytes_sent()));
1040 return false;
1041 } else if (session()->version().AllowsLowFlowControlLimits()) {
1042 // In IETF QUIC, if the client receives flow control limit lower than what
1043 // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1044 // peer's fault or our implementation's fault.
1045 QUIC_BUG_IF(quic_bug_12570_9, perspective_ == Perspective::IS_SERVER)
1046 << "Server streams' flow control should never be configured twice.";
1047 OnUnrecoverableError(
1048 was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1049 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1050 absl::StrCat(
1051 was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1052 : "",
1053 "new stream max data ", new_offset, " decreases current limit: ",
1054 flow_controller_->send_window_offset()));
1055 return false;
1056 }
1057 }
1058
1059 if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1060 // Let session unblock this stream.
1061 session_->MarkConnectionLevelWriteBlocked(id_);
1062 }
1063 return true;
1064 }
1065
AddRandomPaddingAfterFin()1066 void QuicStream::AddRandomPaddingAfterFin() {
1067 add_random_padding_after_fin_ = true;
1068 }
1069
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1070 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1071 QuicByteCount data_length, bool fin_acked,
1072 QuicTime::Delta /*ack_delay_time*/,
1073 QuicTime /*receive_timestamp*/,
1074 QuicByteCount* newly_acked_length) {
1075 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1076 << "[" << offset << ", " << offset + data_length << "]"
1077 << " fin = " << fin_acked;
1078 *newly_acked_length = 0;
1079 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1080 newly_acked_length)) {
1081 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1082 return false;
1083 }
1084 if (!fin_sent_ && fin_acked) {
1085 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1086 return false;
1087 }
1088 // Indicates whether ack listener's OnPacketAcked should be called.
1089 const bool new_data_acked =
1090 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1091 if (fin_acked) {
1092 fin_outstanding_ = false;
1093 fin_lost_ = false;
1094 }
1095 if (!IsWaitingForAcks() && write_side_closed_ &&
1096 !write_side_data_recvd_state_notified_) {
1097 OnWriteSideInDataRecvdState();
1098 write_side_data_recvd_state_notified_ = true;
1099 }
1100 if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1101 session_->MaybeCloseZombieStream(id_);
1102 }
1103 return new_data_acked;
1104 }
1105
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1106 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1107 QuicByteCount data_length,
1108 bool fin_retransmitted) {
1109 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1110 if (fin_retransmitted) {
1111 fin_lost_ = false;
1112 }
1113 }
1114
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1115 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1116 QuicByteCount data_length, bool fin_lost) {
1117 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1118 << "[" << offset << ", " << offset + data_length << "]"
1119 << " fin = " << fin_lost;
1120 if (data_length > 0) {
1121 send_buffer_.OnStreamDataLost(offset, data_length);
1122 }
1123 if (fin_lost && fin_outstanding_) {
1124 fin_lost_ = true;
1125 }
1126 }
1127
RetransmitStreamData(QuicStreamOffset offset,QuicByteCount data_length,bool fin,TransmissionType type)1128 bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1129 QuicByteCount data_length, bool fin,
1130 TransmissionType type) {
1131 QUICHE_DCHECK(type == PTO_RETRANSMISSION);
1132 if (HasDeadlinePassed()) {
1133 OnDeadlinePassed();
1134 return true;
1135 }
1136 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1137 offset + data_length);
1138 retransmission.Difference(bytes_acked());
1139 bool retransmit_fin = fin && fin_outstanding_;
1140 if (retransmission.Empty() && !retransmit_fin) {
1141 return true;
1142 }
1143 QuicConsumedData consumed(0, false);
1144 for (const auto& interval : retransmission) {
1145 QuicStreamOffset retransmission_offset = interval.min();
1146 QuicByteCount retransmission_length = interval.max() - interval.min();
1147 const bool can_bundle_fin =
1148 retransmit_fin && (retransmission_offset + retransmission_length ==
1149 stream_bytes_written());
1150 consumed = stream_delegate_->WritevData(
1151 id_, retransmission_length, retransmission_offset,
1152 can_bundle_fin ? FIN : NO_FIN, type,
1153 session()->GetEncryptionLevelToSendApplicationData());
1154 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1155 << " is forced to retransmit stream data ["
1156 << retransmission_offset << ", "
1157 << retransmission_offset + retransmission_length
1158 << ") and fin: " << can_bundle_fin
1159 << ", consumed: " << consumed;
1160 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1161 consumed.fin_consumed);
1162 if (can_bundle_fin) {
1163 retransmit_fin = !consumed.fin_consumed;
1164 }
1165 if (consumed.bytes_consumed < retransmission_length ||
1166 (can_bundle_fin && !consumed.fin_consumed)) {
1167 // Connection is write blocked.
1168 return false;
1169 }
1170 }
1171 if (retransmit_fin) {
1172 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1173 << " retransmits fin only frame.";
1174 consumed = stream_delegate_->WritevData(
1175 id_, 0, stream_bytes_written(), FIN, type,
1176 session()->GetEncryptionLevelToSendApplicationData());
1177 if (!consumed.fin_consumed) {
1178 return false;
1179 }
1180 }
1181 return true;
1182 }
1183
IsWaitingForAcks() const1184 bool QuicStream::IsWaitingForAcks() const {
1185 return (!rst_sent_ || stream_error_.ok()) &&
1186 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1187 }
1188
ReadableBytes() const1189 QuicByteCount QuicStream::ReadableBytes() const {
1190 return sequencer_.ReadableBytes();
1191 }
1192
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1193 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1194 QuicByteCount data_length,
1195 QuicDataWriter* writer) {
1196 QUICHE_DCHECK_LT(0u, data_length);
1197 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1198 << offset << " length " << data_length;
1199 return send_buffer_.WriteStreamData(offset, data_length, writer);
1200 }
1201
WriteBufferedData(EncryptionLevel level)1202 void QuicStream::WriteBufferedData(EncryptionLevel level) {
1203 QUICHE_DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1204
1205 if (session_->ShouldYield(id())) {
1206 session_->MarkConnectionLevelWriteBlocked(id());
1207 return;
1208 }
1209
1210 // Size of buffered data.
1211 QuicByteCount write_length = BufferedDataBytes();
1212
1213 // A FIN with zero data payload should not be flow control blocked.
1214 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1215
1216 bool fin = fin_buffered_;
1217
1218 // How much data flow control permits to be written.
1219 QuicByteCount send_window;
1220 if (flow_controller_.has_value()) {
1221 send_window = flow_controller_->SendWindowSize();
1222 } else {
1223 send_window = std::numeric_limits<QuicByteCount>::max();
1224 QUIC_BUG(quic_bug_10586_13)
1225 << ENDPOINT
1226 << "WriteBufferedData called on stream without flow control";
1227 }
1228 if (stream_contributes_to_connection_flow_control_) {
1229 send_window =
1230 std::min(send_window, connection_flow_controller_->SendWindowSize());
1231 }
1232
1233 if (send_window == 0 && !fin_with_zero_data) {
1234 // Quick return if nothing can be sent.
1235 MaybeSendBlocked();
1236 return;
1237 }
1238
1239 if (write_length > send_window) {
1240 // Don't send the FIN unless all the data will be sent.
1241 fin = false;
1242
1243 // Writing more data would be a violation of flow control.
1244 write_length = send_window;
1245 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1246 << write_length << " due to flow control";
1247 }
1248
1249 StreamSendingState state = fin ? FIN : NO_FIN;
1250 if (fin && add_random_padding_after_fin_) {
1251 state = FIN_AND_PADDING;
1252 }
1253 QuicConsumedData consumed_data =
1254 stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1255 state, NOT_RETRANSMISSION, level);
1256
1257 OnStreamDataConsumed(consumed_data.bytes_consumed);
1258
1259 AddBytesSent(consumed_data.bytes_consumed);
1260 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1261 << stream_bytes_written() << " bytes "
1262 << " and has buffered data " << BufferedDataBytes() << " bytes."
1263 << " fin is sent: " << consumed_data.fin_consumed
1264 << " fin is buffered: " << fin_buffered_;
1265
1266 // The write may have generated a write error causing this stream to be
1267 // closed. If so, simply return without marking the stream write blocked.
1268 if (write_side_closed_) {
1269 return;
1270 }
1271
1272 if (consumed_data.bytes_consumed == write_length) {
1273 if (!fin_with_zero_data) {
1274 MaybeSendBlocked();
1275 }
1276 if (fin && consumed_data.fin_consumed) {
1277 QUICHE_DCHECK(!fin_sent_);
1278 fin_sent_ = true;
1279 fin_outstanding_ = true;
1280 if (fin_received_) {
1281 QUICHE_DCHECK(!was_draining_);
1282 session_->StreamDraining(id_,
1283 /*unidirectional=*/type_ != BIDIRECTIONAL);
1284 was_draining_ = true;
1285 }
1286 CloseWriteSide();
1287 } else if (fin && !consumed_data.fin_consumed && !write_side_closed_) {
1288 session_->MarkConnectionLevelWriteBlocked(id());
1289 }
1290 } else {
1291 session_->MarkConnectionLevelWriteBlocked(id());
1292 }
1293 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1294 busy_counter_ = 0;
1295 }
1296 }
1297
BufferedDataBytes() const1298 uint64_t QuicStream::BufferedDataBytes() const {
1299 QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1300 return send_buffer_.stream_offset() - stream_bytes_written();
1301 }
1302
CanWriteNewData() const1303 bool QuicStream::CanWriteNewData() const {
1304 return BufferedDataBytes() < buffered_data_threshold_;
1305 }
1306
CanWriteNewDataAfterData(QuicByteCount length) const1307 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1308 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1309 }
1310
stream_bytes_written() const1311 uint64_t QuicStream::stream_bytes_written() const {
1312 return send_buffer_.stream_bytes_written();
1313 }
1314
bytes_acked() const1315 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1316 return send_buffer_.bytes_acked();
1317 }
1318
OnStreamDataConsumed(QuicByteCount bytes_consumed)1319 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1320 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1321 }
1322
WritePendingRetransmission()1323 void QuicStream::WritePendingRetransmission() {
1324 while (HasPendingRetransmission()) {
1325 QuicConsumedData consumed(0, false);
1326 if (!send_buffer_.HasPendingRetransmission()) {
1327 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1328 << " retransmits fin only frame.";
1329 consumed = stream_delegate_->WritevData(
1330 id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION,
1331 session()->GetEncryptionLevelToSendApplicationData());
1332 fin_lost_ = !consumed.fin_consumed;
1333 if (fin_lost_) {
1334 // Connection is write blocked.
1335 return;
1336 }
1337 } else {
1338 StreamPendingRetransmission pending =
1339 send_buffer_.NextPendingRetransmission();
1340 // Determine whether the lost fin can be bundled with the data.
1341 const bool can_bundle_fin =
1342 fin_lost_ &&
1343 (pending.offset + pending.length == stream_bytes_written());
1344 consumed = stream_delegate_->WritevData(
1345 id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
1346 LOSS_RETRANSMISSION,
1347 session()->GetEncryptionLevelToSendApplicationData());
1348 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1349 << " tries to retransmit stream data [" << pending.offset
1350 << ", " << pending.offset + pending.length
1351 << ") and fin: " << can_bundle_fin
1352 << ", consumed: " << consumed;
1353 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1354 consumed.fin_consumed);
1355 if (consumed.bytes_consumed < pending.length ||
1356 (can_bundle_fin && !consumed.fin_consumed)) {
1357 // Connection is write blocked.
1358 return;
1359 }
1360 }
1361 }
1362 }
1363
MaybeSetTtl(QuicTime::Delta ttl)1364 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1365 if (is_static_) {
1366 QUIC_BUG(quic_bug_10586_14) << "Cannot set TTL of a static stream.";
1367 return false;
1368 }
1369 if (deadline_.IsInitialized()) {
1370 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1371 return false;
1372 }
1373 QuicTime now = session()->connection()->clock()->ApproximateNow();
1374 deadline_ = now + ttl;
1375 return true;
1376 }
1377
HasDeadlinePassed() const1378 bool QuicStream::HasDeadlinePassed() const {
1379 if (!deadline_.IsInitialized()) {
1380 // No deadline has been set.
1381 return false;
1382 }
1383 QuicTime now = session()->connection()->clock()->ApproximateNow();
1384 if (now < deadline_) {
1385 return false;
1386 }
1387 // TTL expired.
1388 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1389 return true;
1390 }
1391
OnDeadlinePassed()1392 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
1393
IsFlowControlBlocked() const1394 bool QuicStream::IsFlowControlBlocked() const {
1395 if (!flow_controller_.has_value()) {
1396 QUIC_BUG(quic_bug_10586_15)
1397 << "Trying to access non-existent flow controller.";
1398 return false;
1399 }
1400 return flow_controller_->IsBlocked();
1401 }
1402
highest_received_byte_offset() const1403 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1404 if (!flow_controller_.has_value()) {
1405 QUIC_BUG(quic_bug_10586_16)
1406 << "Trying to access non-existent flow controller.";
1407 return 0;
1408 }
1409 return flow_controller_->highest_received_byte_offset();
1410 }
1411
UpdateReceiveWindowSize(QuicStreamOffset size)1412 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1413 if (!flow_controller_.has_value()) {
1414 QUIC_BUG(quic_bug_10586_17)
1415 << "Trying to access non-existent flow controller.";
1416 return;
1417 }
1418 flow_controller_->UpdateReceiveWindowSize(size);
1419 }
1420
GetSendWindow() const1421 absl::optional<QuicByteCount> QuicStream::GetSendWindow() const {
1422 return flow_controller_.has_value()
1423 ? absl::optional<QuicByteCount>(flow_controller_->SendWindowSize())
1424 : absl::nullopt;
1425 }
1426
GetReceiveWindow() const1427 absl::optional<QuicByteCount> QuicStream::GetReceiveWindow() const {
1428 return flow_controller_.has_value()
1429 ? absl::optional<QuicByteCount>(
1430 flow_controller_->receive_window_size())
1431 : absl::nullopt;
1432 }
1433
OnStreamCreatedFromPendingStream()1434 void QuicStream::OnStreamCreatedFromPendingStream() {
1435 sequencer()->SetUnblocked();
1436 }
1437
1438 } // namespace quic
1439