1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_stream.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <utility>
10
11 #include "base/check_op.h"
12 #include "base/compiler_specific.h"
13 #include "base/functional/bind.h"
14 #include "base/location.h"
15 #include "base/metrics/histogram_functions.h"
16 #include "base/metrics/histogram_macros.h"
17 #include "base/notreached.h"
18 #include "base/strings/string_number_conversions.h"
19 #include "base/strings/stringprintf.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "base/trace_event/memory_usage_estimator.h"
22 #include "base/values.h"
23 #include "net/base/load_timing_info.h"
24 #include "net/http/http_status_code.h"
25 #include "net/log/net_log.h"
26 #include "net/log/net_log_capture_mode.h"
27 #include "net/log/net_log_event_type.h"
28 #include "net/spdy/spdy_buffer_producer.h"
29 #include "net/spdy/spdy_http_utils.h"
30 #include "net/spdy/spdy_session.h"
31
32 namespace net {
33
34 namespace {
35
NetLogSpdyStreamErrorParams(spdy::SpdyStreamId stream_id,int net_error,base::StringPiece description)36 base::Value::Dict NetLogSpdyStreamErrorParams(spdy::SpdyStreamId stream_id,
37 int net_error,
38 base::StringPiece description) {
39 base::Value::Dict dict;
40 dict.Set("stream_id", static_cast<int>(stream_id));
41 dict.Set("net_error", ErrorToShortString(net_error));
42 dict.Set("description", description);
43 return dict;
44 }
45
NetLogSpdyStreamWindowUpdateParams(spdy::SpdyStreamId stream_id,int32_t delta,int32_t window_size)46 base::Value::Dict NetLogSpdyStreamWindowUpdateParams(
47 spdy::SpdyStreamId stream_id,
48 int32_t delta,
49 int32_t window_size) {
50 base::Value::Dict dict;
51 dict.Set("stream_id", static_cast<int>(stream_id));
52 dict.Set("delta", delta);
53 dict.Set("window_size", window_size);
54 return dict;
55 }
56
NetLogSpdyDataParams(spdy::SpdyStreamId stream_id,int size,bool fin)57 base::Value::Dict NetLogSpdyDataParams(spdy::SpdyStreamId stream_id,
58 int size,
59 bool fin) {
60 base::Value::Dict dict;
61 dict.Set("stream_id", static_cast<int>(stream_id));
62 dict.Set("size", size);
63 dict.Set("fin", fin);
64 return dict;
65 }
66
67 } // namespace
68
69 // A wrapper around a stream that calls into ProduceHeadersFrame().
70 class SpdyStream::HeadersBufferProducer : public SpdyBufferProducer {
71 public:
HeadersBufferProducer(const base::WeakPtr<SpdyStream> & stream)72 explicit HeadersBufferProducer(const base::WeakPtr<SpdyStream>& stream)
73 : stream_(stream) {
74 DCHECK(stream_.get());
75 }
76
77 ~HeadersBufferProducer() override = default;
78
ProduceBuffer()79 std::unique_ptr<SpdyBuffer> ProduceBuffer() override {
80 if (!stream_.get()) {
81 NOTREACHED();
82 return nullptr;
83 }
84 DCHECK_GT(stream_->stream_id(), 0u);
85 return std::make_unique<SpdyBuffer>(stream_->ProduceHeadersFrame());
86 }
87
88 private:
89 const base::WeakPtr<SpdyStream> stream_;
90 };
91
SpdyStream(SpdyStreamType type,const base::WeakPtr<SpdySession> & session,const GURL & url,RequestPriority priority,int32_t initial_send_window_size,int32_t max_recv_window_size,const NetLogWithSource & net_log,const NetworkTrafficAnnotationTag & traffic_annotation,bool detect_broken_connection)92 SpdyStream::SpdyStream(SpdyStreamType type,
93 const base::WeakPtr<SpdySession>& session,
94 const GURL& url,
95 RequestPriority priority,
96 int32_t initial_send_window_size,
97 int32_t max_recv_window_size,
98 const NetLogWithSource& net_log,
99 const NetworkTrafficAnnotationTag& traffic_annotation,
100 bool detect_broken_connection)
101 : type_(type),
102 url_(url),
103 priority_(priority),
104 send_window_size_(initial_send_window_size),
105 max_recv_window_size_(max_recv_window_size),
106 recv_window_size_(max_recv_window_size),
107 last_recv_window_update_(base::TimeTicks::Now()),
108 session_(session),
109 request_time_(base::Time::Now()),
110 net_log_(net_log),
111 traffic_annotation_(traffic_annotation),
112 detect_broken_connection_(detect_broken_connection) {
113 CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
114 type_ == SPDY_REQUEST_RESPONSE_STREAM ||
115 type_ == SPDY_PUSH_STREAM);
116 CHECK_GE(priority_, MINIMUM_PRIORITY);
117 CHECK_LE(priority_, MAXIMUM_PRIORITY);
118 }
119
~SpdyStream()120 SpdyStream::~SpdyStream() {
121 CHECK(!write_handler_guard_);
122 }
123
SetDelegate(Delegate * delegate)124 void SpdyStream::SetDelegate(Delegate* delegate) {
125 CHECK(!delegate_);
126 CHECK(delegate);
127 delegate_ = delegate;
128
129 CHECK(io_state_ == STATE_IDLE ||
130 io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
131 io_state_ == STATE_RESERVED_REMOTE);
132
133 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
134 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
135 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
136 FROM_HERE,
137 base::BindOnce(&SpdyStream::PushedStreamReplay, GetWeakPtr()));
138 }
139 }
140
PushedStreamReplay()141 void SpdyStream::PushedStreamReplay() {
142 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
143 DCHECK_NE(stream_id_, 0u);
144 CHECK_EQ(stream_id_ % 2, 0u);
145
146 CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
147 io_state_ = STATE_HALF_CLOSED_LOCAL;
148
149 // The delegate methods called below may delete |this|, so use
150 // |weak_this| to detect that.
151 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
152
153 CHECK(delegate_);
154 delegate_->OnHeadersReceived(response_headers_, &request_headers_);
155
156 // OnHeadersReceived() may have closed |this|.
157 if (!weak_this)
158 return;
159
160 while (!pending_recv_data_.empty()) {
161 // Take ownership of the first element of |pending_recv_data_|.
162 std::unique_ptr<SpdyBuffer> buffer = std::move(pending_recv_data_.at(0));
163 pending_recv_data_.erase(pending_recv_data_.begin());
164
165 bool eof = (buffer == nullptr);
166
167 CHECK(delegate_);
168 delegate_->OnDataReceived(std::move(buffer));
169
170 // OnDataReceived() may have closed |this|.
171 if (!weak_this)
172 return;
173
174 if (eof) {
175 DCHECK(pending_recv_data_.empty());
176 session_->CloseActiveStream(stream_id_, OK);
177 DCHECK(!weak_this);
178 // |pending_recv_data_| is invalid at this point.
179 break;
180 }
181 }
182 }
183
ProduceHeadersFrame()184 std::unique_ptr<spdy::SpdySerializedFrame> SpdyStream::ProduceHeadersFrame() {
185 CHECK_EQ(io_state_, STATE_IDLE);
186 CHECK(request_headers_valid_);
187 CHECK_GT(stream_id_, 0u);
188
189 spdy::SpdyControlFlags flags = (pending_send_status_ == NO_MORE_DATA_TO_SEND)
190 ? spdy::CONTROL_FLAG_FIN
191 : spdy::CONTROL_FLAG_NONE;
192 std::unique_ptr<spdy::SpdySerializedFrame> frame(session_->CreateHeaders(
193 stream_id_, priority_, flags, std::move(request_headers_),
194 delegate_->source_dependency()));
195 request_headers_valid_ = false;
196 send_time_ = base::TimeTicks::Now();
197 return frame;
198 }
199
DetachDelegate()200 void SpdyStream::DetachDelegate() {
201 DCHECK(!IsClosed());
202 delegate_ = nullptr;
203 Cancel(ERR_ABORTED);
204 }
205
SetPriority(RequestPriority priority)206 void SpdyStream::SetPriority(RequestPriority priority) {
207 if (priority_ == priority) {
208 return;
209 }
210
211 session_->UpdateStreamPriority(this, /* old_priority = */ priority_,
212 /* new_priority = */ priority);
213
214 priority_ = priority;
215 }
216
AdjustSendWindowSize(int32_t delta_window_size)217 bool SpdyStream::AdjustSendWindowSize(int32_t delta_window_size) {
218 if (IsClosed())
219 return true;
220
221 if (delta_window_size > 0) {
222 if (send_window_size_ >
223 std::numeric_limits<int32_t>::max() - delta_window_size) {
224 return false;
225 }
226 } else {
227 // Minimum allowed value for spdy::SETTINGS_INITIAL_WINDOW_SIZE is 0 and
228 // maximum is 2^31-1. Data are not sent when |send_window_size_ < 0|, that
229 // is, |send_window_size_ | can only decrease by a change in
230 // spdy::SETTINGS_INITIAL_WINDOW_SIZE. Therefore |send_window_size_| should
231 // never be able to become less than -(2^31-1).
232 DCHECK_LE(std::numeric_limits<int32_t>::min() - delta_window_size,
233 send_window_size_);
234 }
235
236 send_window_size_ += delta_window_size;
237
238 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW, [&] {
239 return NetLogSpdyStreamWindowUpdateParams(stream_id_, delta_window_size,
240 send_window_size_);
241 });
242
243 PossiblyResumeIfSendStalled();
244 return true;
245 }
246
OnWriteBufferConsumed(size_t frame_payload_size,size_t consume_size,SpdyBuffer::ConsumeSource consume_source)247 void SpdyStream::OnWriteBufferConsumed(
248 size_t frame_payload_size,
249 size_t consume_size,
250 SpdyBuffer::ConsumeSource consume_source) {
251 if (consume_source == SpdyBuffer::DISCARD) {
252 // If we're discarding a frame or part of it, increase the send
253 // window by the number of discarded bytes. (Although if we're
254 // discarding part of a frame, it's probably because of a write
255 // error and we'll be tearing down the stream soon.)
256 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
257 DCHECK_GT(remaining_payload_bytes, 0u);
258 IncreaseSendWindowSize(static_cast<int32_t>(remaining_payload_bytes));
259 }
260 // For consumed bytes, the send window is increased when we receive
261 // a WINDOW_UPDATE frame.
262 }
263
IncreaseSendWindowSize(int32_t delta_window_size)264 void SpdyStream::IncreaseSendWindowSize(int32_t delta_window_size) {
265 DCHECK_GE(delta_window_size, 1);
266
267 if (!AdjustSendWindowSize(delta_window_size)) {
268 std::string desc = base::StringPrintf(
269 "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
270 "send_window_size_ [current: %d]",
271 delta_window_size, stream_id_, send_window_size_);
272 session_->ResetStream(stream_id_, ERR_HTTP2_FLOW_CONTROL_ERROR, desc);
273 }
274 }
275
DecreaseSendWindowSize(int32_t delta_window_size)276 void SpdyStream::DecreaseSendWindowSize(int32_t delta_window_size) {
277 if (IsClosed())
278 return;
279
280 // We only call this method when sending a frame. Therefore,
281 // |delta_window_size| should be within the valid frame size range.
282 DCHECK_GE(delta_window_size, 1);
283 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
284
285 // |send_window_size_| should have been at least |delta_window_size| for
286 // this call to happen.
287 DCHECK_GE(send_window_size_, delta_window_size);
288
289 send_window_size_ -= delta_window_size;
290
291 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW, [&] {
292 return NetLogSpdyStreamWindowUpdateParams(stream_id_, -delta_window_size,
293 send_window_size_);
294 });
295 }
296
OnReadBufferConsumed(size_t consume_size,SpdyBuffer::ConsumeSource consume_source)297 void SpdyStream::OnReadBufferConsumed(
298 size_t consume_size,
299 SpdyBuffer::ConsumeSource consume_source) {
300 DCHECK_GE(consume_size, 1u);
301 DCHECK_LE(consume_size,
302 static_cast<size_t>(std::numeric_limits<int32_t>::max()));
303 IncreaseRecvWindowSize(static_cast<int32_t>(consume_size));
304 }
305
IncreaseRecvWindowSize(int32_t delta_window_size)306 void SpdyStream::IncreaseRecvWindowSize(int32_t delta_window_size) {
307 // By the time a read is processed by the delegate, this stream may
308 // already be inactive.
309 if (!session_->IsStreamActive(stream_id_))
310 return;
311
312 DCHECK_GE(unacked_recv_window_bytes_, 0);
313 DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
314 DCHECK_GE(delta_window_size, 1);
315 // Check for overflow.
316 DCHECK_LE(delta_window_size,
317 std::numeric_limits<int32_t>::max() - recv_window_size_);
318
319 recv_window_size_ += delta_window_size;
320 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW, [&] {
321 return NetLogSpdyStreamWindowUpdateParams(stream_id_, delta_window_size,
322 recv_window_size_);
323 });
324
325 // Update the receive window once half of the buffer is ready to be acked
326 // to prevent excessive window updates on fast downloads. Also send an update
327 // if too much time has elapsed since the last update to deal with
328 // slow-reading clients so the server doesn't think the stream is idle.
329 unacked_recv_window_bytes_ += delta_window_size;
330 const base::TimeDelta elapsed =
331 base::TimeTicks::Now() - last_recv_window_update_;
332 if (unacked_recv_window_bytes_ > max_recv_window_size_ / 2 ||
333 elapsed >= session_->TimeToBufferSmallWindowUpdates()) {
334 last_recv_window_update_ = base::TimeTicks::Now();
335 session_->SendStreamWindowUpdate(
336 stream_id_, static_cast<uint32_t>(unacked_recv_window_bytes_));
337 unacked_recv_window_bytes_ = 0;
338 }
339 }
340
DecreaseRecvWindowSize(int32_t delta_window_size)341 void SpdyStream::DecreaseRecvWindowSize(int32_t delta_window_size) {
342 DCHECK(session_->IsStreamActive(stream_id_));
343 DCHECK_GE(delta_window_size, 1);
344
345 // The receiving window size as the peer knows it is
346 // |recv_window_size_ - unacked_recv_window_bytes_|, if more data are sent by
347 // the peer, that means that the receive window is not being respected.
348 if (delta_window_size > recv_window_size_ - unacked_recv_window_bytes_) {
349 session_->ResetStream(
350 stream_id_, ERR_HTTP2_FLOW_CONTROL_ERROR,
351 "delta_window_size is " + base::NumberToString(delta_window_size) +
352 " in DecreaseRecvWindowSize, which is larger than the receive " +
353 "window size of " + base::NumberToString(recv_window_size_));
354 return;
355 }
356
357 recv_window_size_ -= delta_window_size;
358 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW, [&] {
359 return NetLogSpdyStreamWindowUpdateParams(stream_id_, -delta_window_size,
360 recv_window_size_);
361 });
362 }
363
GetPeerAddress(IPEndPoint * address) const364 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
365 return session_->GetPeerAddress(address);
366 }
367
GetLocalAddress(IPEndPoint * address) const368 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
369 return session_->GetLocalAddress(address);
370 }
371
WasEverUsed() const372 bool SpdyStream::WasEverUsed() const {
373 return session_->WasEverUsed();
374 }
375
GetRequestTime() const376 base::Time SpdyStream::GetRequestTime() const {
377 return request_time_;
378 }
379
SetRequestTime(base::Time t)380 void SpdyStream::SetRequestTime(base::Time t) {
381 request_time_ = t;
382 }
383
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers,base::Time response_time,base::TimeTicks recv_first_byte_time)384 void SpdyStream::OnHeadersReceived(
385 const spdy::Http2HeaderBlock& response_headers,
386 base::Time response_time,
387 base::TimeTicks recv_first_byte_time) {
388 switch (response_state_) {
389 case READY_FOR_HEADERS: {
390 // No header block has been received yet.
391 DCHECK(response_headers_.empty());
392
393 spdy::Http2HeaderBlock::const_iterator it =
394 response_headers.find(spdy::kHttp2StatusHeader);
395 if (it == response_headers.end()) {
396 const std::string error("Response headers do not include :status.");
397 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
398 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
399 return;
400 }
401
402 int status;
403 if (!base::StringToInt(it->second, &status)) {
404 const std::string error("Cannot parse :status.");
405 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
406 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
407 return;
408 }
409
410 base::UmaHistogramSparse("Net.SpdyResponseCode", status);
411
412 // Include informational responses (1xx) in the TTFB as per the resource
413 // timing spec for responseStart.
414 if (recv_first_byte_time_.is_null())
415 recv_first_byte_time_ = recv_first_byte_time;
416 // Also record the TTFB of non-informational responses.
417 if (status / 100 != 1) {
418 DCHECK(recv_first_byte_time_for_non_informational_response_.is_null());
419 recv_first_byte_time_for_non_informational_response_ =
420 recv_first_byte_time;
421 }
422
423 // Handle informational responses (1xx):
424 // * Pass through 101 Switching Protocols, because broken servers might
425 // send this as a response to a WebSocket request, in which case it
426 // needs to pass through so that the WebSocket layer can signal an
427 // error.
428 // * Plumb 103 Early Hints to the delegate.
429 // * Ignore other informational responses.
430 if (status / 100 == 1 && status != HTTP_SWITCHING_PROTOCOLS) {
431 if (status == HTTP_EARLY_HINTS)
432 OnEarlyHintsReceived(response_headers, recv_first_byte_time);
433 return;
434 }
435
436 response_state_ = READY_FOR_DATA_OR_TRAILERS;
437
438 switch (type_) {
439 case SPDY_BIDIRECTIONAL_STREAM:
440 case SPDY_REQUEST_RESPONSE_STREAM:
441 // A bidirectional stream or a request/response stream is ready for
442 // the response headers only after request headers are sent.
443 if (io_state_ == STATE_IDLE) {
444 const std::string error("Response received before request sent.");
445 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
446 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
447 return;
448 }
449 break;
450
451 case SPDY_PUSH_STREAM:
452 // Push streams transition to a locally half-closed state upon
453 // headers. We must continue to buffer data while waiting for a call
454 // to SetDelegate() (which may not ever happen).
455 DCHECK_EQ(io_state_, STATE_RESERVED_REMOTE);
456 if (!delegate_) {
457 io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED;
458 } else {
459 io_state_ = STATE_HALF_CLOSED_LOCAL;
460 }
461 break;
462 }
463
464 DCHECK_NE(io_state_, STATE_IDLE);
465
466 response_time_ = response_time;
467 SaveResponseHeaders(response_headers, status);
468
469 break;
470 }
471 case READY_FOR_DATA_OR_TRAILERS:
472 // Second header block is trailers.
473 if (type_ == SPDY_PUSH_STREAM) {
474 const std::string error("Trailers not supported for push stream.");
475 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
476 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
477 return;
478 }
479
480 response_state_ = TRAILERS_RECEIVED;
481 delegate_->OnTrailers(response_headers);
482 break;
483
484 case TRAILERS_RECEIVED:
485 // No further header blocks are allowed after trailers.
486 const std::string error("Header block received after trailers.");
487 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
488 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
489 break;
490 }
491 }
492
ShouldRetryRSTPushStream() const493 bool SpdyStream::ShouldRetryRSTPushStream() const {
494 // Retry if the stream is a pushed stream, has been claimed, but did not yet
495 // receive response headers
496 return (response_headers_.empty() && type_ == SPDY_PUSH_STREAM && delegate_);
497 }
498
OnPushPromiseHeadersReceived(spdy::Http2HeaderBlock headers,GURL url)499 void SpdyStream::OnPushPromiseHeadersReceived(spdy::Http2HeaderBlock headers,
500 GURL url) {
501 CHECK(!request_headers_valid_);
502 CHECK_EQ(io_state_, STATE_IDLE);
503 CHECK_EQ(type_, SPDY_PUSH_STREAM);
504 DCHECK(!delegate_);
505
506 io_state_ = STATE_RESERVED_REMOTE;
507 request_headers_ = std::move(headers);
508 request_headers_valid_ = true;
509 }
510
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)511 void SpdyStream::OnDataReceived(std::unique_ptr<SpdyBuffer> buffer) {
512 DCHECK(session_->IsStreamActive(stream_id_));
513
514 if (response_state_ == READY_FOR_HEADERS) {
515 const std::string error("DATA received before headers.");
516 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
517 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
518 return;
519 }
520
521 if (response_state_ == TRAILERS_RECEIVED && buffer) {
522 const std::string error("DATA received after trailers.");
523 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
524 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
525 return;
526 }
527
528 if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
529 const std::string error("DATA received on half-closed (remove) stream.");
530 LogStreamError(ERR_HTTP2_STREAM_CLOSED, error);
531 session_->ResetStream(stream_id_, ERR_HTTP2_STREAM_CLOSED, error);
532 return;
533 }
534
535 // Track our bandwidth.
536 recv_bytes_ += buffer ? buffer->GetRemainingSize() : 0;
537 recv_last_byte_time_ = base::TimeTicks::Now();
538
539 // If we're still buffering data for a push stream, we will do the check for
540 // data received with incomplete headers in PushedStreamReplay().
541 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
542 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
543 // It should be valid for this to happen in the server push case.
544 // We'll return received data when delegate gets attached to the stream.
545 if (buffer) {
546 pending_recv_data_.push_back(std::move(buffer));
547 } else {
548 pending_recv_data_.push_back(nullptr);
549 // Note: we leave the stream open in the session until the stream
550 // is claimed.
551 }
552 return;
553 }
554
555 CHECK(!IsClosed());
556
557 if (!buffer) {
558 if (io_state_ == STATE_OPEN) {
559 io_state_ = STATE_HALF_CLOSED_REMOTE;
560 // Inform the delegate of EOF. This may delete |this|.
561 delegate_->OnDataReceived(nullptr);
562 } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
563 io_state_ = STATE_CLOSED;
564 // Deletes |this|.
565 session_->CloseActiveStream(stream_id_, OK);
566 } else {
567 NOTREACHED() << io_state_;
568 }
569 return;
570 }
571
572 size_t length = buffer->GetRemainingSize();
573 DCHECK_LE(length, spdy::kHttp2DefaultFramePayloadLimit);
574 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
575 // May close the stream.
576 DecreaseRecvWindowSize(static_cast<int32_t>(length));
577 if (!weak_this)
578 return;
579 buffer->AddConsumeCallback(
580 base::BindRepeating(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
581
582 // May close |this|.
583 delegate_->OnDataReceived(std::move(buffer));
584 }
585
OnPaddingConsumed(size_t len)586 void SpdyStream::OnPaddingConsumed(size_t len) {
587 // Decrease window size because padding bytes are received.
588 // Increase window size because padding bytes are consumed (by discarding).
589 // Net result: |unacked_recv_window_bytes_| increases by |len|,
590 // |recv_window_size_| does not change.
591 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
592 // May close the stream.
593 DecreaseRecvWindowSize(static_cast<int32_t>(len));
594 if (!weak_this)
595 return;
596 IncreaseRecvWindowSize(static_cast<int32_t>(len));
597 }
598
OnFrameWriteComplete(spdy::SpdyFrameType frame_type,size_t frame_size)599 void SpdyStream::OnFrameWriteComplete(spdy::SpdyFrameType frame_type,
600 size_t frame_size) {
601 if (frame_type != spdy::SpdyFrameType::HEADERS &&
602 frame_type != spdy::SpdyFrameType::DATA) {
603 return;
604 }
605
606 DCHECK_NE(type_, SPDY_PUSH_STREAM);
607
608 int result = (frame_type == spdy::SpdyFrameType::HEADERS)
609 ? OnHeadersSent()
610 : OnDataSent(frame_size);
611 if (result == ERR_IO_PENDING) {
612 // The write operation hasn't completed yet.
613 return;
614 }
615
616 if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
617 if (io_state_ == STATE_OPEN) {
618 io_state_ = STATE_HALF_CLOSED_LOCAL;
619 } else if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
620 io_state_ = STATE_CLOSED;
621 } else {
622 NOTREACHED() << io_state_;
623 }
624 }
625 // Notify delegate of write completion. Must not destroy |this|.
626 CHECK(delegate_);
627 {
628 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
629 write_handler_guard_ = true;
630 if (frame_type == spdy::SpdyFrameType::HEADERS) {
631 delegate_->OnHeadersSent();
632 } else {
633 delegate_->OnDataSent();
634 }
635 CHECK(weak_this);
636 write_handler_guard_ = false;
637 }
638
639 if (io_state_ == STATE_CLOSED) {
640 // Deletes |this|.
641 session_->CloseActiveStream(stream_id_, OK);
642 }
643 }
644
OnHeadersSent()645 int SpdyStream::OnHeadersSent() {
646 CHECK_EQ(io_state_, STATE_IDLE);
647 CHECK_NE(stream_id_, 0u);
648
649 io_state_ = STATE_OPEN;
650 return OK;
651 }
652
OnDataSent(size_t frame_size)653 int SpdyStream::OnDataSent(size_t frame_size) {
654 CHECK(io_state_ == STATE_OPEN ||
655 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
656
657 size_t frame_payload_size = frame_size - spdy::kDataFrameMinimumSize;
658
659 CHECK_GE(frame_size, spdy::kDataFrameMinimumSize);
660 CHECK_LE(frame_payload_size, spdy::kHttp2DefaultFramePayloadLimit);
661
662 // If more data is available to send, dispatch it and
663 // return that the write operation is still ongoing.
664 pending_send_data_->DidConsume(frame_payload_size);
665 if (pending_send_data_->BytesRemaining() > 0) {
666 QueueNextDataFrame();
667 return ERR_IO_PENDING;
668 } else {
669 pending_send_data_ = nullptr;
670 return OK;
671 }
672 }
673
LogStreamError(int error,base::StringPiece description)674 void SpdyStream::LogStreamError(int error, base::StringPiece description) {
675 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_ERROR, [&] {
676 return NetLogSpdyStreamErrorParams(stream_id_, error, description);
677 });
678 }
679
OnClose(int status)680 void SpdyStream::OnClose(int status) {
681 // In most cases, the stream should already be CLOSED. The exception is when a
682 // SpdySession is shutting down while the stream is in an intermediate state.
683 io_state_ = STATE_CLOSED;
684 if (status == ERR_HTTP2_RST_STREAM_NO_ERROR_RECEIVED) {
685 if (response_state_ == READY_FOR_HEADERS) {
686 status = ERR_HTTP2_PROTOCOL_ERROR;
687 } else {
688 status = OK;
689 }
690 }
691 Delegate* delegate = delegate_;
692 delegate_ = nullptr;
693 if (delegate)
694 delegate->OnClose(status);
695 // Unset |stream_id_| last so that the delegate can look it up.
696 stream_id_ = 0;
697 }
698
Cancel(int error)699 void SpdyStream::Cancel(int error) {
700 // We may be called again from a delegate's OnClose().
701 if (io_state_ == STATE_CLOSED)
702 return;
703
704 if (stream_id_ != 0) {
705 session_->ResetStream(stream_id_, error, std::string());
706 } else {
707 session_->CloseCreatedStream(GetWeakPtr(), error);
708 }
709 // |this| is invalid at this point.
710 }
711
Close()712 void SpdyStream::Close() {
713 // We may be called again from a delegate's OnClose().
714 if (io_state_ == STATE_CLOSED)
715 return;
716
717 if (stream_id_ != 0) {
718 session_->CloseActiveStream(stream_id_, OK);
719 } else {
720 session_->CloseCreatedStream(GetWeakPtr(), OK);
721 }
722 // |this| is invalid at this point.
723 }
724
GetWeakPtr()725 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
726 return weak_ptr_factory_.GetWeakPtr();
727 }
728
SendRequestHeaders(spdy::Http2HeaderBlock request_headers,SpdySendStatus send_status)729 int SpdyStream::SendRequestHeaders(spdy::Http2HeaderBlock request_headers,
730 SpdySendStatus send_status) {
731 CHECK_NE(type_, SPDY_PUSH_STREAM);
732 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
733 CHECK(!request_headers_valid_);
734 CHECK(!pending_send_data_.get());
735 CHECK_EQ(io_state_, STATE_IDLE);
736 request_headers_ = std::move(request_headers);
737 request_headers_valid_ = true;
738 pending_send_status_ = send_status;
739 session_->EnqueueStreamWrite(
740 GetWeakPtr(), spdy::SpdyFrameType::HEADERS,
741 std::make_unique<HeadersBufferProducer>(GetWeakPtr()));
742 return ERR_IO_PENDING;
743 }
744
SendData(IOBuffer * data,int length,SpdySendStatus send_status)745 void SpdyStream::SendData(IOBuffer* data,
746 int length,
747 SpdySendStatus send_status) {
748 CHECK_NE(type_, SPDY_PUSH_STREAM);
749 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
750 CHECK(io_state_ == STATE_OPEN ||
751 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
752 CHECK(!pending_send_data_.get());
753 pending_send_data_ = base::MakeRefCounted<DrainableIOBuffer>(data, length);
754 pending_send_status_ = send_status;
755 QueueNextDataFrame();
756 }
757
GetSSLInfo(SSLInfo * ssl_info) const758 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info) const {
759 return session_->GetSSLInfo(ssl_info);
760 }
761
WasAlpnNegotiated() const762 bool SpdyStream::WasAlpnNegotiated() const {
763 return session_->WasAlpnNegotiated();
764 }
765
GetNegotiatedProtocol() const766 NextProto SpdyStream::GetNegotiatedProtocol() const {
767 return session_->GetNegotiatedProtocol();
768 }
769
PossiblyResumeIfSendStalled()770 SpdyStream::ShouldRequeueStream SpdyStream::PossiblyResumeIfSendStalled() {
771 if (IsLocallyClosed() || !send_stalled_by_flow_control_)
772 return DoNotRequeue;
773 if (session_->IsSendStalled() || send_window_size_ <= 0) {
774 return Requeue;
775 }
776 net_log_.AddEventWithIntParams(
777 NetLogEventType::HTTP2_STREAM_FLOW_CONTROL_UNSTALLED, "stream_id",
778 stream_id_);
779 send_stalled_by_flow_control_ = false;
780 QueueNextDataFrame();
781 return DoNotRequeue;
782 }
783
IsClosed() const784 bool SpdyStream::IsClosed() const {
785 return io_state_ == STATE_CLOSED;
786 }
787
IsLocallyClosed() const788 bool SpdyStream::IsLocallyClosed() const {
789 return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
790 io_state_ == STATE_HALF_CLOSED_LOCAL ||
791 io_state_ == STATE_CLOSED;
792 }
793
IsIdle() const794 bool SpdyStream::IsIdle() const {
795 return io_state_ == STATE_IDLE;
796 }
797
IsOpen() const798 bool SpdyStream::IsOpen() const {
799 return io_state_ == STATE_OPEN;
800 }
801
IsReservedRemote() const802 bool SpdyStream::IsReservedRemote() const {
803 return io_state_ == STATE_RESERVED_REMOTE;
804 }
805
AddRawReceivedBytes(size_t received_bytes)806 void SpdyStream::AddRawReceivedBytes(size_t received_bytes) {
807 raw_received_bytes_ += received_bytes;
808 }
809
AddRawSentBytes(size_t sent_bytes)810 void SpdyStream::AddRawSentBytes(size_t sent_bytes) {
811 raw_sent_bytes_ += sent_bytes;
812 }
813
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const814 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
815 if (stream_id_ == 0)
816 return false;
817 bool result = session_->GetLoadTimingInfo(stream_id_, load_timing_info);
818 if (type_ == SPDY_PUSH_STREAM) {
819 load_timing_info->push_start = recv_first_byte_time_;
820 bool done_receiving = IsClosed() || (!pending_recv_data_.empty() &&
821 !pending_recv_data_.back());
822 if (done_receiving)
823 load_timing_info->push_end = recv_last_byte_time_;
824 }
825 // TODO(acomminos): recv_first_byte_time_ is actually the time after all
826 // headers have been parsed. We should add support for reporting the time the
827 // first bytes of the HEADERS frame were received to BufferedSpdyFramer
828 // (https://crbug.com/568024).
829 load_timing_info->receive_headers_start = recv_first_byte_time_;
830 load_timing_info->receive_non_informational_headers_start =
831 recv_first_byte_time_for_non_informational_response_;
832 load_timing_info->first_early_hints_time = first_early_hints_time_;
833 return result;
834 }
835
QueueNextDataFrame()836 void SpdyStream::QueueNextDataFrame() {
837 // Until the request has been completely sent, we cannot be sure
838 // that our stream_id is correct.
839 CHECK(io_state_ == STATE_OPEN ||
840 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
841 CHECK_GT(stream_id_, 0u);
842 CHECK(pending_send_data_.get());
843 // Only the final fame may have a length of 0.
844 if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
845 CHECK_GE(pending_send_data_->BytesRemaining(), 0);
846 } else {
847 CHECK_GT(pending_send_data_->BytesRemaining(), 0);
848 }
849
850 spdy::SpdyDataFlags flags = (pending_send_status_ == NO_MORE_DATA_TO_SEND)
851 ? spdy::DATA_FLAG_FIN
852 : spdy::DATA_FLAG_NONE;
853 int effective_len;
854 bool end_stream;
855 std::unique_ptr<SpdyBuffer> data_buffer(
856 session_->CreateDataBuffer(stream_id_, pending_send_data_.get(),
857 pending_send_data_->BytesRemaining(), flags,
858 &effective_len, &end_stream));
859 // We'll get called again by PossiblyResumeIfSendStalled().
860 if (!data_buffer)
861 return;
862
863 DCHECK_GE(data_buffer->GetRemainingSize(), spdy::kDataFrameMinimumSize);
864 size_t payload_size =
865 data_buffer->GetRemainingSize() - spdy::kDataFrameMinimumSize;
866 DCHECK_LE(payload_size, spdy::kHttp2DefaultFramePayloadLimit);
867
868 // Send window size is based on payload size, so nothing to do if this is
869 // just a FIN with no payload.
870 if (payload_size != 0) {
871 DecreaseSendWindowSize(static_cast<int32_t>(payload_size));
872 // This currently isn't strictly needed, since write frames are
873 // discarded only if the stream is about to be closed. But have it
874 // here anyway just in case this changes.
875 data_buffer->AddConsumeCallback(base::BindRepeating(
876 &SpdyStream::OnWriteBufferConsumed, GetWeakPtr(), payload_size));
877 }
878
879 if (session_->GreasedFramesEnabled() && delegate_ &&
880 delegate_->CanGreaseFrameType()) {
881 session_->EnqueueGreasedFrame(GetWeakPtr());
882 }
883
884 session_->net_log().AddEvent(NetLogEventType::HTTP2_SESSION_SEND_DATA, [&] {
885 return NetLogSpdyDataParams(stream_id_, effective_len, end_stream);
886 });
887
888 session_->EnqueueStreamWrite(
889 GetWeakPtr(), spdy::SpdyFrameType::DATA,
890 std::make_unique<SimpleBufferProducer>(std::move(data_buffer)));
891 }
892
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & response_headers,base::TimeTicks recv_first_byte_time)893 void SpdyStream::OnEarlyHintsReceived(
894 const spdy::Http2HeaderBlock& response_headers,
895 base::TimeTicks recv_first_byte_time) {
896 // Record the timing of the 103 Early Hints response for the experiment
897 // (https://crbug.com/1093693).
898 if (first_early_hints_time_.is_null())
899 first_early_hints_time_ = recv_first_byte_time;
900
901 // Transfer-encoding is a connection specific header.
902 if (response_headers.find("transfer-encoding") != response_headers.end()) {
903 const char error[] = "Received transfer-encoding header";
904 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
905 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
906 return;
907 }
908
909 if (type_ != SPDY_REQUEST_RESPONSE_STREAM || io_state_ == STATE_IDLE) {
910 const char error[] = "Early Hints received before request sent.";
911 LogStreamError(ERR_HTTP2_PROTOCOL_ERROR, error);
912 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR, error);
913 return;
914 }
915
916 // `delegate_` must be attached at this point when `type_` is
917 // SPDY_REQUEST_RESPONSE_STREAM.
918 CHECK(delegate_);
919 delegate_->OnEarlyHintsReceived(response_headers);
920 }
921
SaveResponseHeaders(const spdy::Http2HeaderBlock & response_headers,int status)922 void SpdyStream::SaveResponseHeaders(
923 const spdy::Http2HeaderBlock& response_headers,
924 int status) {
925 DCHECK(response_headers_.empty());
926 if (response_headers.find("transfer-encoding") != response_headers.end()) {
927 session_->ResetStream(stream_id_, ERR_HTTP2_PROTOCOL_ERROR,
928 "Received transfer-encoding header");
929 return;
930 }
931
932 for (spdy::Http2HeaderBlock::const_iterator it = response_headers.begin();
933 it != response_headers.end(); ++it) {
934 response_headers_.insert(*it);
935 }
936
937 // Reject pushed stream with unsupported status code regardless of whether
938 // delegate is already attached or not.
939 if (type_ == SPDY_PUSH_STREAM &&
940 (status / 100 != 2 && status / 100 != 3 && status != 416)) {
941 SpdySession::RecordSpdyPushedStreamFateHistogram(
942 SpdyPushedStreamFate::kUnsupportedStatusCode);
943 session_->ResetStream(stream_id_, ERR_HTTP2_CLIENT_REFUSED_STREAM,
944 "Unsupported status code for pushed stream.");
945 return;
946 }
947
948 // If delegate is not yet attached, OnHeadersReceived() will be called after
949 // the delegate gets attached to the stream.
950 if (!delegate_)
951 return;
952
953 if (type_ == SPDY_PUSH_STREAM) {
954 // OnPushPromiseHeadersReceived() must have been called before
955 // OnHeadersReceived().
956 DCHECK(request_headers_valid_);
957 delegate_->OnHeadersReceived(response_headers_, &request_headers_);
958 } else {
959 delegate_->OnHeadersReceived(response_headers_, nullptr);
960 }
961 }
962
963 #define STATE_CASE(s) \
964 case s: \
965 description = base::StringPrintf("%s (0x%08X)", #s, s); \
966 break
967
DescribeState(State state)968 std::string SpdyStream::DescribeState(State state) {
969 std::string description;
970 switch (state) {
971 STATE_CASE(STATE_IDLE);
972 STATE_CASE(STATE_OPEN);
973 STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
974 STATE_CASE(STATE_HALF_CLOSED_LOCAL);
975 STATE_CASE(STATE_CLOSED);
976 default:
977 description =
978 base::StringPrintf("Unknown state 0x%08X (%u)", state, state);
979 break;
980 }
981 return description;
982 }
983
984 #undef STATE_CASE
985
986 } // namespace net
987