• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_stream.h"
6 
7 #include <algorithm>
8 #include <limits>
9 #include <string_view>
10 #include <utility>
11 
12 #include "base/check_op.h"
13 #include "base/compiler_specific.h"
14 #include "base/functional/bind.h"
15 #include "base/location.h"
16 #include "base/metrics/histogram_functions.h"
17 #include "base/metrics/histogram_macros.h"
18 #include "base/notreached.h"
19 #include "base/strings/string_number_conversions.h"
20 #include "base/strings/stringprintf.h"
21 #include "base/task/single_thread_task_runner.h"
22 #include "base/trace_event/memory_usage_estimator.h"
23 #include "base/values.h"
24 #include "net/base/load_timing_info.h"
25 #include "net/http/http_status_code.h"
26 #include "net/log/net_log.h"
27 #include "net/log/net_log_capture_mode.h"
28 #include "net/log/net_log_event_type.h"
29 #include "net/spdy/spdy_buffer_producer.h"
30 #include "net/spdy/spdy_http_utils.h"
31 #include "net/spdy/spdy_log_util.h"
32 #include "net/spdy/spdy_session.h"
33 
34 namespace net {
35 
36 namespace {
37 
NetLogSpdyStreamErrorParams(spdy::SpdyStreamId stream_id,int net_error,std::string_view description)38 base::Value::Dict NetLogSpdyStreamErrorParams(spdy::SpdyStreamId stream_id,
39                                               int net_error,
40                                               std::string_view description) {
41   return base::Value::Dict()
42       .Set("stream_id", static_cast<int>(stream_id))
43       .Set("net_error", ErrorToShortString(net_error))
44       .Set("description", description);
45 }
46 
NetLogSpdyStreamWindowUpdateParams(spdy::SpdyStreamId stream_id,int32_t delta,int32_t window_size)47 base::Value::Dict NetLogSpdyStreamWindowUpdateParams(
48     spdy::SpdyStreamId stream_id,
49     int32_t delta,
50     int32_t window_size) {
51   return base::Value::Dict()
52       .Set("stream_id", static_cast<int>(stream_id))
53       .Set("delta", delta)
54       .Set("window_size", window_size);
55 }
56 
NetLogSpdyDataParams(spdy::SpdyStreamId stream_id,int size,bool fin)57 base::Value::Dict NetLogSpdyDataParams(spdy::SpdyStreamId stream_id,
58                                        int size,
59                                        bool fin) {
60   return base::Value::Dict()
61       .Set("stream_id", static_cast<int>(stream_id))
62       .Set("size", size)
63       .Set("fin", fin);
64 }
65 
66 }  // namespace
67 
68 // A wrapper around a stream that calls into ProduceHeadersFrame().
69 class SpdyStream::HeadersBufferProducer : public SpdyBufferProducer {
70  public:
HeadersBufferProducer(const base::WeakPtr<SpdyStream> & stream)71   explicit HeadersBufferProducer(const base::WeakPtr<SpdyStream>& stream)
72       : stream_(stream) {
73     DCHECK(stream_.get());
74   }
75 
76   ~HeadersBufferProducer() override = default;
77 
ProduceBuffer()78   std::unique_ptr<SpdyBuffer> ProduceBuffer() override {
79     if (!stream_.get()) {
80       NOTREACHED();
81     }
82     DCHECK_GT(stream_->stream_id(), 0u);
83     return std::make_unique<SpdyBuffer>(stream_->ProduceHeadersFrame());
84   }
85 
86  private:
87   const base::WeakPtr<SpdyStream> stream_;
88 };
89 
SpdyStream(SpdyStreamType type,const base::WeakPtr<SpdySession> & session,const GURL & url,RequestPriority priority,int32_t initial_send_window_size,int32_t max_recv_window_size,const NetLogWithSource & net_log,const NetworkTrafficAnnotationTag & traffic_annotation,bool detect_broken_connection)90 SpdyStream::SpdyStream(SpdyStreamType type,
91                        const base::WeakPtr<SpdySession>& session,
92                        const GURL& url,
93                        RequestPriority priority,
94                        int32_t initial_send_window_size,
95                        int32_t max_recv_window_size,
96                        const NetLogWithSource& net_log,
97                        const NetworkTrafficAnnotationTag& traffic_annotation,
98                        bool detect_broken_connection)
99     : type_(type),
100       url_(url),
101       priority_(priority),
102       send_window_size_(initial_send_window_size),
103       max_recv_window_size_(max_recv_window_size),
104       recv_window_size_(max_recv_window_size),
105       last_recv_window_update_(base::TimeTicks::Now()),
106       session_(session),
107       request_time_(base::Time::Now()),
108       net_log_(net_log),
109       traffic_annotation_(traffic_annotation),
110       detect_broken_connection_(detect_broken_connection) {
111   CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
112         type_ == SPDY_REQUEST_RESPONSE_STREAM);
113   CHECK_GE(priority_, MINIMUM_PRIORITY);
114   CHECK_LE(priority_, MAXIMUM_PRIORITY);
115 }
116 
~SpdyStream()117 SpdyStream::~SpdyStream() {
118   CHECK(!write_handler_guard_);
119 }
120 
SetDelegate(Delegate * delegate)121 void SpdyStream::SetDelegate(Delegate* delegate) {
122   CHECK(!delegate_);
123   CHECK(delegate);
124   delegate_ = delegate;
125 
126   CHECK(io_state_ == STATE_IDLE || io_state_ == STATE_RESERVED_REMOTE);
127 }
128 
ProduceHeadersFrame()129 std::unique_ptr<spdy::SpdySerializedFrame> SpdyStream::ProduceHeadersFrame() {
130   CHECK_EQ(io_state_, STATE_IDLE);
131   CHECK(request_headers_valid_);
132   CHECK_GT(stream_id_, 0u);
133 
134   spdy::SpdyControlFlags flags = (pending_send_status_ == NO_MORE_DATA_TO_SEND)
135                                      ? spdy::CONTROL_FLAG_FIN
136                                      : spdy::CONTROL_FLAG_NONE;
137   std::unique_ptr<spdy::SpdySerializedFrame> frame(session_->CreateHeaders(
138       stream_id_, priority_, flags, std::move(request_headers_),
139       delegate_->source_dependency()));
140   request_headers_valid_ = false;
141   send_time_ = base::TimeTicks::Now();
142   return frame;
143 }
144 
DetachDelegate()145 void SpdyStream::DetachDelegate() {
146   DCHECK(!IsClosed());
147   delegate_ = nullptr;
148   Cancel(ERR_ABORTED);
149 }
150 
SetPriority(RequestPriority priority)151 void SpdyStream::SetPriority(RequestPriority priority) {
152   if (priority_ == priority) {
153     return;
154   }
155 
156   session_->UpdateStreamPriority(this, /* old_priority = */ priority_,
157                                  /* new_priority = */ priority);
158 
159   priority_ = priority;
160 }
161 
AdjustSendWindowSize(int32_t delta_window_size)162 bool SpdyStream::AdjustSendWindowSize(int32_t delta_window_size) {
163   if (IsClosed())
164     return true;
165 
166   if (delta_window_size > 0) {
167     if (send_window_size_ >
168         std::numeric_limits<int32_t>::max() - delta_window_size) {
169       return false;
170     }
171   } else {
172     // Minimum allowed value for spdy::SETTINGS_INITIAL_WINDOW_SIZE is 0 and
173     // maximum is 2^31-1.  Data are not sent when |send_window_size_ < 0|, that
174     // is, |send_window_size_ | can only decrease by a change in
175     // spdy::SETTINGS_INITIAL_WINDOW_SIZE.  Therefore |send_window_size_| should
176     // never be able to become less than -(2^31-1).
177     DCHECK_LE(std::numeric_limits<int32_t>::min() - delta_window_size,
178               send_window_size_);
179   }
180 
181   send_window_size_ += delta_window_size;
182 
183   net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW, [&] {
184     return NetLogSpdyStreamWindowUpdateParams(stream_id_, delta_window_size,
185                                               send_window_size_);
186   });
187 
188   PossiblyResumeIfSendStalled();
189   return true;
190 }
191 
OnWriteBufferConsumed(size_t frame_payload_size,size_t consume_size,SpdyBuffer::ConsumeSource consume_source)192 void SpdyStream::OnWriteBufferConsumed(
193     size_t frame_payload_size,
194     size_t consume_size,
195     SpdyBuffer::ConsumeSource consume_source) {
196   if (consume_source == SpdyBuffer::DISCARD) {
197     // If we're discarding a frame or part of it, increase the send
198     // window by the number of discarded bytes. (Although if we're
199     // discarding part of a frame, it's probably because of a write
200     // error and we'll be tearing down the stream soon.)
201     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
202     DCHECK_GT(remaining_payload_bytes, 0u);
203     IncreaseSendWindowSize(static_cast<int32_t>(remaining_payload_bytes));
204   }
205   // For consumed bytes, the send window is increased when we receive
206   // a WINDOW_UPDATE frame.
207 }
208 
IncreaseSendWindowSize(int32_t delta_window_size)209 void SpdyStream::IncreaseSendWindowSize(int32_t delta_window_size) {
210   DCHECK_GE(delta_window_size, 1);
211 
212   if (!AdjustSendWindowSize(delta_window_size)) {
213     std::string desc = base::StringPrintf(
214         "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
215         "send_window_size_ [current: %d]",
216         delta_window_size, stream_id_, send_window_size_);
217     session_->ResetStream(stream_id_, ERR_HTTP2_FLOW_CONTROL_ERROR, desc);
218   }
219 }
220 
DecreaseSendWindowSize(int32_t delta_window_size)221 void SpdyStream::DecreaseSendWindowSize(int32_t delta_window_size) {
222   if (IsClosed())
223     return;
224 
225   // We only call this method when sending a frame. Therefore,
226   // |delta_window_size| should be within the valid frame size range.
227   DCHECK_GE(delta_window_size, 1);
228   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
229 
230   // |send_window_size_| should have been at least |delta_window_size| for
231   // this call to happen.
232   DCHECK_GE(send_window_size_, delta_window_size);
233 
234   send_window_size_ -= delta_window_size;
235 
236   net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW, [&] {
237     return NetLogSpdyStreamWindowUpdateParams(stream_id_, -delta_window_size,
238                                               send_window_size_);
239   });
240 }
241 
OnReadBufferConsumed(size_t consume_size,SpdyBuffer::ConsumeSource consume_source)242 void SpdyStream::OnReadBufferConsumed(
243     size_t consume_size,
244     SpdyBuffer::ConsumeSource consume_source) {
245   DCHECK_GE(consume_size, 1u);
246   DCHECK_LE(consume_size,
247             static_cast<size_t>(std::numeric_limits<int32_t>::max()));
248   IncreaseRecvWindowSize(static_cast<int32_t>(consume_size));
249 }
250 
IncreaseRecvWindowSize(int32_t delta_window_size)251 void SpdyStream::IncreaseRecvWindowSize(int32_t delta_window_size) {
252   // By the time a read is processed by the delegate, this stream may
253   // already be inactive.
254   if (!session_->IsStreamActive(stream_id_))
255     return;
256 
257   DCHECK_GE(unacked_recv_window_bytes_, 0);
258   DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
259   DCHECK_GE(delta_window_size, 1);
260   // Check for overflow.
261   DCHECK_LE(delta_window_size,
262             std::numeric_limits<int32_t>::max() - recv_window_size_);
263 
264   recv_window_size_ += delta_window_size;
265   net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW, [&] {
266     return NetLogSpdyStreamWindowUpdateParams(stream_id_, delta_window_size,
267                                               recv_window_size_);
268   });
269 
270   // Update the receive window once half of the buffer is ready to be acked
271   // to prevent excessive window updates on fast downloads. Also send an update
272   // if too much time has elapsed since the last update to deal with
273   // slow-reading clients so the server doesn't think the stream is idle.
274   unacked_recv_window_bytes_ += delta_window_size;
275   const base::TimeDelta elapsed =
276       base::TimeTicks::Now() - last_recv_window_update_;
277   if (unacked_recv_window_bytes_ > max_recv_window_size_ / 2 ||
278       elapsed >= session_->TimeToBufferSmallWindowUpdates()) {
279     last_recv_window_update_ = base::TimeTicks::Now();
280     session_->SendStreamWindowUpdate(
281         stream_id_, static_cast<uint32_t>(unacked_recv_window_bytes_));
282     unacked_recv_window_bytes_ = 0;
283   }
284 }
285 
DecreaseRecvWindowSize(int32_t delta_window_size)286 void SpdyStream::DecreaseRecvWindowSize(int32_t delta_window_size) {
287   DCHECK(session_->IsStreamActive(stream_id_));
288   DCHECK_GE(delta_window_size, 1);
289 
290   // The receiving window size as the peer knows it is
291   // |recv_window_size_ - unacked_recv_window_bytes_|, if more data are sent by
292   // the peer, that means that the receive window is not being respected.
293   if (delta_window_size > recv_window_size_ - unacked_recv_window_bytes_) {
294     session_->ResetStream(
295         stream_id_, ERR_HTTP2_FLOW_CONTROL_ERROR,
296         "delta_window_size is " + base::NumberToString(delta_window_size) +
297             " in DecreaseRecvWindowSize, which is larger than the receive " +
298             "window size of " + base::NumberToString(recv_window_size_));
299     return;
300   }
301 
302   recv_window_size_ -= delta_window_size;
303   net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW, [&] {
304     return NetLogSpdyStreamWindowUpdateParams(stream_id_, -delta_window_size,
305                                               recv_window_size_);
306   });
307 }
308 
GetPeerAddress(IPEndPoint * address) const309 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
310   return session_->GetPeerAddress(address);
311 }
312 
GetLocalAddress(IPEndPoint * address) const313 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
314   return session_->GetLocalAddress(address);
315 }
316 
WasEverUsed() const317 bool SpdyStream::WasEverUsed() const {
318   return session_->WasEverUsed();
319 }
320 
GetRequestTime() const321 base::Time SpdyStream::GetRequestTime() const {
322   return request_time_;
323 }
324 
SetRequestTime(base::Time t)325 void SpdyStream::SetRequestTime(base::Time t) {
326   request_time_ = t;
327 }
328 
OnHeadersReceived(const quiche::HttpHeaderBlock & response_headers,base::Time response_time,base::TimeTicks recv_first_byte_time)329 void SpdyStream::OnHeadersReceived(
330     const quiche::HttpHeaderBlock& response_headers,
331     base::Time response_time,
332     base::TimeTicks recv_first_byte_time) {
333   switch (response_state_) {
334     case READY_FOR_HEADERS: {
335       // No header block has been received yet.
336       DCHECK(response_headers_.empty());
337 
338       quiche::HttpHeaderBlock::const_iterator it =
339           response_headers.find(spdy::kHttp2StatusHeader);
340       if (it == response_headers.end()) {
341         const std::string error("Response headers do not include :status.");
342         LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
343         session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
344         return;
345       }
346 
347       int status;
348       if (!base::StringToInt(it->second, &status)) {
349         const std::string error("Cannot parse :status.");
350         LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
351         session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
352         return;
353       }
354 
355       base::UmaHistogramSparse("Net.SpdyResponseCode", status);
356 
357       // Include informational responses (1xx) in the TTFB as per the resource
358       // timing spec for responseStart.
359       if (recv_first_byte_time_.is_null())
360         recv_first_byte_time_ = recv_first_byte_time;
361       // Also record the TTFB of non-informational responses.
362       if (status / 100 != 1) {
363         DCHECK(recv_first_byte_time_for_non_informational_response_.is_null());
364         recv_first_byte_time_for_non_informational_response_ =
365             recv_first_byte_time;
366       }
367 
368       // Handle informational responses (1xx):
369       // * Pass through 101 Switching Protocols, because broken servers might
370       //   send this as a response to a WebSocket request, in which case it
371       //   needs to pass through so that the WebSocket layer can signal an
372       //   error.
373       // * Plumb 103 Early Hints to the delegate.
374       // * Ignore other informational responses.
375       if (status / 100 == 1 && status != HTTP_SWITCHING_PROTOCOLS) {
376         if (status == HTTP_EARLY_HINTS)
377           OnEarlyHintsReceived(response_headers, recv_first_byte_time);
378         return;
379       }
380 
381       response_state_ = READY_FOR_DATA_OR_TRAILERS;
382 
383       switch (type_) {
384         case SPDY_BIDIRECTIONAL_STREAM:
385         case SPDY_REQUEST_RESPONSE_STREAM:
386           // A bidirectional stream or a request/response stream is ready for
387           // the response headers only after request headers are sent.
388           if (io_state_ == STATE_IDLE) {
389             const std::string error("Response received before request sent.");
390             LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
391             session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
392             return;
393           }
394           break;
395       }
396 
397       DCHECK_NE(io_state_, STATE_IDLE);
398 
399       response_time_ = response_time;
400       SaveResponseHeaders(response_headers, status);
401 
402       break;
403     }
404     case READY_FOR_DATA_OR_TRAILERS:
405       // Second header block is trailers.
406       response_state_ = TRAILERS_RECEIVED;
407       delegate_->OnTrailers(response_headers);
408       break;
409 
410     case TRAILERS_RECEIVED:
411       // No further header blocks are allowed after trailers.
412       const std::string error("Header block received after trailers.");
413       LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
414       session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
415       break;
416   }
417 }
418 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)419 void SpdyStream::OnDataReceived(std::unique_ptr<SpdyBuffer> buffer) {
420   DCHECK(session_->IsStreamActive(stream_id_));
421 
422   if (response_state_ == READY_FOR_HEADERS) {
423     const std::string error("DATA received before headers.");
424     LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
425     session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
426     return;
427   }
428 
429   if (response_state_ == TRAILERS_RECEIVED && buffer) {
430     const std::string error("DATA received after trailers.");
431     LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
432     session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
433     return;
434   }
435 
436   if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
437     const std::string error("DATA received on half-closed (remove) stream.");
438     LogStreamError(ERR_HTTP2_STREAM_CLOSED, error);
439     session_->ResetStream(stream_id_, ERR_HTTP2_STREAM_CLOSED, error);
440     return;
441   }
442 
443   // Track our bandwidth.
444   recv_bytes_ += buffer ? buffer->GetRemainingSize() : 0;
445   recv_last_byte_time_ = base::TimeTicks::Now();
446 
447   CHECK(!IsClosed());
448 
449   if (!buffer) {
450     if (io_state_ == STATE_OPEN) {
451       io_state_ = STATE_HALF_CLOSED_REMOTE;
452       // Inform the delegate of EOF. This may delete |this|.
453       delegate_->OnDataReceived(nullptr);
454     } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
455       io_state_ = STATE_CLOSED;
456       // Deletes |this|.
457       session_->CloseActiveStream(stream_id_, OK);
458     } else {
459       NOTREACHED() << io_state_;
460     }
461     return;
462   }
463 
464   size_t length = buffer->GetRemainingSize();
465   DCHECK_LE(length, spdy::kHttp2DefaultFramePayloadLimit);
466   base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
467   // May close the stream.
468   DecreaseRecvWindowSize(static_cast<int32_t>(length));
469   if (!weak_this)
470     return;
471   buffer->AddConsumeCallback(
472       base::BindRepeating(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
473 
474   // May close |this|.
475   delegate_->OnDataReceived(std::move(buffer));
476 }
477 
OnPaddingConsumed(size_t len)478 void SpdyStream::OnPaddingConsumed(size_t len) {
479   // Decrease window size because padding bytes are received.
480   // Increase window size because padding bytes are consumed (by discarding).
481   // Net result: |unacked_recv_window_bytes_| increases by |len|,
482   // |recv_window_size_| does not change.
483   base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
484   // May close the stream.
485   DecreaseRecvWindowSize(static_cast<int32_t>(len));
486   if (!weak_this)
487     return;
488   IncreaseRecvWindowSize(static_cast<int32_t>(len));
489 }
490 
OnFrameWriteComplete(spdy::SpdyFrameType frame_type,size_t frame_size)491 void SpdyStream::OnFrameWriteComplete(spdy::SpdyFrameType frame_type,
492                                       size_t frame_size) {
493   if (frame_type != spdy::SpdyFrameType::HEADERS &&
494       frame_type != spdy::SpdyFrameType::DATA) {
495     return;
496   }
497 
498   int result = (frame_type == spdy::SpdyFrameType::HEADERS)
499                    ? OnHeadersSent()
500                    : OnDataSent(frame_size);
501   if (result == ERR_IO_PENDING) {
502     // The write operation hasn't completed yet.
503     return;
504   }
505 
506   if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
507     if (io_state_ == STATE_OPEN) {
508       io_state_ = STATE_HALF_CLOSED_LOCAL;
509     } else if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
510       io_state_ = STATE_CLOSED;
511     } else {
512       NOTREACHED() << io_state_;
513     }
514   }
515   // Notify delegate of write completion. Must not destroy |this|.
516   CHECK(delegate_);
517   {
518     base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
519     write_handler_guard_ = true;
520     if (frame_type == spdy::SpdyFrameType::HEADERS) {
521       delegate_->OnHeadersSent();
522     } else {
523       delegate_->OnDataSent();
524     }
525     CHECK(weak_this);
526     write_handler_guard_ = false;
527   }
528 
529   if (io_state_ == STATE_CLOSED) {
530     // Deletes |this|.
531     session_->CloseActiveStream(stream_id_, OK);
532   }
533 }
534 
OnHeadersSent()535 int SpdyStream::OnHeadersSent() {
536   CHECK_EQ(io_state_, STATE_IDLE);
537   CHECK_NE(stream_id_, 0u);
538 
539   io_state_ = STATE_OPEN;
540   return OK;
541 }
542 
OnDataSent(size_t frame_size)543 int SpdyStream::OnDataSent(size_t frame_size) {
544   CHECK(io_state_ == STATE_OPEN ||
545         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
546 
547   size_t frame_payload_size = frame_size - spdy::kDataFrameMinimumSize;
548 
549   CHECK_GE(frame_size, spdy::kDataFrameMinimumSize);
550   CHECK_LE(frame_payload_size, spdy::kHttp2DefaultFramePayloadLimit);
551 
552   // If more data is available to send, dispatch it and
553   // return that the write operation is still ongoing.
554   pending_send_data_->DidConsume(frame_payload_size);
555   if (pending_send_data_->BytesRemaining() > 0) {
556     QueueNextDataFrame();
557     return ERR_IO_PENDING;
558   } else {
559     pending_send_data_ = nullptr;
560     return OK;
561   }
562 }
563 
LogStreamError(int error,std::string_view description)564 void SpdyStream::LogStreamError(int error, std::string_view description) {
565   net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_ERROR, [&] {
566     return NetLogSpdyStreamErrorParams(stream_id_, error, description);
567   });
568 }
569 
OnClose(int status)570 void SpdyStream::OnClose(int status) {
571   // In most cases, the stream should already be CLOSED. The exception is when a
572   // SpdySession is shutting down while the stream is in an intermediate state.
573   io_state_ = STATE_CLOSED;
574   if (status == ERR_HTTP2_RST_STREAM_NO_ERROR_RECEIVED) {
575     if (response_state_ == READY_FOR_HEADERS) {
576       status = ERR_HTTP2_PROTOCOL_ERROR;
577     } else {
578       status = OK;
579     }
580   }
581   Delegate* delegate = delegate_;
582   delegate_ = nullptr;
583   if (delegate)
584     delegate->OnClose(status);
585   // Unset |stream_id_| last so that the delegate can look it up.
586   stream_id_ = 0;
587 }
588 
Cancel(int error)589 void SpdyStream::Cancel(int error) {
590   // We may be called again from a delegate's OnClose().
591   if (io_state_ == STATE_CLOSED)
592     return;
593 
594   if (stream_id_ != 0) {
595     session_->ResetStream(stream_id_, error, std::string());
596   } else {
597     session_->CloseCreatedStream(GetWeakPtr(), error);
598   }
599   // |this| is invalid at this point.
600 }
601 
Close()602 void SpdyStream::Close() {
603   // We may be called again from a delegate's OnClose().
604   if (io_state_ == STATE_CLOSED)
605     return;
606 
607   if (stream_id_ != 0) {
608     session_->CloseActiveStream(stream_id_, OK);
609   } else {
610     session_->CloseCreatedStream(GetWeakPtr(), OK);
611   }
612   // |this| is invalid at this point.
613 }
614 
GetWeakPtr()615 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
616   return weak_ptr_factory_.GetWeakPtr();
617 }
618 
SendRequestHeaders(quiche::HttpHeaderBlock request_headers,SpdySendStatus send_status)619 int SpdyStream::SendRequestHeaders(quiche::HttpHeaderBlock request_headers,
620                                    SpdySendStatus send_status) {
621   net_log_.AddEvent(
622       NetLogEventType::HTTP_TRANSACTION_HTTP2_SEND_REQUEST_HEADERS,
623       [&](NetLogCaptureMode capture_mode) {
624         return HttpHeaderBlockNetLogParams(&request_headers, capture_mode);
625       });
626   CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
627   CHECK(!request_headers_valid_);
628   CHECK(!pending_send_data_.get());
629   CHECK_EQ(io_state_, STATE_IDLE);
630   request_headers_ = std::move(request_headers);
631   request_headers_valid_ = true;
632   pending_send_status_ = send_status;
633   session_->EnqueueStreamWrite(
634       GetWeakPtr(), spdy::SpdyFrameType::HEADERS,
635       std::make_unique<HeadersBufferProducer>(GetWeakPtr()));
636   return ERR_IO_PENDING;
637 }
638 
SendData(IOBuffer * data,int length,SpdySendStatus send_status)639 void SpdyStream::SendData(IOBuffer* data,
640                           int length,
641                           SpdySendStatus send_status) {
642   CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
643   CHECK(io_state_ == STATE_OPEN ||
644         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
645   CHECK(!pending_send_data_.get());
646   pending_send_data_ = base::MakeRefCounted<DrainableIOBuffer>(data, length);
647   pending_send_status_ = send_status;
648   QueueNextDataFrame();
649 }
650 
GetSSLInfo(SSLInfo * ssl_info) const651 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info) const {
652   return session_->GetSSLInfo(ssl_info);
653 }
654 
GetNegotiatedProtocol() const655 NextProto SpdyStream::GetNegotiatedProtocol() const {
656   return session_->GetNegotiatedProtocol();
657 }
658 
PossiblyResumeIfSendStalled()659 SpdyStream::ShouldRequeueStream SpdyStream::PossiblyResumeIfSendStalled() {
660   if (IsLocallyClosed() || !send_stalled_by_flow_control_)
661     return DoNotRequeue;
662   if (session_->IsSendStalled() || send_window_size_ <= 0) {
663     return Requeue;
664   }
665   net_log_.AddEventWithIntParams(
666       NetLogEventType::HTTP2_STREAM_FLOW_CONTROL_UNSTALLED, "stream_id",
667       stream_id_);
668   send_stalled_by_flow_control_ = false;
669   QueueNextDataFrame();
670   return DoNotRequeue;
671 }
672 
IsClosed() const673 bool SpdyStream::IsClosed() const {
674   return io_state_ == STATE_CLOSED;
675 }
676 
IsLocallyClosed() const677 bool SpdyStream::IsLocallyClosed() const {
678   return io_state_ == STATE_HALF_CLOSED_LOCAL || io_state_ == STATE_CLOSED;
679 }
680 
IsIdle() const681 bool SpdyStream::IsIdle() const {
682   return io_state_ == STATE_IDLE;
683 }
684 
IsOpen() const685 bool SpdyStream::IsOpen() const {
686   return io_state_ == STATE_OPEN;
687 }
688 
IsReservedRemote() const689 bool SpdyStream::IsReservedRemote() const {
690   return io_state_ == STATE_RESERVED_REMOTE;
691 }
692 
AddRawReceivedBytes(size_t received_bytes)693 void SpdyStream::AddRawReceivedBytes(size_t received_bytes) {
694   raw_received_bytes_ += received_bytes;
695 }
696 
AddRawSentBytes(size_t sent_bytes)697 void SpdyStream::AddRawSentBytes(size_t sent_bytes) {
698   raw_sent_bytes_ += sent_bytes;
699 }
700 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const701 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
702   if (stream_id_ == 0)
703     return false;
704   bool result = session_->GetLoadTimingInfo(stream_id_, load_timing_info);
705   // TODO(acomminos): recv_first_byte_time_ is actually the time after all
706   // headers have been parsed. We should add support for reporting the time the
707   // first bytes of the HEADERS frame were received to BufferedSpdyFramer
708   // (https://crbug.com/568024).
709   load_timing_info->receive_headers_start = recv_first_byte_time_;
710   load_timing_info->receive_non_informational_headers_start =
711       recv_first_byte_time_for_non_informational_response_;
712   load_timing_info->first_early_hints_time = first_early_hints_time_;
713   return result;
714 }
715 
QueueNextDataFrame()716 void SpdyStream::QueueNextDataFrame() {
717   // Until the request has been completely sent, we cannot be sure
718   // that our stream_id is correct.
719   CHECK(io_state_ == STATE_OPEN ||
720         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
721   CHECK_GT(stream_id_, 0u);
722   CHECK(pending_send_data_.get());
723   // Only the final fame may have a length of 0.
724   if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
725     CHECK_GE(pending_send_data_->BytesRemaining(), 0);
726   } else {
727     CHECK_GT(pending_send_data_->BytesRemaining(), 0);
728   }
729 
730   spdy::SpdyDataFlags flags = (pending_send_status_ == NO_MORE_DATA_TO_SEND)
731                                   ? spdy::DATA_FLAG_FIN
732                                   : spdy::DATA_FLAG_NONE;
733   int effective_len;
734   bool end_stream;
735   std::unique_ptr<SpdyBuffer> data_buffer(
736       session_->CreateDataBuffer(stream_id_, pending_send_data_.get(),
737                                  pending_send_data_->BytesRemaining(), flags,
738                                  &effective_len, &end_stream));
739   // We'll get called again by PossiblyResumeIfSendStalled().
740   if (!data_buffer)
741     return;
742 
743   DCHECK_GE(data_buffer->GetRemainingSize(), spdy::kDataFrameMinimumSize);
744   size_t payload_size =
745       data_buffer->GetRemainingSize() - spdy::kDataFrameMinimumSize;
746   DCHECK_LE(payload_size, spdy::kHttp2DefaultFramePayloadLimit);
747 
748   // Send window size is based on payload size, so nothing to do if this is
749   // just a FIN with no payload.
750   if (payload_size != 0) {
751     DecreaseSendWindowSize(static_cast<int32_t>(payload_size));
752     // This currently isn't strictly needed, since write frames are
753     // discarded only if the stream is about to be closed. But have it
754     // here anyway just in case this changes.
755     data_buffer->AddConsumeCallback(base::BindRepeating(
756         &SpdyStream::OnWriteBufferConsumed, GetWeakPtr(), payload_size));
757   }
758 
759   if (session_->GreasedFramesEnabled() && delegate_ &&
760       delegate_->CanGreaseFrameType()) {
761     session_->EnqueueGreasedFrame(GetWeakPtr());
762   }
763 
764   session_->net_log().AddEvent(NetLogEventType::HTTP2_SESSION_SEND_DATA, [&] {
765     return NetLogSpdyDataParams(stream_id_, effective_len, end_stream);
766   });
767 
768   session_->EnqueueStreamWrite(
769       GetWeakPtr(), spdy::SpdyFrameType::DATA,
770       std::make_unique<SimpleBufferProducer>(std::move(data_buffer)));
771 }
772 
OnEarlyHintsReceived(const quiche::HttpHeaderBlock & response_headers,base::TimeTicks recv_first_byte_time)773 void SpdyStream::OnEarlyHintsReceived(
774     const quiche::HttpHeaderBlock& response_headers,
775     base::TimeTicks recv_first_byte_time) {
776   // Record the timing of the 103 Early Hints response for the experiment
777   // (https://crbug.com/1093693).
778   if (first_early_hints_time_.is_null())
779     first_early_hints_time_ = recv_first_byte_time;
780 
781   // Transfer-encoding is a connection specific header.
782   if (response_headers.find("transfer-encoding") != response_headers.end()) {
783     const char error[] = "Received transfer-encoding header";
784     LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
785     session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
786     return;
787   }
788 
789   if (type_ != SPDY_REQUEST_RESPONSE_STREAM || io_state_ == STATE_IDLE) {
790     const char error[] = "Early Hints received before request sent.";
791     LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
792     session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
793     return;
794   }
795 
796   // `delegate_` must be attached at this point when `type_` is
797   // SPDY_REQUEST_RESPONSE_STREAM.
798   CHECK(delegate_);
799   delegate_->OnEarlyHintsReceived(response_headers);
800 }
801 
SaveResponseHeaders(const quiche::HttpHeaderBlock & response_headers,int status)802 void SpdyStream::SaveResponseHeaders(
803     const quiche::HttpHeaderBlock& response_headers,
804     int status) {
805   if (response_headers.contains("transfer-encoding")) {
806     session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR,
807                           "Received transfer-encoding header");
808     return;
809   }
810 
811   DCHECK(response_headers_.empty());
812   response_headers_ = response_headers.Clone();
813 
814   // If delegate is not yet attached, OnHeadersReceived() will be called after
815   // the delegate gets attached to the stream.
816   if (!delegate_)
817     return;
818 
819   delegate_->OnHeadersReceived(response_headers_);
820 }
821 
822 #define STATE_CASE(s)                                       \
823   case s:                                                   \
824     description = base::StringPrintf("%s (0x%08X)", #s, s); \
825     break
826 
DescribeState(State state)827 std::string SpdyStream::DescribeState(State state) {
828   std::string description;
829   switch (state) {
830     STATE_CASE(STATE_IDLE);
831     STATE_CASE(STATE_OPEN);
832     STATE_CASE(STATE_HALF_CLOSED_LOCAL);
833     STATE_CASE(STATE_CLOSED);
834     default:
835       description =
836           base::StringPrintf("Unknown state 0x%08X (%u)", state, state);
837       break;
838   }
839   return description;
840 }
841 
842 #undef STATE_CASE
843 
844 }  // namespace net
845