• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/flip/flip_stream.h"
6 
7 #include "base/logging.h"
8 #include "net/flip/flip_session.h"
9 #include "net/http/http_request_info.h"
10 #include "net/http/http_response_info.h"
11 
12 namespace net {
13 
FlipStream(FlipSession * session,flip::FlipStreamId stream_id,bool pushed,LoadLog * log)14 FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id,
15                        bool pushed, LoadLog* log)
16     : stream_id_(stream_id),
17       priority_(0),
18       pushed_(pushed),
19       download_finished_(false),
20       metrics_(Singleton<BandwidthMetrics>::get()),
21       session_(session),
22       response_(NULL),
23       request_body_stream_(NULL),
24       response_complete_(false),
25       io_state_(STATE_NONE),
26       response_status_(OK),
27       user_callback_(NULL),
28       user_buffer_(NULL),
29       user_buffer_len_(0),
30       cancelled_(false),
31       load_log_(log),
32       send_bytes_(0),
33       recv_bytes_(0),
34       histograms_recorded_(false) {}
35 
~FlipStream()36 FlipStream::~FlipStream() {
37   DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
38 
39   // TODO(willchan): We're still calling CancelStream() too many times, because
40   // inactive pending/pushed streams will still have stream_id_ set.
41   if (stream_id_) {
42     session_->CancelStream(stream_id_);
43   } else if (!response_complete_) {
44     NOTREACHED();
45   }
46 }
47 
GetUploadProgress() const48 uint64 FlipStream::GetUploadProgress() const {
49   if (!request_body_stream_.get())
50     return 0;
51 
52   return request_body_stream_->position();
53 }
54 
GetResponseInfo() const55 const HttpResponseInfo* FlipStream::GetResponseInfo() const {
56   return response_;
57 }
58 
ReadResponseHeaders(CompletionCallback * callback)59 int FlipStream::ReadResponseHeaders(CompletionCallback* callback) {
60   // Note: The FlipStream may have already received the response headers, so
61   //       this call may complete synchronously.
62   CHECK(callback);
63   CHECK(io_state_ == STATE_NONE);
64   CHECK(!cancelled_);
65 
66   // The SYN_REPLY has already been received.
67   if (response_->headers)
68     return OK;
69 
70   io_state_ = STATE_READ_HEADERS;
71   CHECK(!user_callback_);
72   user_callback_ = callback;
73   return ERR_IO_PENDING;
74 }
75 
ReadResponseBody(IOBuffer * buf,int buf_len,CompletionCallback * callback)76 int FlipStream::ReadResponseBody(
77     IOBuffer* buf, int buf_len, CompletionCallback* callback) {
78   DCHECK_EQ(io_state_, STATE_NONE);
79   CHECK(buf);
80   CHECK(buf_len);
81   CHECK(callback);
82   CHECK(!cancelled_);
83 
84   // If we have data buffered, complete the IO immediately.
85   if (response_body_.size()) {
86     int bytes_read = 0;
87     while (response_body_.size() && buf_len > 0) {
88       scoped_refptr<IOBufferWithSize> data = response_body_.front();
89       const int bytes_to_copy = std::min(buf_len, data->size());
90       memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
91       buf_len -= bytes_to_copy;
92       if (bytes_to_copy == data->size()) {
93         response_body_.pop_front();
94       } else {
95         const int bytes_remaining = data->size() - bytes_to_copy;
96         IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
97         memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
98                bytes_remaining);
99         response_body_.pop_front();
100         response_body_.push_front(new_buffer);
101       }
102       bytes_read += bytes_to_copy;
103     }
104     if (bytes_read > 0)
105       recv_bytes_ += bytes_read;
106     return bytes_read;
107   } else if (response_complete_) {
108     return response_status_;
109   }
110 
111   CHECK(!user_callback_);
112   CHECK(!user_buffer_);
113   CHECK(user_buffer_len_ == 0);
114 
115   user_callback_ = callback;
116   user_buffer_ = buf;
117   user_buffer_len_ = buf_len;
118   return ERR_IO_PENDING;
119 }
120 
SendRequest(UploadDataStream * upload_data,HttpResponseInfo * response,CompletionCallback * callback)121 int FlipStream::SendRequest(UploadDataStream* upload_data,
122                             HttpResponseInfo* response,
123                             CompletionCallback* callback) {
124   CHECK(callback);
125   CHECK(!cancelled_);
126   CHECK(response);
127 
128   response_ = response;
129 
130   if (upload_data) {
131     if (upload_data->size())
132       request_body_stream_.reset(upload_data);
133     else
134       delete upload_data;
135   }
136 
137   send_time_ = base::TimeTicks::Now();
138 
139   DCHECK_EQ(io_state_, STATE_NONE);
140   if (!pushed_)
141     io_state_ = STATE_SEND_HEADERS;
142   else
143     io_state_ = STATE_READ_HEADERS;
144   int result = DoLoop(OK);
145   if (result == ERR_IO_PENDING) {
146     CHECK(!user_callback_);
147     user_callback_ = callback;
148   }
149   return result;
150 }
151 
Cancel()152 void FlipStream::Cancel() {
153   cancelled_ = true;
154   user_callback_ = NULL;
155 
156   session_->CancelStream(stream_id_);
157 }
158 
OnResponseReceived(const HttpResponseInfo & response)159 void FlipStream::OnResponseReceived(const HttpResponseInfo& response) {
160   metrics_.StartStream();
161 
162   CHECK(!response_->headers);
163 
164   *response_ = response;  // TODO(mbelshe): avoid copy.
165   DCHECK(response_->headers);
166 
167   recv_first_byte_time_ = base::TimeTicks::Now();
168 
169   if (io_state_ == STATE_NONE) {
170     CHECK(pushed_);
171   } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) {
172     CHECK(!pushed_);
173   } else {
174     NOTREACHED();
175   }
176 
177   int rv = DoLoop(OK);
178 
179   if (user_callback_)
180     DoCallback(rv);
181 }
182 
OnDataReceived(const char * data,int length)183 bool FlipStream::OnDataReceived(const char* data, int length) {
184   DCHECK_GE(length, 0);
185   LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
186             << stream_id_;
187 
188   // If we don't have a response, then the SYN_REPLY did not come through.
189   // We cannot pass data up to the caller unless the reply headers have been
190   // received.
191   if (!response_->headers) {
192     OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
193     return false;
194   }
195 
196   if (length > 0)
197     recv_bytes_ += length;
198   recv_last_byte_time_ = base::TimeTicks::Now();
199 
200   // A zero-length read means that the stream is being closed.
201   if (!length) {
202     metrics_.StopStream();
203     download_finished_ = true;
204     OnClose(net::OK);
205     return true;
206   }
207 
208   // Track our bandwidth.
209   metrics_.RecordBytes(length);
210 
211   if (length > 0) {
212     // TODO(mbelshe): If read is pending, we should copy the data straight into
213     // the read buffer here.  For now, we'll queue it always.
214     // TODO(mbelshe): We need to have some throttling on this.  We shouldn't
215     //                buffer an infinite amount of data.
216 
217     IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
218     memcpy(io_buffer->data(), data, length);
219 
220     response_body_.push_back(io_buffer);
221   }
222 
223   // Note that data may be received for a FlipStream prior to the user calling
224   // ReadResponseBody(), therefore user_callback_ may be NULL.  This may often
225   // happen for server initiated streams.
226   if (user_callback_) {
227     int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
228     CHECK(rv != ERR_IO_PENDING);
229     user_buffer_ = NULL;
230     user_buffer_len_ = 0;
231     DoCallback(rv);
232   }
233 
234   return true;
235 }
236 
OnWriteComplete(int status)237 void FlipStream::OnWriteComplete(int status) {
238   // TODO(mbelshe): Check for cancellation here.  If we're cancelled, we
239   // should discontinue the DoLoop.
240 
241   if (status > 0)
242     send_bytes_ += status;
243 
244   DoLoop(status);
245 }
246 
OnClose(int status)247 void FlipStream::OnClose(int status) {
248   response_complete_ = true;
249   response_status_ = status;
250   stream_id_ = 0;
251 
252   if (user_callback_)
253     DoCallback(status);
254 
255   UpdateHistograms();
256 }
257 
DoLoop(int result)258 int FlipStream::DoLoop(int result) {
259   do {
260     State state = io_state_;
261     io_state_ = STATE_NONE;
262     switch (state) {
263       // State machine 1: Send headers and wait for response headers.
264       case STATE_SEND_HEADERS:
265         CHECK(result == OK);
266         LoadLog::BeginEvent(load_log_,
267                             LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
268         result = DoSendHeaders();
269         break;
270       case STATE_SEND_HEADERS_COMPLETE:
271         LoadLog::EndEvent(load_log_,
272                           LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
273         result = DoSendHeadersComplete(result);
274         break;
275       case STATE_SEND_BODY:
276         CHECK(result == OK);
277         LoadLog::BeginEvent(load_log_,
278                             LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
279         result = DoSendBody();
280         break;
281       case STATE_SEND_BODY_COMPLETE:
282         LoadLog::EndEvent(load_log_,
283                           LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
284         result = DoSendBodyComplete(result);
285         break;
286       case STATE_READ_HEADERS:
287         CHECK(result == OK);
288         LoadLog::BeginEvent(load_log_,
289                             LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
290         result = DoReadHeaders();
291         break;
292       case STATE_READ_HEADERS_COMPLETE:
293         LoadLog::EndEvent(load_log_,
294                           LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
295         result = DoReadHeadersComplete(result);
296         break;
297 
298       // State machine 2: Read body.
299       // NOTE(willchan): Currently unused.  Currently we handle this stuff in
300       // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc.  Only reason
301       // to do this is for consistency with the Http code.
302       case STATE_READ_BODY:
303         LoadLog::BeginEvent(load_log_,
304                             LoadLog::TYPE_FLIP_STREAM_READ_BODY);
305         result = DoReadBody();
306         break;
307       case STATE_READ_BODY_COMPLETE:
308         LoadLog::EndEvent(load_log_,
309                           LoadLog::TYPE_FLIP_STREAM_READ_BODY);
310         result = DoReadBodyComplete(result);
311         break;
312       case STATE_DONE:
313         DCHECK(result != ERR_IO_PENDING);
314         break;
315       default:
316         NOTREACHED();
317         break;
318     }
319   } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);
320 
321   return result;
322 }
323 
DoCallback(int rv)324 void FlipStream::DoCallback(int rv) {
325   CHECK(rv != ERR_IO_PENDING);
326   CHECK(user_callback_);
327 
328   // Since Run may result in being called back, clear user_callback_ in advance.
329   CompletionCallback* c = user_callback_;
330   user_callback_ = NULL;
331   c->Run(rv);
332 }
333 
DoSendHeaders()334 int FlipStream::DoSendHeaders() {
335   // The FlipSession will always call us back when the send is complete.
336   // TODO(willchan): This code makes the assumption that for the non-push stream
337   // case, the client code calls SendRequest() after creating the stream and
338   // before yielding back to the MessageLoop.  This is true in the current code,
339   // but is not obvious from the headers.  We should make the code handle
340   // SendRequest() being called after the SYN_REPLY has been received.
341   io_state_ = STATE_SEND_HEADERS_COMPLETE;
342   return ERR_IO_PENDING;
343 }
344 
DoSendHeadersComplete(int result)345 int FlipStream::DoSendHeadersComplete(int result) {
346   if (result < 0)
347     return result;
348 
349   CHECK(result > 0);
350 
351   // There is no body, skip that state.
352   if (!request_body_stream_.get()) {
353     io_state_ = STATE_READ_HEADERS;
354     return OK;
355   }
356 
357   io_state_ = STATE_SEND_BODY;
358   return OK;
359 }
360 
361 // DoSendBody is called to send the optional body for the request.  This call
362 // will also be called as each write of a chunk of the body completes.
DoSendBody()363 int FlipStream::DoSendBody() {
364   // If we're already in the STATE_SENDING_BODY state, then we've already
365   // sent a portion of the body.  In that case, we need to first consume
366   // the bytes written in the body stream.  Note that the bytes written is
367   // the number of bytes in the frame that were written, only consume the
368   // data portion, of course.
369   io_state_ = STATE_SEND_BODY_COMPLETE;
370   int buf_len = static_cast<int>(request_body_stream_->buf_len());
371   return session_->WriteStreamData(stream_id_,
372                                    request_body_stream_->buf(),
373                                    buf_len);
374 }
375 
DoSendBodyComplete(int result)376 int FlipStream::DoSendBodyComplete(int result) {
377   if (result < 0)
378     return result;
379 
380   CHECK(result != 0);
381 
382   request_body_stream_->DidConsume(result);
383 
384   if (request_body_stream_->position() < request_body_stream_->size())
385     io_state_ = STATE_SEND_BODY;
386   else
387     io_state_ = STATE_READ_HEADERS;
388 
389   return OK;
390 }
391 
DoReadHeaders()392 int FlipStream::DoReadHeaders() {
393   io_state_ = STATE_READ_HEADERS_COMPLETE;
394   return response_->headers ? OK : ERR_IO_PENDING;
395 }
396 
DoReadHeadersComplete(int result)397 int FlipStream::DoReadHeadersComplete(int result) {
398   return result;
399 }
400 
DoReadBody()401 int FlipStream::DoReadBody() {
402   // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
403   // makes sense.
404   return ERR_IO_PENDING;
405 }
406 
DoReadBodyComplete(int result)407 int FlipStream::DoReadBodyComplete(int result) {
408   // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
409   // makes sense.
410   return ERR_IO_PENDING;
411 }
412 
UpdateHistograms()413 void FlipStream::UpdateHistograms() {
414   if (histograms_recorded_)
415     return;
416 
417   histograms_recorded_ = true;
418 
419   // We need all timers to be filled in, otherwise metrics can be bogus.
420   if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
421       recv_last_byte_time_.is_null())
422     return;
423 
424   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
425       recv_first_byte_time_ - send_time_);
426   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
427       recv_last_byte_time_ - recv_first_byte_time_);
428   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
429       recv_last_byte_time_ - send_time_);
430 
431   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
432   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
433 }
434 
435 }  // namespace net
436