1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_http_stream.h"
6
7 #include <algorithm>
8 #include <list>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/strings/stringprintf.h"
14 #include "net/base/host_port_pair.h"
15 #include "net/base/net_log.h"
16 #include "net/base/net_util.h"
17 #include "net/base/upload_data_stream.h"
18 #include "net/http/http_request_headers.h"
19 #include "net/http/http_request_info.h"
20 #include "net/http/http_response_info.h"
21 #include "net/spdy/spdy_header_block.h"
22 #include "net/spdy/spdy_http_utils.h"
23 #include "net/spdy/spdy_protocol.h"
24 #include "net/spdy/spdy_session.h"
25
26 namespace net {
27
SpdyHttpStream(const base::WeakPtr<SpdySession> & spdy_session,bool direct)28 SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
29 bool direct)
30 : spdy_session_(spdy_session),
31 is_reused_(spdy_session_->IsReused()),
32 stream_closed_(false),
33 closed_stream_status_(ERR_FAILED),
34 closed_stream_id_(0),
35 closed_stream_received_bytes_(0),
36 request_info_(NULL),
37 response_info_(NULL),
38 response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
39 user_buffer_len_(0),
40 request_body_buf_size_(0),
41 buffered_read_callback_pending_(false),
42 more_read_data_pending_(false),
43 direct_(direct),
44 weak_factory_(this) {
45 DCHECK(spdy_session_.get());
46 }
47
~SpdyHttpStream()48 SpdyHttpStream::~SpdyHttpStream() {
49 if (stream_.get()) {
50 stream_->DetachDelegate();
51 DCHECK(!stream_.get());
52 }
53 }
54
InitializeStream(const HttpRequestInfo * request_info,RequestPriority priority,const BoundNetLog & stream_net_log,const CompletionCallback & callback)55 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
56 RequestPriority priority,
57 const BoundNetLog& stream_net_log,
58 const CompletionCallback& callback) {
59 DCHECK(!stream_);
60 if (!spdy_session_)
61 return ERR_CONNECTION_CLOSED;
62
63 request_info_ = request_info;
64 if (request_info_->method == "GET") {
65 int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
66 stream_net_log);
67 if (error != OK)
68 return error;
69
70 // |stream_| may be NULL even if OK was returned.
71 if (stream_.get()) {
72 DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
73 stream_->SetDelegate(this);
74 return OK;
75 }
76 }
77
78 int rv = stream_request_.StartRequest(
79 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
80 priority, stream_net_log,
81 base::Bind(&SpdyHttpStream::OnStreamCreated,
82 weak_factory_.GetWeakPtr(), callback));
83
84 if (rv == OK) {
85 stream_ = stream_request_.ReleaseStream();
86 stream_->SetDelegate(this);
87 }
88
89 return rv;
90 }
91
GetUploadProgress() const92 UploadProgress SpdyHttpStream::GetUploadProgress() const {
93 if (!request_info_ || !HasUploadData())
94 return UploadProgress();
95
96 return UploadProgress(request_info_->upload_data_stream->position(),
97 request_info_->upload_data_stream->size());
98 }
99
ReadResponseHeaders(const CompletionCallback & callback)100 int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
101 CHECK(!callback.is_null());
102 if (stream_closed_)
103 return closed_stream_status_;
104
105 CHECK(stream_.get());
106
107 // Check if we already have the response headers. If so, return synchronously.
108 if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
109 CHECK(!stream_->IsIdle());
110 return OK;
111 }
112
113 // Still waiting for the response, return IO_PENDING.
114 CHECK(callback_.is_null());
115 callback_ = callback;
116 return ERR_IO_PENDING;
117 }
118
ReadResponseBody(IOBuffer * buf,int buf_len,const CompletionCallback & callback)119 int SpdyHttpStream::ReadResponseBody(
120 IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
121 if (stream_.get())
122 CHECK(!stream_->IsIdle());
123
124 CHECK(buf);
125 CHECK(buf_len);
126 CHECK(!callback.is_null());
127
128 // If we have data buffered, complete the IO immediately.
129 if (!response_body_queue_.IsEmpty()) {
130 return response_body_queue_.Dequeue(buf->data(), buf_len);
131 } else if (stream_closed_) {
132 return closed_stream_status_;
133 }
134
135 CHECK(callback_.is_null());
136 CHECK(!user_buffer_.get());
137 CHECK_EQ(0, user_buffer_len_);
138
139 callback_ = callback;
140 user_buffer_ = buf;
141 user_buffer_len_ = buf_len;
142 return ERR_IO_PENDING;
143 }
144
Close(bool not_reusable)145 void SpdyHttpStream::Close(bool not_reusable) {
146 // Note: the not_reusable flag has no meaning for SPDY streams.
147
148 Cancel();
149 DCHECK(!stream_.get());
150 }
151
RenewStreamForAuth()152 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
153 return NULL;
154 }
155
IsResponseBodyComplete() const156 bool SpdyHttpStream::IsResponseBodyComplete() const {
157 return stream_closed_;
158 }
159
CanFindEndOfResponse() const160 bool SpdyHttpStream::CanFindEndOfResponse() const {
161 return true;
162 }
163
IsConnectionReused() const164 bool SpdyHttpStream::IsConnectionReused() const {
165 return is_reused_;
166 }
167
SetConnectionReused()168 void SpdyHttpStream::SetConnectionReused() {
169 // SPDY doesn't need an indicator here.
170 }
171
IsConnectionReusable() const172 bool SpdyHttpStream::IsConnectionReusable() const {
173 // SPDY streams aren't considered reusable.
174 return false;
175 }
176
GetTotalReceivedBytes() const177 int64 SpdyHttpStream::GetTotalReceivedBytes() const {
178 if (stream_closed_)
179 return closed_stream_received_bytes_;
180
181 if (!stream_)
182 return 0;
183
184 return stream_->raw_received_bytes();
185 }
186
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const187 bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
188 if (stream_closed_) {
189 if (!closed_stream_has_load_timing_info_)
190 return false;
191 *load_timing_info = closed_stream_load_timing_info_;
192 return true;
193 }
194
195 // If |stream_| has yet to be created, or does not yet have an ID, fail.
196 // The reused flag can only be correctly set once a stream has an ID. Streams
197 // get their IDs once the request has been successfully sent, so this does not
198 // behave that differently from other stream types.
199 if (!stream_ || stream_->stream_id() == 0)
200 return false;
201
202 return stream_->GetLoadTimingInfo(load_timing_info);
203 }
204
SendRequest(const HttpRequestHeaders & request_headers,HttpResponseInfo * response,const CompletionCallback & callback)205 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
206 HttpResponseInfo* response,
207 const CompletionCallback& callback) {
208 if (stream_closed_) {
209 return closed_stream_status_;
210 }
211
212 base::Time request_time = base::Time::Now();
213 CHECK(stream_.get());
214
215 stream_->SetRequestTime(request_time);
216 // This should only get called in the case of a request occurring
217 // during server push that has already begun but hasn't finished,
218 // so we set the response's request time to be the actual one
219 if (response_info_)
220 response_info_->request_time = request_time;
221
222 CHECK(!request_body_buf_.get());
223 if (HasUploadData()) {
224 // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
225 // body data is written with this size at a time.
226 request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
227 // The request body buffer is empty at first.
228 request_body_buf_size_ = 0;
229 }
230
231 CHECK(!callback.is_null());
232 CHECK(response);
233
234 // SendRequest can be called in two cases.
235 //
236 // a) A client initiated request. In this case, |response_info_| should be
237 // NULL to start with.
238 // b) A client request which matches a response that the server has already
239 // pushed.
240 if (push_response_info_.get()) {
241 *response = *(push_response_info_.get());
242 push_response_info_.reset();
243 } else {
244 DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
245 }
246
247 response_info_ = response;
248
249 // Put the peer's IP address and port into the response.
250 IPEndPoint address;
251 int result = stream_->GetPeerAddress(&address);
252 if (result != OK)
253 return result;
254 response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
255
256 if (stream_->type() == SPDY_PUSH_STREAM) {
257 // Pushed streams do not send any data, and should always be
258 // idle. However, we still want to return ERR_IO_PENDING to mimic
259 // non-push behavior. The callback will be called when the
260 // response is received.
261 result = ERR_IO_PENDING;
262 } else {
263 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
264 CreateSpdyHeadersFromHttpRequest(
265 *request_info_, request_headers,
266 stream_->GetProtocolVersion(), direct_,
267 headers.get());
268 stream_->net_log().AddEvent(
269 NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
270 base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
271 result =
272 stream_->SendRequestHeaders(
273 headers.Pass(),
274 HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
275 }
276
277 if (result == ERR_IO_PENDING) {
278 CHECK(callback_.is_null());
279 callback_ = callback;
280 }
281 return result;
282 }
283
Cancel()284 void SpdyHttpStream::Cancel() {
285 callback_.Reset();
286 if (stream_.get()) {
287 stream_->Cancel();
288 DCHECK(!stream_.get());
289 }
290 }
291
OnRequestHeadersSent()292 void SpdyHttpStream::OnRequestHeadersSent() {
293 if (!callback_.is_null())
294 DoCallback(OK);
295
296 // TODO(akalin): Do this immediately after sending the request
297 // headers.
298 if (HasUploadData())
299 ReadAndSendRequestBodyData();
300 }
301
OnResponseHeadersUpdated(const SpdyHeaderBlock & response_headers)302 SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
303 const SpdyHeaderBlock& response_headers) {
304 CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
305
306 if (!response_info_) {
307 DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
308 push_response_info_.reset(new HttpResponseInfo);
309 response_info_ = push_response_info_.get();
310 }
311
312 if (!SpdyHeadersToHttpResponse(
313 response_headers, stream_->GetProtocolVersion(), response_info_)) {
314 // We do not have complete headers yet.
315 return RESPONSE_HEADERS_ARE_INCOMPLETE;
316 }
317
318 response_info_->response_time = stream_->response_time();
319 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
320 // Don't store the SSLInfo in the response here, HttpNetworkTransaction
321 // will take care of that part.
322 SSLInfo ssl_info;
323 NextProto protocol_negotiated = kProtoUnknown;
324 stream_->GetSSLInfo(&ssl_info,
325 &response_info_->was_npn_negotiated,
326 &protocol_negotiated);
327 response_info_->npn_negotiated_protocol =
328 SSLClientSocket::NextProtoToString(protocol_negotiated);
329 response_info_->request_time = stream_->GetRequestTime();
330 response_info_->connection_info =
331 HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
332 response_info_->vary_data
333 .Init(*request_info_, *response_info_->headers.get());
334
335 if (!callback_.is_null())
336 DoCallback(OK);
337
338 return RESPONSE_HEADERS_ARE_COMPLETE;
339 }
340
OnDataReceived(scoped_ptr<SpdyBuffer> buffer)341 void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
342 CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
343
344 // Note that data may be received for a SpdyStream prior to the user calling
345 // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often
346 // happen for server initiated streams.
347 DCHECK(stream_.get());
348 DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
349 if (buffer) {
350 response_body_queue_.Enqueue(buffer.Pass());
351
352 if (user_buffer_.get()) {
353 // Handing small chunks of data to the caller creates measurable overhead.
354 // We buffer data in short time-spans and send a single read notification.
355 ScheduleBufferedReadCallback();
356 }
357 }
358 }
359
OnDataSent()360 void SpdyHttpStream::OnDataSent() {
361 request_body_buf_size_ = 0;
362 ReadAndSendRequestBodyData();
363 }
364
OnClose(int status)365 void SpdyHttpStream::OnClose(int status) {
366 if (stream_.get()) {
367 stream_closed_ = true;
368 closed_stream_status_ = status;
369 closed_stream_id_ = stream_->stream_id();
370 closed_stream_has_load_timing_info_ =
371 stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
372 closed_stream_received_bytes_ = stream_->raw_received_bytes();
373 }
374 stream_.reset();
375 bool invoked_callback = false;
376 if (status == net::OK) {
377 // We need to complete any pending buffered read now.
378 invoked_callback = DoBufferedReadCallback();
379 }
380 if (!invoked_callback && !callback_.is_null())
381 DoCallback(status);
382 }
383
HasUploadData() const384 bool SpdyHttpStream::HasUploadData() const {
385 CHECK(request_info_);
386 return
387 request_info_->upload_data_stream &&
388 ((request_info_->upload_data_stream->size() > 0) ||
389 request_info_->upload_data_stream->is_chunked());
390 }
391
OnStreamCreated(const CompletionCallback & callback,int rv)392 void SpdyHttpStream::OnStreamCreated(
393 const CompletionCallback& callback,
394 int rv) {
395 if (rv == OK) {
396 stream_ = stream_request_.ReleaseStream();
397 stream_->SetDelegate(this);
398 }
399 callback.Run(rv);
400 }
401
ReadAndSendRequestBodyData()402 void SpdyHttpStream::ReadAndSendRequestBodyData() {
403 CHECK(HasUploadData());
404 CHECK_EQ(request_body_buf_size_, 0);
405
406 if (request_info_->upload_data_stream->IsEOF())
407 return;
408
409 // Read the data from the request body stream.
410 const int rv = request_info_->upload_data_stream
411 ->Read(request_body_buf_.get(),
412 request_body_buf_->size(),
413 base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
414 weak_factory_.GetWeakPtr()));
415
416 if (rv != ERR_IO_PENDING) {
417 // ERR_IO_PENDING is the only possible error.
418 CHECK_GE(rv, 0);
419 OnRequestBodyReadCompleted(rv);
420 }
421 }
422
OnRequestBodyReadCompleted(int status)423 void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
424 CHECK_GE(status, 0);
425 request_body_buf_size_ = status;
426 const bool eof = request_info_->upload_data_stream->IsEOF();
427 // Only the final fame may have a length of 0.
428 if (eof) {
429 CHECK_GE(request_body_buf_size_, 0);
430 } else {
431 CHECK_GT(request_body_buf_size_, 0);
432 }
433 stream_->SendData(request_body_buf_.get(),
434 request_body_buf_size_,
435 eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
436 }
437
ScheduleBufferedReadCallback()438 void SpdyHttpStream::ScheduleBufferedReadCallback() {
439 // If there is already a scheduled DoBufferedReadCallback, don't issue
440 // another one. Mark that we have received more data and return.
441 if (buffered_read_callback_pending_) {
442 more_read_data_pending_ = true;
443 return;
444 }
445
446 more_read_data_pending_ = false;
447 buffered_read_callback_pending_ = true;
448 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
449 base::MessageLoop::current()->PostDelayedTask(
450 FROM_HERE,
451 base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
452 weak_factory_.GetWeakPtr()),
453 kBufferTime);
454 }
455
456 // Checks to see if we should wait for more buffered data before notifying
457 // the caller. Returns true if we should wait, false otherwise.
ShouldWaitForMoreBufferedData() const458 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
459 // If the response is complete, there is no point in waiting.
460 if (stream_closed_)
461 return false;
462
463 DCHECK_GT(user_buffer_len_, 0);
464 return response_body_queue_.GetTotalSize() <
465 static_cast<size_t>(user_buffer_len_);
466 }
467
DoBufferedReadCallback()468 bool SpdyHttpStream::DoBufferedReadCallback() {
469 buffered_read_callback_pending_ = false;
470
471 // If the transaction is cancelled or errored out, we don't need to complete
472 // the read.
473 if (!stream_.get() && !stream_closed_)
474 return false;
475
476 int stream_status =
477 stream_closed_ ? closed_stream_status_ : stream_->response_status();
478 if (stream_status != OK)
479 return false;
480
481 // When more_read_data_pending_ is true, it means that more data has
482 // arrived since we started waiting. Wait a little longer and continue
483 // to buffer.
484 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
485 ScheduleBufferedReadCallback();
486 return false;
487 }
488
489 int rv = 0;
490 if (user_buffer_.get()) {
491 rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
492 CHECK_NE(rv, ERR_IO_PENDING);
493 user_buffer_ = NULL;
494 user_buffer_len_ = 0;
495 DoCallback(rv);
496 return true;
497 }
498 return false;
499 }
500
DoCallback(int rv)501 void SpdyHttpStream::DoCallback(int rv) {
502 CHECK_NE(rv, ERR_IO_PENDING);
503 CHECK(!callback_.is_null());
504
505 // Since Run may result in being called back, clear user_callback_ in advance.
506 CompletionCallback c = callback_;
507 callback_.Reset();
508 c.Run(rv);
509 }
510
GetSSLInfo(SSLInfo * ssl_info)511 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
512 DCHECK(stream_.get());
513 bool using_npn;
514 NextProto protocol_negotiated = kProtoUnknown;
515 stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
516 }
517
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)518 void SpdyHttpStream::GetSSLCertRequestInfo(
519 SSLCertRequestInfo* cert_request_info) {
520 DCHECK(stream_.get());
521 stream_->GetSSLCertRequestInfo(cert_request_info);
522 }
523
IsSpdyHttpStream() const524 bool SpdyHttpStream::IsSpdyHttpStream() const {
525 return true;
526 }
527
Drain(HttpNetworkSession * session)528 void SpdyHttpStream::Drain(HttpNetworkSession* session) {
529 Close(false);
530 delete this;
531 }
532
SetPriority(RequestPriority priority)533 void SpdyHttpStream::SetPriority(RequestPriority priority) {
534 // TODO(akalin): Plumb this through to |stream_request_| and
535 // |stream_|.
536 }
537
538 } // namespace net
539