• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_stream.h"
6 
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/logging.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/strings/string_number_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/values.h"
14 #include "net/spdy/spdy_buffer_producer.h"
15 #include "net/spdy/spdy_http_utils.h"
16 #include "net/spdy/spdy_session.h"
17 
18 namespace net {
19 
20 namespace {
21 
NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,int status,const std::string * description,NetLog::LogLevel)22 base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
23                                            int status,
24                                            const std::string* description,
25                                            NetLog::LogLevel /* log_level */) {
26   base::DictionaryValue* dict = new base::DictionaryValue();
27   dict->SetInteger("stream_id", static_cast<int>(stream_id));
28   dict->SetInteger("status", status);
29   dict->SetString("description", *description);
30   return dict;
31 }
32 
NetLogSpdyStreamWindowUpdateCallback(SpdyStreamId stream_id,int32 delta,int32 window_size,NetLog::LogLevel)33 base::Value* NetLogSpdyStreamWindowUpdateCallback(
34     SpdyStreamId stream_id,
35     int32 delta,
36     int32 window_size,
37     NetLog::LogLevel /* log_level */) {
38   base::DictionaryValue* dict = new base::DictionaryValue();
39   dict->SetInteger("stream_id", stream_id);
40   dict->SetInteger("delta", delta);
41   dict->SetInteger("window_size", window_size);
42   return dict;
43 }
44 
ContainsUppercaseAscii(const std::string & str)45 bool ContainsUppercaseAscii(const std::string& str) {
46   for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) {
47     if (*i >= 'A' && *i <= 'Z') {
48       return true;
49     }
50   }
51   return false;
52 }
53 
54 }  // namespace
55 
56 // A wrapper around a stream that calls into ProduceSynStreamFrame().
57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58  public:
SynStreamBufferProducer(const base::WeakPtr<SpdyStream> & stream)59   SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60       : stream_(stream) {
61     DCHECK(stream_.get());
62   }
63 
~SynStreamBufferProducer()64   virtual ~SynStreamBufferProducer() {}
65 
ProduceBuffer()66   virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67     if (!stream_.get()) {
68       NOTREACHED();
69       return scoped_ptr<SpdyBuffer>();
70     }
71     DCHECK_GT(stream_->stream_id(), 0u);
72     return scoped_ptr<SpdyBuffer>(
73         new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74   }
75 
76  private:
77   const base::WeakPtr<SpdyStream> stream_;
78 };
79 
SpdyStream(SpdyStreamType type,const base::WeakPtr<SpdySession> & session,const GURL & url,RequestPriority priority,int32 initial_send_window_size,int32 initial_recv_window_size,const BoundNetLog & net_log)80 SpdyStream::SpdyStream(SpdyStreamType type,
81                        const base::WeakPtr<SpdySession>& session,
82                        const GURL& url,
83                        RequestPriority priority,
84                        int32 initial_send_window_size,
85                        int32 initial_recv_window_size,
86                        const BoundNetLog& net_log)
87     : type_(type),
88       stream_id_(0),
89       url_(url),
90       priority_(priority),
91       send_stalled_by_flow_control_(false),
92       send_window_size_(initial_send_window_size),
93       recv_window_size_(initial_recv_window_size),
94       unacked_recv_window_bytes_(0),
95       session_(session),
96       delegate_(NULL),
97       pending_send_status_(MORE_DATA_TO_SEND),
98       request_time_(base::Time::Now()),
99       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
100       io_state_(STATE_IDLE),
101       response_status_(OK),
102       net_log_(net_log),
103       raw_received_bytes_(0),
104       send_bytes_(0),
105       recv_bytes_(0),
106       write_handler_guard_(false),
107       weak_ptr_factory_(this) {
108   CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
109         type_ == SPDY_REQUEST_RESPONSE_STREAM ||
110         type_ == SPDY_PUSH_STREAM);
111   CHECK_GE(priority_, MINIMUM_PRIORITY);
112   CHECK_LE(priority_, MAXIMUM_PRIORITY);
113 }
114 
~SpdyStream()115 SpdyStream::~SpdyStream() {
116   CHECK(!write_handler_guard_);
117   UpdateHistograms();
118 }
119 
SetDelegate(Delegate * delegate)120 void SpdyStream::SetDelegate(Delegate* delegate) {
121   CHECK(!delegate_);
122   CHECK(delegate);
123   delegate_ = delegate;
124 
125   CHECK(io_state_ == STATE_IDLE ||
126         io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
127         io_state_ == STATE_RESERVED_REMOTE);
128 
129   if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
130     DCHECK_EQ(type_, SPDY_PUSH_STREAM);
131     base::MessageLoop::current()->PostTask(
132         FROM_HERE,
133         base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr()));
134   }
135 }
136 
PushedStreamReplay()137 void SpdyStream::PushedStreamReplay() {
138   DCHECK_EQ(type_, SPDY_PUSH_STREAM);
139   DCHECK_NE(stream_id_, 0u);
140   CHECK_EQ(stream_id_ % 2, 0u);
141 
142   CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
143   io_state_ = STATE_HALF_CLOSED_LOCAL;
144 
145   // The delegate methods called below may delete |this|, so use
146   // |weak_this| to detect that.
147   base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
148 
149   CHECK(delegate_);
150   SpdyResponseHeadersStatus status =
151       delegate_->OnResponseHeadersUpdated(response_headers_);
152   if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
153     // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
154     // have been closed. Since we don't have complete headers, assume
155     // we're waiting for another HEADERS frame, and we had better not
156     // have any pending data frames.
157     CHECK(weak_this);
158     if (!pending_recv_data_.empty()) {
159       LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
160                      "Data received with incomplete headers.");
161       session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
162     }
163     return;
164   }
165 
166   // OnResponseHeadersUpdated() may have closed |this|.
167   if (!weak_this)
168     return;
169 
170   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
171 
172   while (!pending_recv_data_.empty()) {
173     // Take ownership of the first element of |pending_recv_data_|.
174     scoped_ptr<SpdyBuffer> buffer(pending_recv_data_.front());
175     pending_recv_data_.weak_erase(pending_recv_data_.begin());
176 
177     bool eof = (buffer == NULL);
178 
179     CHECK(delegate_);
180     delegate_->OnDataReceived(buffer.Pass());
181 
182     // OnDataReceived() may have closed |this|.
183     if (!weak_this)
184       return;
185 
186     if (eof) {
187       DCHECK(pending_recv_data_.empty());
188       session_->CloseActiveStream(stream_id_, OK);
189       DCHECK(!weak_this);
190       // |pending_recv_data_| is invalid at this point.
191       break;
192     }
193   }
194 }
195 
ProduceSynStreamFrame()196 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
197   CHECK_EQ(io_state_, STATE_IDLE);
198   CHECK(request_headers_);
199   CHECK_GT(stream_id_, 0u);
200 
201   SpdyControlFlags flags =
202       (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
203       CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
204   scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
205       stream_id_, priority_, flags, *request_headers_));
206   send_time_ = base::TimeTicks::Now();
207   return frame.Pass();
208 }
209 
DetachDelegate()210 void SpdyStream::DetachDelegate() {
211   DCHECK(!IsClosed());
212   delegate_ = NULL;
213   Cancel();
214 }
215 
AdjustSendWindowSize(int32 delta_window_size)216 void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
217   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
218 
219   if (IsClosed())
220     return;
221 
222   // Check for wraparound.
223   if (send_window_size_ > 0) {
224     DCHECK_LE(delta_window_size, kint32max - send_window_size_);
225   }
226   if (send_window_size_ < 0) {
227     DCHECK_GE(delta_window_size, kint32min - send_window_size_);
228   }
229   send_window_size_ += delta_window_size;
230   PossiblyResumeIfSendStalled();
231 }
232 
OnWriteBufferConsumed(size_t frame_payload_size,size_t consume_size,SpdyBuffer::ConsumeSource consume_source)233 void SpdyStream::OnWriteBufferConsumed(
234     size_t frame_payload_size,
235     size_t consume_size,
236     SpdyBuffer::ConsumeSource consume_source) {
237   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
238   if (consume_source == SpdyBuffer::DISCARD) {
239     // If we're discarding a frame or part of it, increase the send
240     // window by the number of discarded bytes. (Although if we're
241     // discarding part of a frame, it's probably because of a write
242     // error and we'll be tearing down the stream soon.)
243     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
244     DCHECK_GT(remaining_payload_bytes, 0u);
245     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
246   }
247   // For consumed bytes, the send window is increased when we receive
248   // a WINDOW_UPDATE frame.
249 }
250 
IncreaseSendWindowSize(int32 delta_window_size)251 void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
252   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
253   DCHECK_GE(delta_window_size, 1);
254 
255   // Ignore late WINDOW_UPDATEs.
256   if (IsClosed())
257     return;
258 
259   if (send_window_size_ > 0) {
260     // Check for overflow.
261     int32 max_delta_window_size = kint32max - send_window_size_;
262     if (delta_window_size > max_delta_window_size) {
263       std::string desc = base::StringPrintf(
264           "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
265           "send_window_size_ [current: %d]", delta_window_size, stream_id_,
266           send_window_size_);
267       session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
268       return;
269     }
270   }
271 
272   send_window_size_ += delta_window_size;
273 
274   net_log_.AddEvent(
275       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
276       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
277                  stream_id_, delta_window_size, send_window_size_));
278 
279   PossiblyResumeIfSendStalled();
280 }
281 
DecreaseSendWindowSize(int32 delta_window_size)282 void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
283   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
284 
285   if (IsClosed())
286     return;
287 
288   // We only call this method when sending a frame. Therefore,
289   // |delta_window_size| should be within the valid frame size range.
290   DCHECK_GE(delta_window_size, 1);
291   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
292 
293   // |send_window_size_| should have been at least |delta_window_size| for
294   // this call to happen.
295   DCHECK_GE(send_window_size_, delta_window_size);
296 
297   send_window_size_ -= delta_window_size;
298 
299   net_log_.AddEvent(
300       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
301       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
302                  stream_id_, -delta_window_size, send_window_size_));
303 }
304 
OnReadBufferConsumed(size_t consume_size,SpdyBuffer::ConsumeSource consume_source)305 void SpdyStream::OnReadBufferConsumed(
306     size_t consume_size,
307     SpdyBuffer::ConsumeSource consume_source) {
308   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
309   DCHECK_GE(consume_size, 1u);
310   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
311   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
312 }
313 
IncreaseRecvWindowSize(int32 delta_window_size)314 void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
315   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
316 
317   // By the time a read is processed by the delegate, this stream may
318   // already be inactive.
319   if (!session_->IsStreamActive(stream_id_))
320     return;
321 
322   DCHECK_GE(unacked_recv_window_bytes_, 0);
323   DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
324   DCHECK_GE(delta_window_size, 1);
325   // Check for overflow.
326   DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
327 
328   recv_window_size_ += delta_window_size;
329   net_log_.AddEvent(
330       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
331       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
332                  stream_id_, delta_window_size, recv_window_size_));
333 
334   unacked_recv_window_bytes_ += delta_window_size;
335   if (unacked_recv_window_bytes_ >
336       session_->stream_initial_recv_window_size() / 2) {
337     session_->SendStreamWindowUpdate(
338         stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
339     unacked_recv_window_bytes_ = 0;
340   }
341 }
342 
DecreaseRecvWindowSize(int32 delta_window_size)343 void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
344   DCHECK(session_->IsStreamActive(stream_id_));
345   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
346   DCHECK_GE(delta_window_size, 1);
347 
348   // Since we never decrease the initial receive window size,
349   // |delta_window_size| should never cause |recv_window_size_| to go
350   // negative. If we do, the receive window isn't being respected.
351   if (delta_window_size > recv_window_size_) {
352     session_->ResetStream(
353         stream_id_, RST_STREAM_PROTOCOL_ERROR,
354         "delta_window_size is " + base::IntToString(delta_window_size) +
355             " in DecreaseRecvWindowSize, which is larger than the receive " +
356             "window size of " + base::IntToString(recv_window_size_));
357     return;
358   }
359 
360   recv_window_size_ -= delta_window_size;
361   net_log_.AddEvent(
362       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
363       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
364                  stream_id_, -delta_window_size, recv_window_size_));
365 }
366 
GetPeerAddress(IPEndPoint * address) const367 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
368   return session_->GetPeerAddress(address);
369 }
370 
GetLocalAddress(IPEndPoint * address) const371 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
372   return session_->GetLocalAddress(address);
373 }
374 
WasEverUsed() const375 bool SpdyStream::WasEverUsed() const {
376   return session_->WasEverUsed();
377 }
378 
GetRequestTime() const379 base::Time SpdyStream::GetRequestTime() const {
380   return request_time_;
381 }
382 
SetRequestTime(base::Time t)383 void SpdyStream::SetRequestTime(base::Time t) {
384   request_time_ = t;
385 }
386 
OnInitialResponseHeadersReceived(const SpdyHeaderBlock & initial_response_headers,base::Time response_time,base::TimeTicks recv_first_byte_time)387 int SpdyStream::OnInitialResponseHeadersReceived(
388     const SpdyHeaderBlock& initial_response_headers,
389     base::Time response_time,
390     base::TimeTicks recv_first_byte_time) {
391   // SpdySession guarantees that this is called at most once.
392   CHECK(response_headers_.empty());
393 
394   // Check to make sure that we don't receive the response headers
395   // before we're ready for it.
396   switch (type_) {
397     case SPDY_BIDIRECTIONAL_STREAM:
398       // For a bidirectional stream, we're ready for the response
399       // headers once we've finished sending the request headers.
400       if (io_state_ == STATE_IDLE) {
401         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
402                               "Response received before request sent");
403         return ERR_SPDY_PROTOCOL_ERROR;
404       }
405       break;
406 
407     case SPDY_REQUEST_RESPONSE_STREAM:
408       // For a request/response stream, we're ready for the response
409       // headers once we've finished sending the request headers.
410       if (io_state_ == STATE_IDLE) {
411         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
412                               "Response received before request sent");
413         return ERR_SPDY_PROTOCOL_ERROR;
414       }
415       break;
416 
417     case SPDY_PUSH_STREAM:
418       // Push streams transition to a locally half-closed state upon headers.
419       // We must continue to buffer data while waiting for a call to
420       // SetDelegate() (which may not ever happen).
421       CHECK_EQ(io_state_, STATE_RESERVED_REMOTE);
422       if (!delegate_) {
423         io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED;
424       } else {
425         io_state_ = STATE_HALF_CLOSED_LOCAL;
426       }
427       break;
428   }
429 
430   metrics_.StartStream();
431 
432   DCHECK_NE(io_state_, STATE_IDLE);
433 
434   response_time_ = response_time;
435   recv_first_byte_time_ = recv_first_byte_time;
436   return MergeWithResponseHeaders(initial_response_headers);
437 }
438 
OnAdditionalResponseHeadersReceived(const SpdyHeaderBlock & additional_response_headers)439 int SpdyStream::OnAdditionalResponseHeadersReceived(
440     const SpdyHeaderBlock& additional_response_headers) {
441   if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
442     session_->ResetStream(
443         stream_id_, RST_STREAM_PROTOCOL_ERROR,
444         "Additional headers received for request/response stream");
445     return ERR_SPDY_PROTOCOL_ERROR;
446   } else if (type_ == SPDY_PUSH_STREAM &&
447              response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
448     session_->ResetStream(
449         stream_id_, RST_STREAM_PROTOCOL_ERROR,
450         "Additional headers received for push stream");
451     return ERR_SPDY_PROTOCOL_ERROR;
452   }
453   return MergeWithResponseHeaders(additional_response_headers);
454 }
455 
OnPushPromiseHeadersReceived(const SpdyHeaderBlock & headers)456 void SpdyStream::OnPushPromiseHeadersReceived(const SpdyHeaderBlock& headers) {
457   CHECK(!request_headers_.get());
458   CHECK_EQ(io_state_, STATE_IDLE);
459   CHECK_EQ(type_, SPDY_PUSH_STREAM);
460   DCHECK(!delegate_);
461 
462   io_state_ = STATE_RESERVED_REMOTE;
463   request_headers_.reset(new SpdyHeaderBlock(headers));
464 }
465 
OnDataReceived(scoped_ptr<SpdyBuffer> buffer)466 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
467   DCHECK(session_->IsStreamActive(stream_id_));
468 
469   // If we're still buffering data for a push stream, we will do the
470   // check for data received with incomplete headers in
471   // PushedStreamReplayData().
472   if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
473     DCHECK_EQ(type_, SPDY_PUSH_STREAM);
474     // It should be valid for this to happen in the server push case.
475     // We'll return received data when delegate gets attached to the stream.
476     if (buffer) {
477       pending_recv_data_.push_back(buffer.release());
478     } else {
479       pending_recv_data_.push_back(NULL);
480       metrics_.StopStream();
481       // Note: we leave the stream open in the session until the stream
482       //       is claimed.
483     }
484     return;
485   }
486 
487   // If we have response headers but the delegate has indicated that
488   // it's still incomplete, then that's a protocol error.
489   if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
490     LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
491                    "Data received with incomplete headers.");
492     session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
493     return;
494   }
495 
496   CHECK(!IsClosed());
497 
498   if (!buffer) {
499     metrics_.StopStream();
500     if (io_state_ == STATE_OPEN) {
501       io_state_ = STATE_HALF_CLOSED_REMOTE;
502     } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
503       io_state_ = STATE_CLOSED;
504       // Deletes |this|.
505       session_->CloseActiveStream(stream_id_, OK);
506     } else {
507       NOTREACHED() << io_state_;
508     }
509     return;
510   }
511 
512   size_t length = buffer->GetRemainingSize();
513   DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
514   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
515     DecreaseRecvWindowSize(static_cast<int32>(length));
516     buffer->AddConsumeCallback(
517         base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
518   }
519 
520   // Track our bandwidth.
521   metrics_.RecordBytes(length);
522   recv_bytes_ += length;
523   recv_last_byte_time_ = base::TimeTicks::Now();
524 
525   // May close |this|.
526   delegate_->OnDataReceived(buffer.Pass());
527 }
528 
OnFrameWriteComplete(SpdyFrameType frame_type,size_t frame_size)529 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
530                                       size_t frame_size) {
531   DCHECK_NE(type_, SPDY_PUSH_STREAM);
532 
533   if (frame_size < session_->GetFrameMinimumSize() ||
534       frame_size > session_->GetFrameMaximumSize()) {
535     NOTREACHED();
536     return;
537   }
538   CHECK(frame_type == SYN_STREAM ||
539         frame_type == DATA) << frame_type;
540 
541   int result = (frame_type == SYN_STREAM) ?
542       OnRequestHeadersSent() : OnDataSent(frame_size);
543   if (result == ERR_IO_PENDING) {
544     // The write operation hasn't completed yet.
545     return;
546   }
547 
548   if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
549     if(io_state_ == STATE_OPEN) {
550       io_state_ = STATE_HALF_CLOSED_LOCAL;
551     } else if(io_state_ == STATE_HALF_CLOSED_REMOTE) {
552       io_state_ = STATE_CLOSED;
553     } else {
554       NOTREACHED() << io_state_;
555     }
556   }
557   // Notify delegate of write completion. Must not destroy |this|.
558   CHECK(delegate_);
559   {
560     base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
561     write_handler_guard_ = true;
562     if (frame_type == SYN_STREAM) {
563       delegate_->OnRequestHeadersSent();
564     } else {
565       delegate_->OnDataSent();
566     }
567     CHECK(weak_this);
568     write_handler_guard_ = false;
569   }
570 
571   if (io_state_ == STATE_CLOSED) {
572     // Deletes |this|.
573     session_->CloseActiveStream(stream_id_, OK);
574   }
575 }
576 
OnRequestHeadersSent()577 int SpdyStream::OnRequestHeadersSent() {
578   CHECK_EQ(io_state_, STATE_IDLE);
579   CHECK_NE(stream_id_, 0u);
580 
581   io_state_ = STATE_OPEN;
582   return OK;
583 }
584 
OnDataSent(size_t frame_size)585 int SpdyStream::OnDataSent(size_t frame_size) {
586   CHECK(io_state_ == STATE_OPEN ||
587         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
588 
589   size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize();
590 
591   CHECK_GE(frame_size, session_->GetDataFrameMinimumSize());
592   CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload());
593 
594   send_bytes_ += frame_payload_size;
595 
596   // If more data is available to send, dispatch it and
597   // return that the write operation is still ongoing.
598   pending_send_data_->DidConsume(frame_payload_size);
599   if (pending_send_data_->BytesRemaining() > 0) {
600     QueueNextDataFrame();
601     return ERR_IO_PENDING;
602   } else {
603     pending_send_data_ = NULL;
604     return OK;
605   }
606 }
607 
GetProtocolVersion() const608 SpdyMajorVersion SpdyStream::GetProtocolVersion() const {
609   return session_->GetProtocolVersion();
610 }
611 
LogStreamError(int status,const std::string & description)612 void SpdyStream::LogStreamError(int status, const std::string& description) {
613   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
614                     base::Bind(&NetLogSpdyStreamErrorCallback,
615                                stream_id_, status, &description));
616 }
617 
OnClose(int status)618 void SpdyStream::OnClose(int status) {
619   // In most cases, the stream should already be CLOSED. The exception is when a
620   // SpdySession is shutting down while the stream is in an intermediate state.
621   io_state_ = STATE_CLOSED;
622   response_status_ = status;
623   Delegate* delegate = delegate_;
624   delegate_ = NULL;
625   if (delegate)
626     delegate->OnClose(status);
627   // Unset |stream_id_| last so that the delegate can look it up.
628   stream_id_ = 0;
629 }
630 
Cancel()631 void SpdyStream::Cancel() {
632   // We may be called again from a delegate's OnClose().
633   if (io_state_ == STATE_CLOSED)
634     return;
635 
636   if (stream_id_ != 0) {
637     session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
638   } else {
639     session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
640   }
641   // |this| is invalid at this point.
642 }
643 
Close()644 void SpdyStream::Close() {
645   // We may be called again from a delegate's OnClose().
646   if (io_state_ == STATE_CLOSED)
647     return;
648 
649   if (stream_id_ != 0) {
650     session_->CloseActiveStream(stream_id_, OK);
651   } else {
652     session_->CloseCreatedStream(GetWeakPtr(), OK);
653   }
654   // |this| is invalid at this point.
655 }
656 
GetWeakPtr()657 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
658   return weak_ptr_factory_.GetWeakPtr();
659 }
660 
SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,SpdySendStatus send_status)661 int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
662                                    SpdySendStatus send_status) {
663   CHECK_NE(type_, SPDY_PUSH_STREAM);
664   CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
665   CHECK(!request_headers_);
666   CHECK(!pending_send_data_.get());
667   CHECK_EQ(io_state_, STATE_IDLE);
668   request_headers_ = request_headers.Pass();
669   pending_send_status_ = send_status;
670   session_->EnqueueStreamWrite(
671       GetWeakPtr(), SYN_STREAM,
672       scoped_ptr<SpdyBufferProducer>(
673           new SynStreamBufferProducer(GetWeakPtr())));
674   return ERR_IO_PENDING;
675 }
676 
SendData(IOBuffer * data,int length,SpdySendStatus send_status)677 void SpdyStream::SendData(IOBuffer* data,
678                           int length,
679                           SpdySendStatus send_status) {
680   CHECK_NE(type_, SPDY_PUSH_STREAM);
681   CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
682   CHECK(io_state_ == STATE_OPEN ||
683         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
684   CHECK(!pending_send_data_.get());
685   pending_send_data_ = new DrainableIOBuffer(data, length);
686   pending_send_status_ = send_status;
687   QueueNextDataFrame();
688 }
689 
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated,NextProto * protocol_negotiated)690 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
691                             bool* was_npn_negotiated,
692                             NextProto* protocol_negotiated) {
693   return session_->GetSSLInfo(
694       ssl_info, was_npn_negotiated, protocol_negotiated);
695 }
696 
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)697 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
698   return session_->GetSSLCertRequestInfo(cert_request_info);
699 }
700 
PossiblyResumeIfSendStalled()701 void SpdyStream::PossiblyResumeIfSendStalled() {
702   if (IsLocallyClosed()) {
703     return;
704   }
705   if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
706       send_window_size_ > 0) {
707     net_log_.AddEvent(
708         NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
709         NetLog::IntegerCallback("stream_id", stream_id_));
710     send_stalled_by_flow_control_ = false;
711     QueueNextDataFrame();
712   }
713 }
714 
IsClosed() const715 bool SpdyStream::IsClosed() const {
716   return io_state_ == STATE_CLOSED;
717 }
718 
IsLocallyClosed() const719 bool SpdyStream::IsLocallyClosed() const {
720   return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
721       io_state_ == STATE_HALF_CLOSED_LOCAL ||
722       io_state_ == STATE_CLOSED;
723 }
724 
IsIdle() const725 bool SpdyStream::IsIdle() const {
726   return io_state_ == STATE_IDLE;
727 }
728 
IsOpen() const729 bool SpdyStream::IsOpen() const {
730   return io_state_ == STATE_OPEN;
731 }
732 
IsReservedRemote() const733 bool SpdyStream::IsReservedRemote() const {
734   return io_state_ == STATE_RESERVED_REMOTE;
735 }
736 
GetProtocol() const737 NextProto SpdyStream::GetProtocol() const {
738   return session_->protocol();
739 }
740 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const741 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
742   if (stream_id_ == 0)
743     return false;
744 
745   return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
746 }
747 
GetUrlFromHeaders() const748 GURL SpdyStream::GetUrlFromHeaders() const {
749   if (!request_headers_)
750     return GURL();
751 
752   return GetUrlFromHeaderBlock(
753       *request_headers_, GetProtocolVersion(), type_ == SPDY_PUSH_STREAM);
754 }
755 
HasUrlFromHeaders() const756 bool SpdyStream::HasUrlFromHeaders() const {
757   return !GetUrlFromHeaders().is_empty();
758 }
759 
UpdateHistograms()760 void SpdyStream::UpdateHistograms() {
761   // We need at least the receive timers to be filled in, as otherwise
762   // metrics can be bogus.
763   if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
764     return;
765 
766   base::TimeTicks effective_send_time;
767   if (type_ == SPDY_PUSH_STREAM) {
768     // Push streams shouldn't have |send_time_| filled in.
769     DCHECK(send_time_.is_null());
770     effective_send_time = recv_first_byte_time_;
771   } else {
772     // For non-push streams, we also need |send_time_| to be filled
773     // in.
774     if (send_time_.is_null())
775       return;
776     effective_send_time = send_time_;
777   }
778 
779   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
780                       recv_first_byte_time_ - effective_send_time);
781   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
782                       recv_last_byte_time_ - recv_first_byte_time_);
783   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
784                       recv_last_byte_time_ - effective_send_time);
785 
786   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
787   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
788 }
789 
QueueNextDataFrame()790 void SpdyStream::QueueNextDataFrame() {
791   // Until the request has been completely sent, we cannot be sure
792   // that our stream_id is correct.
793   CHECK(io_state_ == STATE_OPEN ||
794         io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
795   CHECK_GT(stream_id_, 0u);
796   CHECK(pending_send_data_.get());
797   // Only the final fame may have a length of 0.
798   if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
799     CHECK_GE(pending_send_data_->BytesRemaining(), 0);
800   } else {
801     CHECK_GT(pending_send_data_->BytesRemaining(), 0);
802   }
803 
804   SpdyDataFlags flags =
805       (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
806       DATA_FLAG_FIN : DATA_FLAG_NONE;
807   scoped_ptr<SpdyBuffer> data_buffer(
808       session_->CreateDataBuffer(stream_id_,
809                                  pending_send_data_.get(),
810                                  pending_send_data_->BytesRemaining(),
811                                  flags));
812   // We'll get called again by PossiblyResumeIfSendStalled().
813   if (!data_buffer)
814     return;
815 
816   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
817     DCHECK_GE(data_buffer->GetRemainingSize(),
818               session_->GetDataFrameMinimumSize());
819     size_t payload_size =
820         data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
821     DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
822 
823     // Send window size is based on payload size, so nothing to do if this is
824     // just a FIN with no payload.
825     if (payload_size != 0) {
826       DecreaseSendWindowSize(static_cast<int32>(payload_size));
827       // This currently isn't strictly needed, since write frames are
828       // discarded only if the stream is about to be closed. But have it
829       // here anyway just in case this changes.
830       data_buffer->AddConsumeCallback(
831           base::Bind(&SpdyStream::OnWriteBufferConsumed,
832                      GetWeakPtr(), payload_size));
833     }
834   }
835 
836   session_->EnqueueStreamWrite(
837       GetWeakPtr(), DATA,
838       scoped_ptr<SpdyBufferProducer>(
839           new SimpleBufferProducer(data_buffer.Pass())));
840 }
841 
MergeWithResponseHeaders(const SpdyHeaderBlock & new_response_headers)842 int SpdyStream::MergeWithResponseHeaders(
843     const SpdyHeaderBlock& new_response_headers) {
844   if (new_response_headers.find("transfer-encoding") !=
845       new_response_headers.end()) {
846     session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
847                          "Received transfer-encoding header");
848     return ERR_SPDY_PROTOCOL_ERROR;
849   }
850 
851   for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
852       it != new_response_headers.end(); ++it) {
853     // Disallow uppercase headers.
854     if (ContainsUppercaseAscii(it->first)) {
855       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
856                             "Upper case characters in header: " + it->first);
857       return ERR_SPDY_PROTOCOL_ERROR;
858     }
859 
860     SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
861     // Disallow duplicate headers.  This is just to be conservative.
862     if (it2 != response_headers_.end() && it2->first == it->first) {
863       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
864                             "Duplicate header: " + it->first);
865       return ERR_SPDY_PROTOCOL_ERROR;
866     }
867 
868     response_headers_.insert(it2, *it);
869   }
870 
871   // If delegate_ is not yet attached, we'll call
872   // OnResponseHeadersUpdated() after the delegate gets attached to
873   // the stream.
874   if (delegate_) {
875     // The call to OnResponseHeadersUpdated() below may delete |this|,
876     // so use |weak_this| to detect that.
877     base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
878 
879     SpdyResponseHeadersStatus status =
880         delegate_->OnResponseHeadersUpdated(response_headers_);
881     if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
882       // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
883       // have been closed.
884       CHECK(weak_this);
885       // Incomplete headers are OK only for push streams.
886       if (type_ != SPDY_PUSH_STREAM) {
887         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
888                               "Incomplete headers");
889         return ERR_INCOMPLETE_SPDY_HEADERS;
890       }
891     } else if (weak_this) {
892       response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
893     }
894   }
895 
896   return OK;
897 }
898 
899 #define STATE_CASE(s) \
900   case s: \
901     description = base::StringPrintf("%s (0x%08X)", #s, s); \
902     break
903 
DescribeState(State state)904 std::string SpdyStream::DescribeState(State state) {
905   std::string description;
906   switch (state) {
907     STATE_CASE(STATE_IDLE);
908     STATE_CASE(STATE_OPEN);
909     STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
910     STATE_CASE(STATE_HALF_CLOSED_LOCAL);
911     STATE_CASE(STATE_CLOSED);
912     default:
913       description = base::StringPrintf("Unknown state 0x%08X (%u)", state,
914                                        state);
915       break;
916   }
917   return description;
918 }
919 
920 #undef STATE_CASE
921 
922 }  // namespace net
923