• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_stream.h"
6 
7 #include "base/logging.h"
8 #include "base/message_loop.h"
9 #include "base/values.h"
10 #include "net/spdy/spdy_session.h"
11 
12 namespace net {
13 
14 namespace {
15 
16 class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters {
17  public:
NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)18   NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,
19                                         int delta,
20                                         int window_size)
21       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
ToValue() const22   virtual Value* ToValue() const {
23     DictionaryValue* dict = new DictionaryValue();
24     dict->SetInteger("id", static_cast<int>(stream_id_));
25     dict->SetInteger("delta", delta_);
26     dict->SetInteger("window_size", window_size_);
27     return dict;
28   }
29  private:
30   const spdy::SpdyStreamId stream_id_;
31   const int delta_;
32   const int window_size_;
33   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter);
34 };
35 
36 }
37 
SpdyStream(SpdySession * session,spdy::SpdyStreamId stream_id,bool pushed,const BoundNetLog & net_log)38 SpdyStream::SpdyStream(SpdySession* session,
39                        spdy::SpdyStreamId stream_id,
40                        bool pushed,
41                        const BoundNetLog& net_log)
42     : continue_buffering_data_(true),
43       stream_id_(stream_id),
44       priority_(0),
45       stalled_by_flow_control_(false),
46       send_window_size_(spdy::kSpdyStreamInitialWindowSize),
47       recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
48       pushed_(pushed),
49       response_received_(false),
50       session_(session),
51       delegate_(NULL),
52       request_time_(base::Time::Now()),
53       response_(new spdy::SpdyHeaderBlock),
54       io_state_(STATE_NONE),
55       response_status_(OK),
56       cancelled_(false),
57       has_upload_data_(false),
58       net_log_(net_log),
59       send_bytes_(0),
60       recv_bytes_(0) {
61 }
62 
~SpdyStream()63 SpdyStream::~SpdyStream() {
64   UpdateHistograms();
65 }
66 
SetDelegate(Delegate * delegate)67 void SpdyStream::SetDelegate(Delegate* delegate) {
68   CHECK(delegate);
69   delegate_ = delegate;
70 
71   if (pushed_) {
72     CHECK(response_received());
73     MessageLoop::current()->PostTask(
74         FROM_HERE, NewRunnableMethod(this,
75                                      &SpdyStream::PushedStreamReplayData));
76   } else {
77     continue_buffering_data_ = false;
78   }
79 }
80 
PushedStreamReplayData()81 void SpdyStream::PushedStreamReplayData() {
82   if (cancelled_ || !delegate_)
83     return;
84 
85   continue_buffering_data_ = false;
86 
87   int rv = delegate_->OnResponseReceived(*response_, response_time_, OK);
88   if (rv == ERR_INCOMPLETE_SPDY_HEADERS) {
89     // We don't have complete headers.  Assume we're waiting for another
90     // HEADERS frame.  Since we don't have headers, we had better not have
91     // any pending data frames.
92     DCHECK_EQ(0U, pending_buffers_.size());
93     return;
94   }
95 
96   std::vector<scoped_refptr<IOBufferWithSize> > buffers;
97   buffers.swap(pending_buffers_);
98   for (size_t i = 0; i < buffers.size(); ++i) {
99     // It is always possible that a callback to the delegate results in
100     // the delegate no longer being available.
101     if (!delegate_)
102       break;
103     if (buffers[i]) {
104       delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
105     } else {
106       delegate_->OnDataReceived(NULL, 0);
107       session_->CloseStream(stream_id_, net::OK);
108       // Note: |this| may be deleted after calling CloseStream.
109       DCHECK_EQ(buffers.size() - 1, i);
110     }
111   }
112 }
113 
DetachDelegate()114 void SpdyStream::DetachDelegate() {
115   if (delegate_)
116     delegate_->set_chunk_callback(NULL);
117   delegate_ = NULL;
118   if (!closed())
119     Cancel();
120 }
121 
spdy_headers() const122 const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const {
123   return request_;
124 }
125 
set_spdy_headers(const linked_ptr<spdy::SpdyHeaderBlock> & headers)126 void SpdyStream::set_spdy_headers(
127     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
128   request_ = headers;
129 }
130 
IncreaseSendWindowSize(int delta_window_size)131 void SpdyStream::IncreaseSendWindowSize(int delta_window_size) {
132   DCHECK_GE(delta_window_size, 1);
133   int new_window_size = send_window_size_ + delta_window_size;
134 
135   // We should ignore WINDOW_UPDATEs received before or after this state,
136   // since before means we've not written SYN_STREAM yet (i.e. it's too
137   // early) and after means we've written a DATA frame with FIN bit.
138   if (io_state_ != STATE_SEND_BODY_COMPLETE)
139     return;
140 
141   // it's valid for send_window_size_ to become negative (via an incoming
142   // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make
143   // it positive; however, if send_window_size_ is positive and incoming
144   // WINDOW_UPDATE makes it negative, we have an overflow.
145   if (send_window_size_ > 0 && new_window_size < 0) {
146     LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size
147                  << "] for stream " << stream_id_
148                  << " overflows send_window_size_ [current:"
149                  << send_window_size_ << "]";
150     session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
151     return;
152   }
153 
154   send_window_size_ = new_window_size;
155 
156   net_log_.AddEvent(
157       NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
158       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
159           stream_id_, delta_window_size, send_window_size_)));
160   if (stalled_by_flow_control_) {
161     stalled_by_flow_control_ = false;
162     io_state_ = STATE_SEND_BODY;
163     DoLoop(OK);
164   }
165 }
166 
DecreaseSendWindowSize(int delta_window_size)167 void SpdyStream::DecreaseSendWindowSize(int delta_window_size) {
168   // we only call this method when sending a frame, therefore
169   // |delta_window_size| should be within the valid frame size range.
170   DCHECK_GE(delta_window_size, 1);
171   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
172 
173   // |send_window_size_| should have been at least |delta_window_size| for
174   // this call to happen.
175   DCHECK_GE(send_window_size_, delta_window_size);
176 
177   send_window_size_ -= delta_window_size;
178 
179   net_log_.AddEvent(
180       NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
181       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
182           stream_id_, -delta_window_size, send_window_size_)));
183 }
184 
IncreaseRecvWindowSize(int delta_window_size)185 void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) {
186   DCHECK_GE(delta_window_size, 1);
187   // By the time a read is isued, stream may become inactive.
188   if (!session_->IsStreamActive(stream_id_))
189     return;
190   int new_window_size = recv_window_size_ + delta_window_size;
191   if (recv_window_size_ > 0)
192     DCHECK(new_window_size > 0);
193 
194   recv_window_size_ = new_window_size;
195   net_log_.AddEvent(
196       NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
197       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
198           stream_id_, delta_window_size, recv_window_size_)));
199   session_->SendWindowUpdate(stream_id_, delta_window_size);
200 }
201 
DecreaseRecvWindowSize(int delta_window_size)202 void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) {
203   DCHECK_GE(delta_window_size, 1);
204 
205   recv_window_size_ -= delta_window_size;
206   net_log_.AddEvent(
207       NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
208       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
209           stream_id_, -delta_window_size, recv_window_size_)));
210 
211   // Since we never decrease the initial window size, we should never hit
212   // a negative |recv_window_size_|, if we do, it's a flow-control violation.
213   if (recv_window_size_ < 0)
214     session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
215 }
216 
GetPeerAddress(AddressList * address) const217 int SpdyStream::GetPeerAddress(AddressList* address) const {
218   return session_->GetPeerAddress(address);
219 }
220 
GetLocalAddress(IPEndPoint * address) const221 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
222   return session_->GetLocalAddress(address);
223 }
224 
WasEverUsed() const225 bool SpdyStream::WasEverUsed() const {
226   return session_->WasEverUsed();
227 }
228 
GetRequestTime() const229 base::Time SpdyStream::GetRequestTime() const {
230   return request_time_;
231 }
232 
SetRequestTime(base::Time t)233 void SpdyStream::SetRequestTime(base::Time t) {
234   request_time_ = t;
235 }
236 
OnResponseReceived(const spdy::SpdyHeaderBlock & response)237 int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) {
238   int rv = OK;
239 
240   metrics_.StartStream();
241 
242   DCHECK(response_->empty());
243   *response_ = response;  // TODO(ukai): avoid copy.
244 
245   recv_first_byte_time_ = base::TimeTicks::Now();
246   response_time_ = base::Time::Now();
247 
248   // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then
249   // the server has sent the SYN_REPLY too early.
250   if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE)
251     return ERR_SPDY_PROTOCOL_ERROR;
252   if (pushed_)
253     CHECK(io_state_ == STATE_NONE);
254   io_state_ = STATE_OPEN;
255 
256   if (delegate_)
257     rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
258   // If delegate_ is not yet attached, we'll call OnResponseReceived after the
259   // delegate gets attached to the stream.
260 
261   return rv;
262 }
263 
OnHeaders(const spdy::SpdyHeaderBlock & headers)264 int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) {
265   DCHECK(!response_->empty());
266 
267   // Append all the headers into the response header block.
268   for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin();
269       it != headers.end(); ++it) {
270     // Disallow duplicate headers.  This is just to be conservative.
271     if ((*response_).find(it->first) != (*response_).end()) {
272       LOG(WARNING) << "HEADERS duplicate header";
273       response_status_ = ERR_SPDY_PROTOCOL_ERROR;
274       return ERR_SPDY_PROTOCOL_ERROR;
275     }
276 
277     (*response_)[it->first] = it->second;
278   }
279 
280   int rv = OK;
281   if (delegate_) {
282     rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
283     // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
284     // headers before the response header block is complete.
285     if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
286       rv = OK;
287   }
288   return rv;
289 }
290 
OnDataReceived(const char * data,int length)291 void SpdyStream::OnDataReceived(const char* data, int length) {
292   DCHECK_GE(length, 0);
293 
294   // If we don't have a response, then the SYN_REPLY did not come through.
295   // We cannot pass data up to the caller unless the reply headers have been
296   // received.
297   if (!response_received()) {
298     session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
299     return;
300   }
301 
302   if (!delegate_ || continue_buffering_data_) {
303     // It should be valid for this to happen in the server push case.
304     // We'll return received data when delegate gets attached to the stream.
305     if (length > 0) {
306       IOBufferWithSize* buf = new IOBufferWithSize(length);
307       memcpy(buf->data(), data, length);
308       pending_buffers_.push_back(make_scoped_refptr(buf));
309     } else {
310       pending_buffers_.push_back(NULL);
311       metrics_.StopStream();
312       // Note: we leave the stream open in the session until the stream
313       //       is claimed.
314     }
315     return;
316   }
317 
318   CHECK(!closed());
319 
320   // A zero-length read means that the stream is being closed.
321   if (!length) {
322     metrics_.StopStream();
323     session_->CloseStream(stream_id_, net::OK);
324     // Note: |this| may be deleted after calling CloseStream.
325     return;
326   }
327 
328   if (session_->flow_control())
329     DecreaseRecvWindowSize(length);
330 
331   // Track our bandwidth.
332   metrics_.RecordBytes(length);
333   recv_bytes_ += length;
334   recv_last_byte_time_ = base::TimeTicks::Now();
335 
336   if (!delegate_) {
337     // It should be valid for this to happen in the server push case.
338     // We'll return received data when delegate gets attached to the stream.
339     IOBufferWithSize* buf = new IOBufferWithSize(length);
340     memcpy(buf->data(), data, length);
341     pending_buffers_.push_back(make_scoped_refptr(buf));
342     return;
343   }
344 
345   delegate_->OnDataReceived(data, length);
346 }
347 
348 // This function is only called when an entire frame is written.
OnWriteComplete(int bytes)349 void SpdyStream::OnWriteComplete(int bytes) {
350   DCHECK_LE(0, bytes);
351   send_bytes_ += bytes;
352   if (cancelled() || closed())
353     return;
354   DoLoop(bytes);
355 }
356 
OnChunkAvailable()357 void SpdyStream::OnChunkAvailable() {
358   DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY ||
359          io_state_ == STATE_SEND_BODY_COMPLETE);
360   if (io_state_ == STATE_SEND_BODY)
361     OnWriteComplete(0);
362 }
363 
OnClose(int status)364 void SpdyStream::OnClose(int status) {
365   io_state_ = STATE_DONE;
366   response_status_ = status;
367   Delegate* delegate = delegate_;
368   delegate_ = NULL;
369   if (delegate) {
370     delegate->set_chunk_callback(NULL);
371     delegate->OnClose(status);
372   }
373 }
374 
Cancel()375 void SpdyStream::Cancel() {
376   if (cancelled())
377     return;
378 
379   cancelled_ = true;
380   if (session_->IsStreamActive(stream_id_))
381     session_->ResetStream(stream_id_, spdy::CANCEL);
382 }
383 
SendRequest(bool has_upload_data)384 int SpdyStream::SendRequest(bool has_upload_data) {
385   if (delegate_)
386     delegate_->set_chunk_callback(this);
387 
388   // Pushed streams do not send any data, and should always be in STATE_OPEN or
389   // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push
390   // behavior.
391   has_upload_data_ = has_upload_data;
392   if (pushed_) {
393     send_time_ = base::TimeTicks::Now();
394     DCHECK(!has_upload_data_);
395     DCHECK(response_received());
396     return ERR_IO_PENDING;
397   }
398   CHECK_EQ(STATE_NONE, io_state_);
399   io_state_ = STATE_SEND_HEADERS;
400   return DoLoop(OK);
401 }
402 
WriteStreamData(IOBuffer * data,int length,spdy::SpdyDataFlags flags)403 int SpdyStream::WriteStreamData(IOBuffer* data, int length,
404                                 spdy::SpdyDataFlags flags) {
405   return session_->WriteStreamData(stream_id_, data, length, flags);
406 }
407 
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)408 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
409   return session_->GetSSLInfo(ssl_info, was_npn_negotiated);
410 }
411 
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)412 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
413   return session_->GetSSLCertRequestInfo(cert_request_info);
414 }
415 
HasUrl() const416 bool SpdyStream::HasUrl() const {
417   if (pushed_)
418     return response_received();
419   return request_.get() != NULL;
420 }
421 
GetUrl() const422 GURL SpdyStream::GetUrl() const {
423   DCHECK(HasUrl());
424 
425   if (pushed_) {
426     // assemble from the response
427     std::string url;
428     spdy::SpdyHeaderBlock::const_iterator it;
429     it = response_->find("url");
430     if (it != (*response_).end())
431       url = it->second;
432     return GURL(url);
433   }
434 
435   // assemble from the request
436   std::string scheme;
437   std::string host_port;
438   std::string path;
439   spdy::SpdyHeaderBlock::const_iterator it;
440   it = request_->find("scheme");
441   if (it != (*request_).end())
442     scheme = it->second;
443   it = request_->find("host");
444   if (it != (*request_).end())
445     host_port = it->second;
446   it = request_->find("path");
447   if (it != (*request_).end())
448     path = it->second;
449   std::string url = scheme + "://" + host_port + path;
450   return GURL(url);
451 }
452 
DoLoop(int result)453 int SpdyStream::DoLoop(int result) {
454   do {
455     State state = io_state_;
456     io_state_ = STATE_NONE;
457     switch (state) {
458       // State machine 1: Send headers and body.
459       case STATE_SEND_HEADERS:
460         CHECK_EQ(OK, result);
461         result = DoSendHeaders();
462         break;
463       case STATE_SEND_HEADERS_COMPLETE:
464         result = DoSendHeadersComplete(result);
465         break;
466       case STATE_SEND_BODY:
467         CHECK_EQ(OK, result);
468         result = DoSendBody();
469         break;
470       case STATE_SEND_BODY_COMPLETE:
471         result = DoSendBodyComplete(result);
472         break;
473       // This is an intermediary waiting state. This state is reached when all
474       // data has been sent, but no data has been received.
475       case STATE_WAITING_FOR_RESPONSE:
476         io_state_ = STATE_WAITING_FOR_RESPONSE;
477         result = ERR_IO_PENDING;
478         break;
479       // State machine 2: connection is established.
480       // In STATE_OPEN, OnResponseReceived has already been called.
481       // OnDataReceived, OnClose and OnWriteCompelte can be called.
482       // Only OnWriteCompletee calls DoLoop(().
483       //
484       // For HTTP streams, no data is sent from the client while in the OPEN
485       // state, so OnWriteComplete is never called here.  The HTTP body is
486       // handled in the OnDataReceived callback, which does not call into
487       // DoLoop.
488       //
489       // For WebSocket streams, which are bi-directional, we'll send and
490       // receive data once the connection is established.  Received data is
491       // handled in OnDataReceived.  Sent data is handled in OnWriteComplete,
492       // which calls DoOpen().
493       case STATE_OPEN:
494         result = DoOpen(result);
495         break;
496 
497       case STATE_DONE:
498         DCHECK(result != ERR_IO_PENDING);
499         break;
500       default:
501         NOTREACHED() << io_state_;
502         break;
503     }
504   } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
505            io_state_ != STATE_OPEN);
506 
507   return result;
508 }
509 
DoSendHeaders()510 int SpdyStream::DoSendHeaders() {
511   CHECK(!cancelled_);
512 
513   spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE;
514   if (!has_upload_data_)
515     flags = spdy::CONTROL_FLAG_FIN;
516 
517   CHECK(request_.get());
518   int result = session_->WriteSynStream(
519       stream_id_, static_cast<RequestPriority>(priority_), flags,
520       request_);
521   if (result != ERR_IO_PENDING)
522     return result;
523 
524   send_time_ = base::TimeTicks::Now();
525   io_state_ = STATE_SEND_HEADERS_COMPLETE;
526   return ERR_IO_PENDING;
527 }
528 
DoSendHeadersComplete(int result)529 int SpdyStream::DoSendHeadersComplete(int result) {
530   if (result < 0)
531     return result;
532 
533   CHECK_GT(result, 0);
534 
535   if (!delegate_)
536     return ERR_UNEXPECTED;
537 
538   // There is no body, skip that state.
539   if (delegate_->OnSendHeadersComplete(result)) {
540     io_state_ = STATE_WAITING_FOR_RESPONSE;
541     return OK;
542   }
543 
544   io_state_ = STATE_SEND_BODY;
545   return OK;
546 }
547 
548 // DoSendBody is called to send the optional body for the request.  This call
549 // will also be called as each write of a chunk of the body completes.
DoSendBody()550 int SpdyStream::DoSendBody() {
551   // If we're already in the STATE_SENDING_BODY state, then we've already
552   // sent a portion of the body.  In that case, we need to first consume
553   // the bytes written in the body stream.  Note that the bytes written is
554   // the number of bytes in the frame that were written, only consume the
555   // data portion, of course.
556   io_state_ = STATE_SEND_BODY_COMPLETE;
557   if (!delegate_)
558     return ERR_UNEXPECTED;
559   return delegate_->OnSendBody();
560 }
561 
DoSendBodyComplete(int result)562 int SpdyStream::DoSendBodyComplete(int result) {
563   if (result < 0)
564     return result;
565 
566   if (!delegate_)
567     return ERR_UNEXPECTED;
568 
569   bool eof = false;
570   result = delegate_->OnSendBodyComplete(result, &eof);
571   if (!eof)
572     io_state_ = STATE_SEND_BODY;
573   else
574     io_state_ = STATE_WAITING_FOR_RESPONSE;
575 
576   return result;
577 }
578 
DoOpen(int result)579 int SpdyStream::DoOpen(int result) {
580   if (delegate_)
581     delegate_->OnDataSent(result);
582   io_state_ = STATE_OPEN;
583   return result;
584 }
585 
UpdateHistograms()586 void SpdyStream::UpdateHistograms() {
587   // We need all timers to be filled in, otherwise metrics can be bogus.
588   if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
589       recv_last_byte_time_.is_null())
590     return;
591 
592   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
593       recv_first_byte_time_ - send_time_);
594   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
595       recv_last_byte_time_ - recv_first_byte_time_);
596   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
597       recv_last_byte_time_ - send_time_);
598 
599   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
600   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
601 }
602 
603 }  // namespace net
604