1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_stream.h"
6
7 #include "base/logging.h"
8 #include "base/message_loop.h"
9 #include "base/values.h"
10 #include "net/spdy/spdy_session.h"
11
12 namespace net {
13
14 namespace {
15
16 class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters {
17 public:
NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)18 NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,
19 int delta,
20 int window_size)
21 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
ToValue() const22 virtual Value* ToValue() const {
23 DictionaryValue* dict = new DictionaryValue();
24 dict->SetInteger("id", static_cast<int>(stream_id_));
25 dict->SetInteger("delta", delta_);
26 dict->SetInteger("window_size", window_size_);
27 return dict;
28 }
29 private:
30 const spdy::SpdyStreamId stream_id_;
31 const int delta_;
32 const int window_size_;
33 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter);
34 };
35
36 }
37
SpdyStream(SpdySession * session,spdy::SpdyStreamId stream_id,bool pushed,const BoundNetLog & net_log)38 SpdyStream::SpdyStream(SpdySession* session,
39 spdy::SpdyStreamId stream_id,
40 bool pushed,
41 const BoundNetLog& net_log)
42 : continue_buffering_data_(true),
43 stream_id_(stream_id),
44 priority_(0),
45 stalled_by_flow_control_(false),
46 send_window_size_(spdy::kSpdyStreamInitialWindowSize),
47 recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
48 pushed_(pushed),
49 response_received_(false),
50 session_(session),
51 delegate_(NULL),
52 request_time_(base::Time::Now()),
53 response_(new spdy::SpdyHeaderBlock),
54 io_state_(STATE_NONE),
55 response_status_(OK),
56 cancelled_(false),
57 has_upload_data_(false),
58 net_log_(net_log),
59 send_bytes_(0),
60 recv_bytes_(0) {
61 }
62
~SpdyStream()63 SpdyStream::~SpdyStream() {
64 UpdateHistograms();
65 }
66
SetDelegate(Delegate * delegate)67 void SpdyStream::SetDelegate(Delegate* delegate) {
68 CHECK(delegate);
69 delegate_ = delegate;
70
71 if (pushed_) {
72 CHECK(response_received());
73 MessageLoop::current()->PostTask(
74 FROM_HERE, NewRunnableMethod(this,
75 &SpdyStream::PushedStreamReplayData));
76 } else {
77 continue_buffering_data_ = false;
78 }
79 }
80
PushedStreamReplayData()81 void SpdyStream::PushedStreamReplayData() {
82 if (cancelled_ || !delegate_)
83 return;
84
85 continue_buffering_data_ = false;
86
87 int rv = delegate_->OnResponseReceived(*response_, response_time_, OK);
88 if (rv == ERR_INCOMPLETE_SPDY_HEADERS) {
89 // We don't have complete headers. Assume we're waiting for another
90 // HEADERS frame. Since we don't have headers, we had better not have
91 // any pending data frames.
92 DCHECK_EQ(0U, pending_buffers_.size());
93 return;
94 }
95
96 std::vector<scoped_refptr<IOBufferWithSize> > buffers;
97 buffers.swap(pending_buffers_);
98 for (size_t i = 0; i < buffers.size(); ++i) {
99 // It is always possible that a callback to the delegate results in
100 // the delegate no longer being available.
101 if (!delegate_)
102 break;
103 if (buffers[i]) {
104 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
105 } else {
106 delegate_->OnDataReceived(NULL, 0);
107 session_->CloseStream(stream_id_, net::OK);
108 // Note: |this| may be deleted after calling CloseStream.
109 DCHECK_EQ(buffers.size() - 1, i);
110 }
111 }
112 }
113
DetachDelegate()114 void SpdyStream::DetachDelegate() {
115 if (delegate_)
116 delegate_->set_chunk_callback(NULL);
117 delegate_ = NULL;
118 if (!closed())
119 Cancel();
120 }
121
spdy_headers() const122 const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const {
123 return request_;
124 }
125
set_spdy_headers(const linked_ptr<spdy::SpdyHeaderBlock> & headers)126 void SpdyStream::set_spdy_headers(
127 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
128 request_ = headers;
129 }
130
IncreaseSendWindowSize(int delta_window_size)131 void SpdyStream::IncreaseSendWindowSize(int delta_window_size) {
132 DCHECK_GE(delta_window_size, 1);
133 int new_window_size = send_window_size_ + delta_window_size;
134
135 // We should ignore WINDOW_UPDATEs received before or after this state,
136 // since before means we've not written SYN_STREAM yet (i.e. it's too
137 // early) and after means we've written a DATA frame with FIN bit.
138 if (io_state_ != STATE_SEND_BODY_COMPLETE)
139 return;
140
141 // it's valid for send_window_size_ to become negative (via an incoming
142 // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make
143 // it positive; however, if send_window_size_ is positive and incoming
144 // WINDOW_UPDATE makes it negative, we have an overflow.
145 if (send_window_size_ > 0 && new_window_size < 0) {
146 LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size
147 << "] for stream " << stream_id_
148 << " overflows send_window_size_ [current:"
149 << send_window_size_ << "]";
150 session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
151 return;
152 }
153
154 send_window_size_ = new_window_size;
155
156 net_log_.AddEvent(
157 NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
158 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
159 stream_id_, delta_window_size, send_window_size_)));
160 if (stalled_by_flow_control_) {
161 stalled_by_flow_control_ = false;
162 io_state_ = STATE_SEND_BODY;
163 DoLoop(OK);
164 }
165 }
166
DecreaseSendWindowSize(int delta_window_size)167 void SpdyStream::DecreaseSendWindowSize(int delta_window_size) {
168 // we only call this method when sending a frame, therefore
169 // |delta_window_size| should be within the valid frame size range.
170 DCHECK_GE(delta_window_size, 1);
171 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
172
173 // |send_window_size_| should have been at least |delta_window_size| for
174 // this call to happen.
175 DCHECK_GE(send_window_size_, delta_window_size);
176
177 send_window_size_ -= delta_window_size;
178
179 net_log_.AddEvent(
180 NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
181 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
182 stream_id_, -delta_window_size, send_window_size_)));
183 }
184
IncreaseRecvWindowSize(int delta_window_size)185 void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) {
186 DCHECK_GE(delta_window_size, 1);
187 // By the time a read is isued, stream may become inactive.
188 if (!session_->IsStreamActive(stream_id_))
189 return;
190 int new_window_size = recv_window_size_ + delta_window_size;
191 if (recv_window_size_ > 0)
192 DCHECK(new_window_size > 0);
193
194 recv_window_size_ = new_window_size;
195 net_log_.AddEvent(
196 NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
197 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
198 stream_id_, delta_window_size, recv_window_size_)));
199 session_->SendWindowUpdate(stream_id_, delta_window_size);
200 }
201
DecreaseRecvWindowSize(int delta_window_size)202 void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) {
203 DCHECK_GE(delta_window_size, 1);
204
205 recv_window_size_ -= delta_window_size;
206 net_log_.AddEvent(
207 NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
208 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
209 stream_id_, -delta_window_size, recv_window_size_)));
210
211 // Since we never decrease the initial window size, we should never hit
212 // a negative |recv_window_size_|, if we do, it's a flow-control violation.
213 if (recv_window_size_ < 0)
214 session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
215 }
216
GetPeerAddress(AddressList * address) const217 int SpdyStream::GetPeerAddress(AddressList* address) const {
218 return session_->GetPeerAddress(address);
219 }
220
GetLocalAddress(IPEndPoint * address) const221 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
222 return session_->GetLocalAddress(address);
223 }
224
WasEverUsed() const225 bool SpdyStream::WasEverUsed() const {
226 return session_->WasEverUsed();
227 }
228
GetRequestTime() const229 base::Time SpdyStream::GetRequestTime() const {
230 return request_time_;
231 }
232
SetRequestTime(base::Time t)233 void SpdyStream::SetRequestTime(base::Time t) {
234 request_time_ = t;
235 }
236
OnResponseReceived(const spdy::SpdyHeaderBlock & response)237 int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) {
238 int rv = OK;
239
240 metrics_.StartStream();
241
242 DCHECK(response_->empty());
243 *response_ = response; // TODO(ukai): avoid copy.
244
245 recv_first_byte_time_ = base::TimeTicks::Now();
246 response_time_ = base::Time::Now();
247
248 // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then
249 // the server has sent the SYN_REPLY too early.
250 if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE)
251 return ERR_SPDY_PROTOCOL_ERROR;
252 if (pushed_)
253 CHECK(io_state_ == STATE_NONE);
254 io_state_ = STATE_OPEN;
255
256 if (delegate_)
257 rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
258 // If delegate_ is not yet attached, we'll call OnResponseReceived after the
259 // delegate gets attached to the stream.
260
261 return rv;
262 }
263
OnHeaders(const spdy::SpdyHeaderBlock & headers)264 int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) {
265 DCHECK(!response_->empty());
266
267 // Append all the headers into the response header block.
268 for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin();
269 it != headers.end(); ++it) {
270 // Disallow duplicate headers. This is just to be conservative.
271 if ((*response_).find(it->first) != (*response_).end()) {
272 LOG(WARNING) << "HEADERS duplicate header";
273 response_status_ = ERR_SPDY_PROTOCOL_ERROR;
274 return ERR_SPDY_PROTOCOL_ERROR;
275 }
276
277 (*response_)[it->first] = it->second;
278 }
279
280 int rv = OK;
281 if (delegate_) {
282 rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
283 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
284 // headers before the response header block is complete.
285 if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
286 rv = OK;
287 }
288 return rv;
289 }
290
OnDataReceived(const char * data,int length)291 void SpdyStream::OnDataReceived(const char* data, int length) {
292 DCHECK_GE(length, 0);
293
294 // If we don't have a response, then the SYN_REPLY did not come through.
295 // We cannot pass data up to the caller unless the reply headers have been
296 // received.
297 if (!response_received()) {
298 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
299 return;
300 }
301
302 if (!delegate_ || continue_buffering_data_) {
303 // It should be valid for this to happen in the server push case.
304 // We'll return received data when delegate gets attached to the stream.
305 if (length > 0) {
306 IOBufferWithSize* buf = new IOBufferWithSize(length);
307 memcpy(buf->data(), data, length);
308 pending_buffers_.push_back(make_scoped_refptr(buf));
309 } else {
310 pending_buffers_.push_back(NULL);
311 metrics_.StopStream();
312 // Note: we leave the stream open in the session until the stream
313 // is claimed.
314 }
315 return;
316 }
317
318 CHECK(!closed());
319
320 // A zero-length read means that the stream is being closed.
321 if (!length) {
322 metrics_.StopStream();
323 session_->CloseStream(stream_id_, net::OK);
324 // Note: |this| may be deleted after calling CloseStream.
325 return;
326 }
327
328 if (session_->flow_control())
329 DecreaseRecvWindowSize(length);
330
331 // Track our bandwidth.
332 metrics_.RecordBytes(length);
333 recv_bytes_ += length;
334 recv_last_byte_time_ = base::TimeTicks::Now();
335
336 if (!delegate_) {
337 // It should be valid for this to happen in the server push case.
338 // We'll return received data when delegate gets attached to the stream.
339 IOBufferWithSize* buf = new IOBufferWithSize(length);
340 memcpy(buf->data(), data, length);
341 pending_buffers_.push_back(make_scoped_refptr(buf));
342 return;
343 }
344
345 delegate_->OnDataReceived(data, length);
346 }
347
348 // This function is only called when an entire frame is written.
OnWriteComplete(int bytes)349 void SpdyStream::OnWriteComplete(int bytes) {
350 DCHECK_LE(0, bytes);
351 send_bytes_ += bytes;
352 if (cancelled() || closed())
353 return;
354 DoLoop(bytes);
355 }
356
OnChunkAvailable()357 void SpdyStream::OnChunkAvailable() {
358 DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY ||
359 io_state_ == STATE_SEND_BODY_COMPLETE);
360 if (io_state_ == STATE_SEND_BODY)
361 OnWriteComplete(0);
362 }
363
OnClose(int status)364 void SpdyStream::OnClose(int status) {
365 io_state_ = STATE_DONE;
366 response_status_ = status;
367 Delegate* delegate = delegate_;
368 delegate_ = NULL;
369 if (delegate) {
370 delegate->set_chunk_callback(NULL);
371 delegate->OnClose(status);
372 }
373 }
374
Cancel()375 void SpdyStream::Cancel() {
376 if (cancelled())
377 return;
378
379 cancelled_ = true;
380 if (session_->IsStreamActive(stream_id_))
381 session_->ResetStream(stream_id_, spdy::CANCEL);
382 }
383
SendRequest(bool has_upload_data)384 int SpdyStream::SendRequest(bool has_upload_data) {
385 if (delegate_)
386 delegate_->set_chunk_callback(this);
387
388 // Pushed streams do not send any data, and should always be in STATE_OPEN or
389 // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push
390 // behavior.
391 has_upload_data_ = has_upload_data;
392 if (pushed_) {
393 send_time_ = base::TimeTicks::Now();
394 DCHECK(!has_upload_data_);
395 DCHECK(response_received());
396 return ERR_IO_PENDING;
397 }
398 CHECK_EQ(STATE_NONE, io_state_);
399 io_state_ = STATE_SEND_HEADERS;
400 return DoLoop(OK);
401 }
402
WriteStreamData(IOBuffer * data,int length,spdy::SpdyDataFlags flags)403 int SpdyStream::WriteStreamData(IOBuffer* data, int length,
404 spdy::SpdyDataFlags flags) {
405 return session_->WriteStreamData(stream_id_, data, length, flags);
406 }
407
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)408 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
409 return session_->GetSSLInfo(ssl_info, was_npn_negotiated);
410 }
411
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)412 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
413 return session_->GetSSLCertRequestInfo(cert_request_info);
414 }
415
HasUrl() const416 bool SpdyStream::HasUrl() const {
417 if (pushed_)
418 return response_received();
419 return request_.get() != NULL;
420 }
421
GetUrl() const422 GURL SpdyStream::GetUrl() const {
423 DCHECK(HasUrl());
424
425 if (pushed_) {
426 // assemble from the response
427 std::string url;
428 spdy::SpdyHeaderBlock::const_iterator it;
429 it = response_->find("url");
430 if (it != (*response_).end())
431 url = it->second;
432 return GURL(url);
433 }
434
435 // assemble from the request
436 std::string scheme;
437 std::string host_port;
438 std::string path;
439 spdy::SpdyHeaderBlock::const_iterator it;
440 it = request_->find("scheme");
441 if (it != (*request_).end())
442 scheme = it->second;
443 it = request_->find("host");
444 if (it != (*request_).end())
445 host_port = it->second;
446 it = request_->find("path");
447 if (it != (*request_).end())
448 path = it->second;
449 std::string url = scheme + "://" + host_port + path;
450 return GURL(url);
451 }
452
DoLoop(int result)453 int SpdyStream::DoLoop(int result) {
454 do {
455 State state = io_state_;
456 io_state_ = STATE_NONE;
457 switch (state) {
458 // State machine 1: Send headers and body.
459 case STATE_SEND_HEADERS:
460 CHECK_EQ(OK, result);
461 result = DoSendHeaders();
462 break;
463 case STATE_SEND_HEADERS_COMPLETE:
464 result = DoSendHeadersComplete(result);
465 break;
466 case STATE_SEND_BODY:
467 CHECK_EQ(OK, result);
468 result = DoSendBody();
469 break;
470 case STATE_SEND_BODY_COMPLETE:
471 result = DoSendBodyComplete(result);
472 break;
473 // This is an intermediary waiting state. This state is reached when all
474 // data has been sent, but no data has been received.
475 case STATE_WAITING_FOR_RESPONSE:
476 io_state_ = STATE_WAITING_FOR_RESPONSE;
477 result = ERR_IO_PENDING;
478 break;
479 // State machine 2: connection is established.
480 // In STATE_OPEN, OnResponseReceived has already been called.
481 // OnDataReceived, OnClose and OnWriteCompelte can be called.
482 // Only OnWriteCompletee calls DoLoop(().
483 //
484 // For HTTP streams, no data is sent from the client while in the OPEN
485 // state, so OnWriteComplete is never called here. The HTTP body is
486 // handled in the OnDataReceived callback, which does not call into
487 // DoLoop.
488 //
489 // For WebSocket streams, which are bi-directional, we'll send and
490 // receive data once the connection is established. Received data is
491 // handled in OnDataReceived. Sent data is handled in OnWriteComplete,
492 // which calls DoOpen().
493 case STATE_OPEN:
494 result = DoOpen(result);
495 break;
496
497 case STATE_DONE:
498 DCHECK(result != ERR_IO_PENDING);
499 break;
500 default:
501 NOTREACHED() << io_state_;
502 break;
503 }
504 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
505 io_state_ != STATE_OPEN);
506
507 return result;
508 }
509
DoSendHeaders()510 int SpdyStream::DoSendHeaders() {
511 CHECK(!cancelled_);
512
513 spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE;
514 if (!has_upload_data_)
515 flags = spdy::CONTROL_FLAG_FIN;
516
517 CHECK(request_.get());
518 int result = session_->WriteSynStream(
519 stream_id_, static_cast<RequestPriority>(priority_), flags,
520 request_);
521 if (result != ERR_IO_PENDING)
522 return result;
523
524 send_time_ = base::TimeTicks::Now();
525 io_state_ = STATE_SEND_HEADERS_COMPLETE;
526 return ERR_IO_PENDING;
527 }
528
DoSendHeadersComplete(int result)529 int SpdyStream::DoSendHeadersComplete(int result) {
530 if (result < 0)
531 return result;
532
533 CHECK_GT(result, 0);
534
535 if (!delegate_)
536 return ERR_UNEXPECTED;
537
538 // There is no body, skip that state.
539 if (delegate_->OnSendHeadersComplete(result)) {
540 io_state_ = STATE_WAITING_FOR_RESPONSE;
541 return OK;
542 }
543
544 io_state_ = STATE_SEND_BODY;
545 return OK;
546 }
547
548 // DoSendBody is called to send the optional body for the request. This call
549 // will also be called as each write of a chunk of the body completes.
DoSendBody()550 int SpdyStream::DoSendBody() {
551 // If we're already in the STATE_SENDING_BODY state, then we've already
552 // sent a portion of the body. In that case, we need to first consume
553 // the bytes written in the body stream. Note that the bytes written is
554 // the number of bytes in the frame that were written, only consume the
555 // data portion, of course.
556 io_state_ = STATE_SEND_BODY_COMPLETE;
557 if (!delegate_)
558 return ERR_UNEXPECTED;
559 return delegate_->OnSendBody();
560 }
561
DoSendBodyComplete(int result)562 int SpdyStream::DoSendBodyComplete(int result) {
563 if (result < 0)
564 return result;
565
566 if (!delegate_)
567 return ERR_UNEXPECTED;
568
569 bool eof = false;
570 result = delegate_->OnSendBodyComplete(result, &eof);
571 if (!eof)
572 io_state_ = STATE_SEND_BODY;
573 else
574 io_state_ = STATE_WAITING_FOR_RESPONSE;
575
576 return result;
577 }
578
DoOpen(int result)579 int SpdyStream::DoOpen(int result) {
580 if (delegate_)
581 delegate_->OnDataSent(result);
582 io_state_ = STATE_OPEN;
583 return result;
584 }
585
UpdateHistograms()586 void SpdyStream::UpdateHistograms() {
587 // We need all timers to be filled in, otherwise metrics can be bogus.
588 if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
589 recv_last_byte_time_.is_null())
590 return;
591
592 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
593 recv_first_byte_time_ - send_time_);
594 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
595 recv_last_byte_time_ - recv_first_byte_time_);
596 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
597 recv_last_byte_time_ - send_time_);
598
599 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
600 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
601 }
602
603 } // namespace net
604