1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_http_stream.h"
6
7 #include <algorithm>
8 #include <list>
9 #include <string>
10
11 #include "base/logging.h"
12 #include "base/message_loop.h"
13 #include "net/base/address_list.h"
14 #include "net/base/host_port_pair.h"
15 #include "net/base/load_flags.h"
16 #include "net/base/net_util.h"
17 #include "net/http/http_request_headers.h"
18 #include "net/http/http_request_info.h"
19 #include "net/http/http_response_info.h"
20 #include "net/http/http_util.h"
21 #include "net/spdy/spdy_http_utils.h"
22 #include "net/spdy/spdy_session.h"
23
24 namespace net {
25
SpdyHttpStream(SpdySession * spdy_session,bool direct)26 SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session,
27 bool direct)
28 : ALLOW_THIS_IN_INITIALIZER_LIST(read_callback_factory_(this)),
29 stream_(NULL),
30 spdy_session_(spdy_session),
31 response_info_(NULL),
32 download_finished_(false),
33 response_headers_received_(false),
34 user_callback_(NULL),
35 user_buffer_len_(0),
36 buffered_read_callback_pending_(false),
37 more_read_data_pending_(false),
38 direct_(direct) { }
39
InitializeWithExistingStream(SpdyStream * spdy_stream)40 void SpdyHttpStream::InitializeWithExistingStream(SpdyStream* spdy_stream) {
41 stream_ = spdy_stream;
42 stream_->SetDelegate(this);
43 response_headers_received_ = true;
44 }
45
~SpdyHttpStream()46 SpdyHttpStream::~SpdyHttpStream() {
47 if (stream_)
48 stream_->DetachDelegate();
49 }
50
InitializeStream(const HttpRequestInfo * request_info,const BoundNetLog & stream_net_log,CompletionCallback * callback)51 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
52 const BoundNetLog& stream_net_log,
53 CompletionCallback* callback) {
54 DCHECK(!stream_.get());
55 if (spdy_session_->IsClosed())
56 return ERR_CONNECTION_CLOSED;
57
58 request_info_ = request_info;
59 if (request_info_->method == "GET") {
60 int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
61 stream_net_log);
62 if (error != OK)
63 return error;
64 }
65
66 if (stream_.get())
67 return OK;
68
69 return spdy_session_->CreateStream(request_info_->url,
70 request_info_->priority, &stream_,
71 stream_net_log, callback);
72 }
73
GetResponseInfo() const74 const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
75 return response_info_;
76 }
77
GetUploadProgress() const78 uint64 SpdyHttpStream::GetUploadProgress() const {
79 if (!request_body_stream_.get())
80 return 0;
81
82 return request_body_stream_->position();
83 }
84
ReadResponseHeaders(CompletionCallback * callback)85 int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) {
86 CHECK(callback);
87 CHECK(!stream_->cancelled());
88
89 if (stream_->closed())
90 return stream_->response_status();
91
92 // Check if we already have the response headers. If so, return synchronously.
93 if(stream_->response_received()) {
94 CHECK(stream_->is_idle());
95 return OK;
96 }
97
98 // Still waiting for the response, return IO_PENDING.
99 CHECK(!user_callback_);
100 user_callback_ = callback;
101 return ERR_IO_PENDING;
102 }
103
ReadResponseBody(IOBuffer * buf,int buf_len,CompletionCallback * callback)104 int SpdyHttpStream::ReadResponseBody(
105 IOBuffer* buf, int buf_len, CompletionCallback* callback) {
106 CHECK(stream_->is_idle());
107 CHECK(buf);
108 CHECK(buf_len);
109 CHECK(callback);
110
111 // If we have data buffered, complete the IO immediately.
112 if (!response_body_.empty()) {
113 int bytes_read = 0;
114 while (!response_body_.empty() && buf_len > 0) {
115 scoped_refptr<IOBufferWithSize> data = response_body_.front();
116 const int bytes_to_copy = std::min(buf_len, data->size());
117 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
118 buf_len -= bytes_to_copy;
119 if (bytes_to_copy == data->size()) {
120 response_body_.pop_front();
121 } else {
122 const int bytes_remaining = data->size() - bytes_to_copy;
123 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
124 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
125 bytes_remaining);
126 response_body_.pop_front();
127 response_body_.push_front(make_scoped_refptr(new_buffer));
128 }
129 bytes_read += bytes_to_copy;
130 }
131 if (SpdySession::flow_control())
132 stream_->IncreaseRecvWindowSize(bytes_read);
133 return bytes_read;
134 } else if (stream_->closed()) {
135 return stream_->response_status();
136 }
137
138 CHECK(!user_callback_);
139 CHECK(!user_buffer_);
140 CHECK_EQ(0, user_buffer_len_);
141
142 user_callback_ = callback;
143 user_buffer_ = buf;
144 user_buffer_len_ = buf_len;
145 return ERR_IO_PENDING;
146 }
147
Close(bool not_reusable)148 void SpdyHttpStream::Close(bool not_reusable) {
149 // Note: the not_reusable flag has no meaning for SPDY streams.
150
151 Cancel();
152 }
153
RenewStreamForAuth()154 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
155 return NULL;
156 }
157
IsResponseBodyComplete() const158 bool SpdyHttpStream::IsResponseBodyComplete() const {
159 if (!stream_)
160 return false;
161 return stream_->closed();
162 }
163
CanFindEndOfResponse() const164 bool SpdyHttpStream::CanFindEndOfResponse() const {
165 return true;
166 }
167
IsMoreDataBuffered() const168 bool SpdyHttpStream::IsMoreDataBuffered() const {
169 return false;
170 }
171
IsConnectionReused() const172 bool SpdyHttpStream::IsConnectionReused() const {
173 return spdy_session_->IsReused();
174 }
175
SetConnectionReused()176 void SpdyHttpStream::SetConnectionReused() {
177 // SPDY doesn't need an indicator here.
178 }
179
IsConnectionReusable() const180 bool SpdyHttpStream::IsConnectionReusable() const {
181 // SPDY streams aren't considered reusable.
182 return false;
183 }
184
set_chunk_callback(ChunkCallback * callback)185 void SpdyHttpStream::set_chunk_callback(ChunkCallback* callback) {
186 if (request_body_stream_ != NULL)
187 request_body_stream_->set_chunk_callback(callback);
188 }
189
SendRequest(const HttpRequestHeaders & request_headers,UploadDataStream * request_body,HttpResponseInfo * response,CompletionCallback * callback)190 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
191 UploadDataStream* request_body,
192 HttpResponseInfo* response,
193 CompletionCallback* callback) {
194 base::Time request_time = base::Time::Now();
195 CHECK(stream_.get());
196
197 stream_->SetDelegate(this);
198
199 linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
200 CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers,
201 headers.get(), direct_);
202 stream_->set_spdy_headers(headers);
203
204 stream_->SetRequestTime(request_time);
205 // This should only get called in the case of a request occurring
206 // during server push that has already begun but hasn't finished,
207 // so we set the response's request time to be the actual one
208 if (response_info_)
209 response_info_->request_time = request_time;
210
211 CHECK(!request_body_stream_.get());
212 if (request_body) {
213 if (request_body->size() || request_body->is_chunked())
214 request_body_stream_.reset(request_body);
215 else
216 delete request_body;
217 }
218
219 CHECK(callback);
220 CHECK(!stream_->cancelled());
221 CHECK(response);
222
223 if (!stream_->pushed() && stream_->closed()) {
224 if (stream_->response_status() == OK)
225 return ERR_FAILED;
226 else
227 return stream_->response_status();
228 }
229
230 // SendRequest can be called in two cases.
231 //
232 // a) A client initiated request. In this case, |response_info_| should be
233 // NULL to start with.
234 // b) A client request which matches a response that the server has already
235 // pushed.
236 if (push_response_info_.get()) {
237 *response = *(push_response_info_.get());
238 push_response_info_.reset();
239 }
240 else
241 DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
242
243 response_info_ = response;
244
245 // Put the peer's IP address and port into the response.
246 AddressList address;
247 int result = stream_->GetPeerAddress(&address);
248 if (result != OK)
249 return result;
250 response_info_->socket_address = HostPortPair::FromAddrInfo(address.head());
251
252 bool has_upload_data = request_body_stream_.get() != NULL;
253 result = stream_->SendRequest(has_upload_data);
254 if (result == ERR_IO_PENDING) {
255 CHECK(!user_callback_);
256 user_callback_ = callback;
257 }
258 return result;
259 }
260
Cancel()261 void SpdyHttpStream::Cancel() {
262 if (spdy_session_)
263 spdy_session_->CancelPendingCreateStreams(&stream_);
264 user_callback_ = NULL;
265 if (stream_)
266 stream_->Cancel();
267 }
268
OnSendHeadersComplete(int status)269 bool SpdyHttpStream::OnSendHeadersComplete(int status) {
270 if (user_callback_)
271 DoCallback(status);
272 return request_body_stream_.get() == NULL;
273 }
274
OnSendBody()275 int SpdyHttpStream::OnSendBody() {
276 CHECK(request_body_stream_.get());
277
278 int buf_len = static_cast<int>(request_body_stream_->buf_len());
279 if (!buf_len)
280 return OK;
281 bool is_chunked = request_body_stream_->is_chunked();
282 // TODO(satish): For non-chunked POST data, we set DATA_FLAG_FIN for all
283 // blocks of data written out. This is wrong if the POST data was larger than
284 // UploadDataStream::kBufSize as that is the largest buffer that
285 // UploadDataStream returns at a time and we'll be setting the FIN flag for
286 // each block of data written out.
287 bool eof = !is_chunked || request_body_stream_->IsOnLastChunk();
288 return stream_->WriteStreamData(
289 request_body_stream_->buf(), buf_len,
290 eof ? spdy::DATA_FLAG_FIN : spdy::DATA_FLAG_NONE);
291 }
292
OnSendBodyComplete(int status,bool * eof)293 int SpdyHttpStream::OnSendBodyComplete(int status, bool* eof) {
294 CHECK(request_body_stream_.get());
295
296 request_body_stream_->MarkConsumedAndFillBuffer(status);
297 *eof = request_body_stream_->eof();
298 if (!*eof &&
299 request_body_stream_->is_chunked() &&
300 !request_body_stream_->buf_len())
301 return ERR_IO_PENDING;
302
303 return OK;
304 }
305
OnResponseReceived(const spdy::SpdyHeaderBlock & response,base::Time response_time,int status)306 int SpdyHttpStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response,
307 base::Time response_time,
308 int status) {
309 if (!response_info_) {
310 DCHECK(stream_->pushed());
311 push_response_info_.reset(new HttpResponseInfo);
312 response_info_ = push_response_info_.get();
313 }
314
315 // If the response is already received, these headers are too late.
316 if (response_headers_received_) {
317 LOG(WARNING) << "SpdyHttpStream headers received after response started.";
318 return OK;
319 }
320
321 // TODO(mbelshe): This is the time of all headers received, not just time
322 // to first byte.
323 response_info_->response_time = base::Time::Now();
324
325 if (!SpdyHeadersToHttpResponse(response, response_info_)) {
326 // We might not have complete headers yet.
327 return ERR_INCOMPLETE_SPDY_HEADERS;
328 }
329
330 response_headers_received_ = true;
331 // Don't store the SSLInfo in the response here, HttpNetworkTransaction
332 // will take care of that part.
333 SSLInfo ssl_info;
334 stream_->GetSSLInfo(&ssl_info,
335 &response_info_->was_npn_negotiated);
336 response_info_->request_time = stream_->GetRequestTime();
337 response_info_->vary_data.Init(*request_info_, *response_info_->headers);
338 // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control
339 // frame has been received and processed. Move to framer?
340 response_info_->response_time = response_time;
341
342 if (user_callback_)
343 DoCallback(status);
344 return status;
345 }
346
OnDataReceived(const char * data,int length)347 void SpdyHttpStream::OnDataReceived(const char* data, int length) {
348 // SpdyStream won't call us with data if the header block didn't contain a
349 // valid set of headers. So we don't expect to not have headers received
350 // here.
351 DCHECK(response_headers_received_);
352
353 // Note that data may be received for a SpdyStream prior to the user calling
354 // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often
355 // happen for server initiated streams.
356 DCHECK(!stream_->closed() || stream_->pushed());
357 if (length > 0) {
358 // Save the received data.
359 IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
360 memcpy(io_buffer->data(), data, length);
361 response_body_.push_back(make_scoped_refptr(io_buffer));
362
363 if (user_buffer_) {
364 // Handing small chunks of data to the caller creates measurable overhead.
365 // We buffer data in short time-spans and send a single read notification.
366 ScheduleBufferedReadCallback();
367 }
368 }
369 }
370
OnDataSent(int length)371 void SpdyHttpStream::OnDataSent(int length) {
372 // For HTTP streams, no data is sent from the client while in the OPEN state,
373 // so it is never called.
374 NOTREACHED();
375 }
376
OnClose(int status)377 void SpdyHttpStream::OnClose(int status) {
378 bool invoked_callback = false;
379 if (status == net::OK) {
380 // We need to complete any pending buffered read now.
381 invoked_callback = DoBufferedReadCallback();
382 }
383 if (!invoked_callback && user_callback_)
384 DoCallback(status);
385 }
386
ScheduleBufferedReadCallback()387 void SpdyHttpStream::ScheduleBufferedReadCallback() {
388 // If there is already a scheduled DoBufferedReadCallback, don't issue
389 // another one. Mark that we have received more data and return.
390 if (buffered_read_callback_pending_) {
391 more_read_data_pending_ = true;
392 return;
393 }
394
395 more_read_data_pending_ = false;
396 buffered_read_callback_pending_ = true;
397 const int kBufferTimeMs = 1;
398 MessageLoop::current()->PostDelayedTask(FROM_HERE, read_callback_factory_.
399 NewRunnableMethod(&SpdyHttpStream::DoBufferedReadCallback),
400 kBufferTimeMs);
401 }
402
403 // Checks to see if we should wait for more buffered data before notifying
404 // the caller. Returns true if we should wait, false otherwise.
ShouldWaitForMoreBufferedData() const405 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
406 // If the response is complete, there is no point in waiting.
407 if (stream_->closed())
408 return false;
409
410 int bytes_buffered = 0;
411 std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it;
412 for (it = response_body_.begin();
413 it != response_body_.end() && bytes_buffered < user_buffer_len_;
414 ++it)
415 bytes_buffered += (*it)->size();
416
417 return bytes_buffered < user_buffer_len_;
418 }
419
DoBufferedReadCallback()420 bool SpdyHttpStream::DoBufferedReadCallback() {
421 read_callback_factory_.RevokeAll();
422 buffered_read_callback_pending_ = false;
423
424 // If the transaction is cancelled or errored out, we don't need to complete
425 // the read.
426 if (!stream_ || stream_->response_status() != OK || stream_->cancelled())
427 return false;
428
429 // When more_read_data_pending_ is true, it means that more data has
430 // arrived since we started waiting. Wait a little longer and continue
431 // to buffer.
432 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
433 ScheduleBufferedReadCallback();
434 return false;
435 }
436
437 int rv = 0;
438 if (user_buffer_) {
439 rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
440 CHECK_NE(rv, ERR_IO_PENDING);
441 user_buffer_ = NULL;
442 user_buffer_len_ = 0;
443 DoCallback(rv);
444 return true;
445 }
446 return false;
447 }
448
DoCallback(int rv)449 void SpdyHttpStream::DoCallback(int rv) {
450 CHECK_NE(rv, ERR_IO_PENDING);
451 CHECK(user_callback_);
452
453 // Since Run may result in being called back, clear user_callback_ in advance.
454 CompletionCallback* c = user_callback_;
455 user_callback_ = NULL;
456 c->Run(rv);
457 }
458
GetSSLInfo(SSLInfo * ssl_info)459 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
460 DCHECK(stream_);
461 bool using_npn;
462 stream_->GetSSLInfo(ssl_info, &using_npn);
463 }
464
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)465 void SpdyHttpStream::GetSSLCertRequestInfo(
466 SSLCertRequestInfo* cert_request_info) {
467 DCHECK(stream_);
468 stream_->GetSSLCertRequestInfo(cert_request_info);
469 }
470
IsSpdyHttpStream() const471 bool SpdyHttpStream::IsSpdyHttpStream() const {
472 return true;
473 }
474
475 } // namespace net
476