• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "net/spdy/spdy_http_stream.h"
6 
7 #include <algorithm>
8 #include <list>
9 
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/strings/stringprintf.h"
14 #include "net/base/host_port_pair.h"
15 #include "net/base/net_log.h"
16 #include "net/base/net_util.h"
17 #include "net/base/upload_data_stream.h"
18 #include "net/http/http_request_headers.h"
19 #include "net/http/http_request_info.h"
20 #include "net/http/http_response_info.h"
21 #include "net/spdy/spdy_header_block.h"
22 #include "net/spdy/spdy_http_utils.h"
23 #include "net/spdy/spdy_protocol.h"
24 #include "net/spdy/spdy_session.h"
25 
26 namespace net {
27 
SpdyHttpStream(const base::WeakPtr<SpdySession> & spdy_session,bool direct)28 SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
29                                bool direct)
30     : spdy_session_(spdy_session),
31       is_reused_(spdy_session_->IsReused()),
32       stream_closed_(false),
33       closed_stream_status_(ERR_FAILED),
34       closed_stream_id_(0),
35       closed_stream_received_bytes_(0),
36       request_info_(NULL),
37       response_info_(NULL),
38       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
39       user_buffer_len_(0),
40       request_body_buf_size_(0),
41       buffered_read_callback_pending_(false),
42       more_read_data_pending_(false),
43       direct_(direct),
44       weak_factory_(this) {
45   DCHECK(spdy_session_.get());
46 }
47 
~SpdyHttpStream()48 SpdyHttpStream::~SpdyHttpStream() {
49   if (stream_.get()) {
50     stream_->DetachDelegate();
51     DCHECK(!stream_.get());
52   }
53 }
54 
InitializeStream(const HttpRequestInfo * request_info,RequestPriority priority,const BoundNetLog & stream_net_log,const CompletionCallback & callback)55 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
56                                      RequestPriority priority,
57                                      const BoundNetLog& stream_net_log,
58                                      const CompletionCallback& callback) {
59   DCHECK(!stream_);
60   if (!spdy_session_)
61     return ERR_CONNECTION_CLOSED;
62 
63   request_info_ = request_info;
64   if (request_info_->method == "GET") {
65     int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
66                                              stream_net_log);
67     if (error != OK)
68       return error;
69 
70     // |stream_| may be NULL even if OK was returned.
71     if (stream_.get()) {
72       DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
73       stream_->SetDelegate(this);
74       return OK;
75     }
76   }
77 
78   int rv = stream_request_.StartRequest(
79       SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
80       priority, stream_net_log,
81       base::Bind(&SpdyHttpStream::OnStreamCreated,
82                  weak_factory_.GetWeakPtr(), callback));
83 
84   if (rv == OK) {
85     stream_ = stream_request_.ReleaseStream();
86     stream_->SetDelegate(this);
87   }
88 
89   return rv;
90 }
91 
GetUploadProgress() const92 UploadProgress SpdyHttpStream::GetUploadProgress() const {
93   if (!request_info_ || !HasUploadData())
94     return UploadProgress();
95 
96   return UploadProgress(request_info_->upload_data_stream->position(),
97                         request_info_->upload_data_stream->size());
98 }
99 
ReadResponseHeaders(const CompletionCallback & callback)100 int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
101   CHECK(!callback.is_null());
102   if (stream_closed_)
103     return closed_stream_status_;
104 
105   CHECK(stream_.get());
106 
107   // Check if we already have the response headers. If so, return synchronously.
108   if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
109     CHECK(!stream_->IsIdle());
110     return OK;
111   }
112 
113   // Still waiting for the response, return IO_PENDING.
114   CHECK(callback_.is_null());
115   callback_ = callback;
116   return ERR_IO_PENDING;
117 }
118 
ReadResponseBody(IOBuffer * buf,int buf_len,const CompletionCallback & callback)119 int SpdyHttpStream::ReadResponseBody(
120     IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
121   if (stream_.get())
122     CHECK(!stream_->IsIdle());
123 
124   CHECK(buf);
125   CHECK(buf_len);
126   CHECK(!callback.is_null());
127 
128   // If we have data buffered, complete the IO immediately.
129   if (!response_body_queue_.IsEmpty()) {
130     return response_body_queue_.Dequeue(buf->data(), buf_len);
131   } else if (stream_closed_) {
132     return closed_stream_status_;
133   }
134 
135   CHECK(callback_.is_null());
136   CHECK(!user_buffer_.get());
137   CHECK_EQ(0, user_buffer_len_);
138 
139   callback_ = callback;
140   user_buffer_ = buf;
141   user_buffer_len_ = buf_len;
142   return ERR_IO_PENDING;
143 }
144 
Close(bool not_reusable)145 void SpdyHttpStream::Close(bool not_reusable) {
146   // Note: the not_reusable flag has no meaning for SPDY streams.
147 
148   Cancel();
149   DCHECK(!stream_.get());
150 }
151 
RenewStreamForAuth()152 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
153   return NULL;
154 }
155 
IsResponseBodyComplete() const156 bool SpdyHttpStream::IsResponseBodyComplete() const {
157   return stream_closed_;
158 }
159 
CanFindEndOfResponse() const160 bool SpdyHttpStream::CanFindEndOfResponse() const {
161   return true;
162 }
163 
IsConnectionReused() const164 bool SpdyHttpStream::IsConnectionReused() const {
165   return is_reused_;
166 }
167 
SetConnectionReused()168 void SpdyHttpStream::SetConnectionReused() {
169   // SPDY doesn't need an indicator here.
170 }
171 
IsConnectionReusable() const172 bool SpdyHttpStream::IsConnectionReusable() const {
173   // SPDY streams aren't considered reusable.
174   return false;
175 }
176 
GetTotalReceivedBytes() const177 int64 SpdyHttpStream::GetTotalReceivedBytes() const {
178   if (stream_closed_)
179     return closed_stream_received_bytes_;
180 
181   if (!stream_)
182     return 0;
183 
184   return stream_->raw_received_bytes();
185 }
186 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const187 bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
188   if (stream_closed_) {
189     if (!closed_stream_has_load_timing_info_)
190       return false;
191     *load_timing_info = closed_stream_load_timing_info_;
192     return true;
193   }
194 
195   // If |stream_| has yet to be created, or does not yet have an ID, fail.
196   // The reused flag can only be correctly set once a stream has an ID.  Streams
197   // get their IDs once the request has been successfully sent, so this does not
198   // behave that differently from other stream types.
199   if (!stream_ || stream_->stream_id() == 0)
200     return false;
201 
202   return stream_->GetLoadTimingInfo(load_timing_info);
203 }
204 
SendRequest(const HttpRequestHeaders & request_headers,HttpResponseInfo * response,const CompletionCallback & callback)205 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
206                                 HttpResponseInfo* response,
207                                 const CompletionCallback& callback) {
208   if (stream_closed_) {
209     return closed_stream_status_;
210   }
211 
212   base::Time request_time = base::Time::Now();
213   CHECK(stream_.get());
214 
215   stream_->SetRequestTime(request_time);
216   // This should only get called in the case of a request occurring
217   // during server push that has already begun but hasn't finished,
218   // so we set the response's request time to be the actual one
219   if (response_info_)
220     response_info_->request_time = request_time;
221 
222   CHECK(!request_body_buf_.get());
223   if (HasUploadData()) {
224     // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
225     // body data is written with this size at a time.
226     request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
227     // The request body buffer is empty at first.
228     request_body_buf_size_ = 0;
229   }
230 
231   CHECK(!callback.is_null());
232   CHECK(response);
233 
234   // SendRequest can be called in two cases.
235   //
236   // a) A client initiated request. In this case, |response_info_| should be
237   //    NULL to start with.
238   // b) A client request which matches a response that the server has already
239   //    pushed.
240   if (push_response_info_.get()) {
241     *response = *(push_response_info_.get());
242     push_response_info_.reset();
243   } else {
244     DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
245   }
246 
247   response_info_ = response;
248 
249   // Put the peer's IP address and port into the response.
250   IPEndPoint address;
251   int result = stream_->GetPeerAddress(&address);
252   if (result != OK)
253     return result;
254   response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
255 
256   if (stream_->type() == SPDY_PUSH_STREAM) {
257     // Pushed streams do not send any data, and should always be
258     // idle. However, we still want to return ERR_IO_PENDING to mimic
259     // non-push behavior. The callback will be called when the
260     // response is received.
261     result = ERR_IO_PENDING;
262   } else {
263     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
264     CreateSpdyHeadersFromHttpRequest(
265         *request_info_, request_headers,
266         stream_->GetProtocolVersion(), direct_,
267         headers.get());
268     stream_->net_log().AddEvent(
269         NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
270         base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
271     result =
272         stream_->SendRequestHeaders(
273             headers.Pass(),
274             HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
275   }
276 
277   if (result == ERR_IO_PENDING) {
278     CHECK(callback_.is_null());
279     callback_ = callback;
280   }
281   return result;
282 }
283 
Cancel()284 void SpdyHttpStream::Cancel() {
285   callback_.Reset();
286   if (stream_.get()) {
287     stream_->Cancel();
288     DCHECK(!stream_.get());
289   }
290 }
291 
OnRequestHeadersSent()292 void SpdyHttpStream::OnRequestHeadersSent() {
293   if (!callback_.is_null())
294     DoCallback(OK);
295 
296   // TODO(akalin): Do this immediately after sending the request
297   // headers.
298   if (HasUploadData())
299     ReadAndSendRequestBodyData();
300 }
301 
OnResponseHeadersUpdated(const SpdyHeaderBlock & response_headers)302 SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
303     const SpdyHeaderBlock& response_headers) {
304   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
305 
306   if (!response_info_) {
307     DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
308     push_response_info_.reset(new HttpResponseInfo);
309     response_info_ = push_response_info_.get();
310   }
311 
312   if (!SpdyHeadersToHttpResponse(
313           response_headers, stream_->GetProtocolVersion(), response_info_)) {
314     // We do not have complete headers yet.
315     return RESPONSE_HEADERS_ARE_INCOMPLETE;
316   }
317 
318   response_info_->response_time = stream_->response_time();
319   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
320   // Don't store the SSLInfo in the response here, HttpNetworkTransaction
321   // will take care of that part.
322   SSLInfo ssl_info;
323   NextProto protocol_negotiated = kProtoUnknown;
324   stream_->GetSSLInfo(&ssl_info,
325                       &response_info_->was_npn_negotiated,
326                       &protocol_negotiated);
327   response_info_->npn_negotiated_protocol =
328       SSLClientSocket::NextProtoToString(protocol_negotiated);
329   response_info_->request_time = stream_->GetRequestTime();
330   response_info_->connection_info =
331       HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
332   response_info_->vary_data
333       .Init(*request_info_, *response_info_->headers.get());
334 
335   if (!callback_.is_null())
336     DoCallback(OK);
337 
338   return RESPONSE_HEADERS_ARE_COMPLETE;
339 }
340 
OnDataReceived(scoped_ptr<SpdyBuffer> buffer)341 void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
342   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
343 
344   // Note that data may be received for a SpdyStream prior to the user calling
345   // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
346   // happen for server initiated streams.
347   DCHECK(stream_.get());
348   DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
349   if (buffer) {
350     response_body_queue_.Enqueue(buffer.Pass());
351 
352     if (user_buffer_.get()) {
353       // Handing small chunks of data to the caller creates measurable overhead.
354       // We buffer data in short time-spans and send a single read notification.
355       ScheduleBufferedReadCallback();
356     }
357   }
358 }
359 
OnDataSent()360 void SpdyHttpStream::OnDataSent() {
361   request_body_buf_size_ = 0;
362   ReadAndSendRequestBodyData();
363 }
364 
OnClose(int status)365 void SpdyHttpStream::OnClose(int status) {
366   if (stream_.get()) {
367     stream_closed_ = true;
368     closed_stream_status_ = status;
369     closed_stream_id_ = stream_->stream_id();
370     closed_stream_has_load_timing_info_ =
371         stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
372     closed_stream_received_bytes_ = stream_->raw_received_bytes();
373   }
374   stream_.reset();
375   bool invoked_callback = false;
376   if (status == net::OK) {
377     // We need to complete any pending buffered read now.
378     invoked_callback = DoBufferedReadCallback();
379   }
380   if (!invoked_callback && !callback_.is_null())
381     DoCallback(status);
382 }
383 
HasUploadData() const384 bool SpdyHttpStream::HasUploadData() const {
385   CHECK(request_info_);
386   return
387       request_info_->upload_data_stream &&
388       ((request_info_->upload_data_stream->size() > 0) ||
389        request_info_->upload_data_stream->is_chunked());
390 }
391 
OnStreamCreated(const CompletionCallback & callback,int rv)392 void SpdyHttpStream::OnStreamCreated(
393     const CompletionCallback& callback,
394     int rv) {
395   if (rv == OK) {
396     stream_ = stream_request_.ReleaseStream();
397     stream_->SetDelegate(this);
398   }
399   callback.Run(rv);
400 }
401 
ReadAndSendRequestBodyData()402 void SpdyHttpStream::ReadAndSendRequestBodyData() {
403   CHECK(HasUploadData());
404   CHECK_EQ(request_body_buf_size_, 0);
405 
406   if (request_info_->upload_data_stream->IsEOF())
407     return;
408 
409   // Read the data from the request body stream.
410   const int rv = request_info_->upload_data_stream
411       ->Read(request_body_buf_.get(),
412              request_body_buf_->size(),
413              base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
414                         weak_factory_.GetWeakPtr()));
415 
416   if (rv != ERR_IO_PENDING) {
417     // ERR_IO_PENDING is the only possible error.
418     CHECK_GE(rv, 0);
419     OnRequestBodyReadCompleted(rv);
420   }
421 }
422 
OnRequestBodyReadCompleted(int status)423 void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
424   CHECK_GE(status, 0);
425   request_body_buf_size_ = status;
426   const bool eof = request_info_->upload_data_stream->IsEOF();
427   // Only the final fame may have a length of 0.
428   if (eof) {
429     CHECK_GE(request_body_buf_size_, 0);
430   } else {
431     CHECK_GT(request_body_buf_size_, 0);
432   }
433   stream_->SendData(request_body_buf_.get(),
434                     request_body_buf_size_,
435                     eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
436 }
437 
ScheduleBufferedReadCallback()438 void SpdyHttpStream::ScheduleBufferedReadCallback() {
439   // If there is already a scheduled DoBufferedReadCallback, don't issue
440   // another one.  Mark that we have received more data and return.
441   if (buffered_read_callback_pending_) {
442     more_read_data_pending_ = true;
443     return;
444   }
445 
446   more_read_data_pending_ = false;
447   buffered_read_callback_pending_ = true;
448   const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
449   base::MessageLoop::current()->PostDelayedTask(
450       FROM_HERE,
451       base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
452                  weak_factory_.GetWeakPtr()),
453       kBufferTime);
454 }
455 
456 // Checks to see if we should wait for more buffered data before notifying
457 // the caller.  Returns true if we should wait, false otherwise.
ShouldWaitForMoreBufferedData() const458 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
459   // If the response is complete, there is no point in waiting.
460   if (stream_closed_)
461     return false;
462 
463   DCHECK_GT(user_buffer_len_, 0);
464   return response_body_queue_.GetTotalSize() <
465       static_cast<size_t>(user_buffer_len_);
466 }
467 
DoBufferedReadCallback()468 bool SpdyHttpStream::DoBufferedReadCallback() {
469   buffered_read_callback_pending_ = false;
470 
471   // If the transaction is cancelled or errored out, we don't need to complete
472   // the read.
473   if (!stream_.get() && !stream_closed_)
474     return false;
475 
476   int stream_status =
477       stream_closed_ ? closed_stream_status_ : stream_->response_status();
478   if (stream_status != OK)
479     return false;
480 
481   // When more_read_data_pending_ is true, it means that more data has
482   // arrived since we started waiting.  Wait a little longer and continue
483   // to buffer.
484   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
485     ScheduleBufferedReadCallback();
486     return false;
487   }
488 
489   int rv = 0;
490   if (user_buffer_.get()) {
491     rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
492     CHECK_NE(rv, ERR_IO_PENDING);
493     user_buffer_ = NULL;
494     user_buffer_len_ = 0;
495     DoCallback(rv);
496     return true;
497   }
498   return false;
499 }
500 
DoCallback(int rv)501 void SpdyHttpStream::DoCallback(int rv) {
502   CHECK_NE(rv, ERR_IO_PENDING);
503   CHECK(!callback_.is_null());
504 
505   // Since Run may result in being called back, clear user_callback_ in advance.
506   CompletionCallback c = callback_;
507   callback_.Reset();
508   c.Run(rv);
509 }
510 
GetSSLInfo(SSLInfo * ssl_info)511 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
512   DCHECK(stream_.get());
513   bool using_npn;
514   NextProto protocol_negotiated = kProtoUnknown;
515   stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
516 }
517 
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)518 void SpdyHttpStream::GetSSLCertRequestInfo(
519     SSLCertRequestInfo* cert_request_info) {
520   DCHECK(stream_.get());
521   stream_->GetSSLCertRequestInfo(cert_request_info);
522 }
523 
IsSpdyHttpStream() const524 bool SpdyHttpStream::IsSpdyHttpStream() const {
525   return true;
526 }
527 
Drain(HttpNetworkSession * session)528 void SpdyHttpStream::Drain(HttpNetworkSession* session) {
529   Close(false);
530   delete this;
531 }
532 
SetPriority(RequestPriority priority)533 void SpdyHttpStream::SetPriority(RequestPriority priority) {
534   // TODO(akalin): Plumb this through to |stream_request_| and
535   // |stream_|.
536 }
537 
538 }  // namespace net
539