1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/flip/flip_stream.h"
6
7 #include "base/logging.h"
8 #include "net/flip/flip_session.h"
9 #include "net/http/http_request_info.h"
10 #include "net/http/http_response_info.h"
11
12 namespace net {
13
FlipStream(FlipSession * session,flip::FlipStreamId stream_id,bool pushed,LoadLog * log)14 FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id,
15 bool pushed, LoadLog* log)
16 : stream_id_(stream_id),
17 priority_(0),
18 pushed_(pushed),
19 download_finished_(false),
20 metrics_(Singleton<BandwidthMetrics>::get()),
21 session_(session),
22 response_(NULL),
23 request_body_stream_(NULL),
24 response_complete_(false),
25 io_state_(STATE_NONE),
26 response_status_(OK),
27 user_callback_(NULL),
28 user_buffer_(NULL),
29 user_buffer_len_(0),
30 cancelled_(false),
31 load_log_(log),
32 send_bytes_(0),
33 recv_bytes_(0),
34 histograms_recorded_(false) {}
35
~FlipStream()36 FlipStream::~FlipStream() {
37 DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
38
39 // TODO(willchan): We're still calling CancelStream() too many times, because
40 // inactive pending/pushed streams will still have stream_id_ set.
41 if (stream_id_) {
42 session_->CancelStream(stream_id_);
43 } else if (!response_complete_) {
44 NOTREACHED();
45 }
46 }
47
GetUploadProgress() const48 uint64 FlipStream::GetUploadProgress() const {
49 if (!request_body_stream_.get())
50 return 0;
51
52 return request_body_stream_->position();
53 }
54
GetResponseInfo() const55 const HttpResponseInfo* FlipStream::GetResponseInfo() const {
56 return response_;
57 }
58
ReadResponseHeaders(CompletionCallback * callback)59 int FlipStream::ReadResponseHeaders(CompletionCallback* callback) {
60 // Note: The FlipStream may have already received the response headers, so
61 // this call may complete synchronously.
62 CHECK(callback);
63 CHECK(io_state_ == STATE_NONE);
64 CHECK(!cancelled_);
65
66 // The SYN_REPLY has already been received.
67 if (response_->headers)
68 return OK;
69
70 io_state_ = STATE_READ_HEADERS;
71 CHECK(!user_callback_);
72 user_callback_ = callback;
73 return ERR_IO_PENDING;
74 }
75
ReadResponseBody(IOBuffer * buf,int buf_len,CompletionCallback * callback)76 int FlipStream::ReadResponseBody(
77 IOBuffer* buf, int buf_len, CompletionCallback* callback) {
78 DCHECK_EQ(io_state_, STATE_NONE);
79 CHECK(buf);
80 CHECK(buf_len);
81 CHECK(callback);
82 CHECK(!cancelled_);
83
84 // If we have data buffered, complete the IO immediately.
85 if (response_body_.size()) {
86 int bytes_read = 0;
87 while (response_body_.size() && buf_len > 0) {
88 scoped_refptr<IOBufferWithSize> data = response_body_.front();
89 const int bytes_to_copy = std::min(buf_len, data->size());
90 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
91 buf_len -= bytes_to_copy;
92 if (bytes_to_copy == data->size()) {
93 response_body_.pop_front();
94 } else {
95 const int bytes_remaining = data->size() - bytes_to_copy;
96 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
97 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
98 bytes_remaining);
99 response_body_.pop_front();
100 response_body_.push_front(new_buffer);
101 }
102 bytes_read += bytes_to_copy;
103 }
104 if (bytes_read > 0)
105 recv_bytes_ += bytes_read;
106 return bytes_read;
107 } else if (response_complete_) {
108 return response_status_;
109 }
110
111 CHECK(!user_callback_);
112 CHECK(!user_buffer_);
113 CHECK(user_buffer_len_ == 0);
114
115 user_callback_ = callback;
116 user_buffer_ = buf;
117 user_buffer_len_ = buf_len;
118 return ERR_IO_PENDING;
119 }
120
SendRequest(UploadDataStream * upload_data,HttpResponseInfo * response,CompletionCallback * callback)121 int FlipStream::SendRequest(UploadDataStream* upload_data,
122 HttpResponseInfo* response,
123 CompletionCallback* callback) {
124 CHECK(callback);
125 CHECK(!cancelled_);
126 CHECK(response);
127
128 response_ = response;
129
130 if (upload_data) {
131 if (upload_data->size())
132 request_body_stream_.reset(upload_data);
133 else
134 delete upload_data;
135 }
136
137 send_time_ = base::TimeTicks::Now();
138
139 DCHECK_EQ(io_state_, STATE_NONE);
140 if (!pushed_)
141 io_state_ = STATE_SEND_HEADERS;
142 else
143 io_state_ = STATE_READ_HEADERS;
144 int result = DoLoop(OK);
145 if (result == ERR_IO_PENDING) {
146 CHECK(!user_callback_);
147 user_callback_ = callback;
148 }
149 return result;
150 }
151
Cancel()152 void FlipStream::Cancel() {
153 cancelled_ = true;
154 user_callback_ = NULL;
155
156 session_->CancelStream(stream_id_);
157 }
158
OnResponseReceived(const HttpResponseInfo & response)159 void FlipStream::OnResponseReceived(const HttpResponseInfo& response) {
160 metrics_.StartStream();
161
162 CHECK(!response_->headers);
163
164 *response_ = response; // TODO(mbelshe): avoid copy.
165 DCHECK(response_->headers);
166
167 recv_first_byte_time_ = base::TimeTicks::Now();
168
169 if (io_state_ == STATE_NONE) {
170 CHECK(pushed_);
171 } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) {
172 CHECK(!pushed_);
173 } else {
174 NOTREACHED();
175 }
176
177 int rv = DoLoop(OK);
178
179 if (user_callback_)
180 DoCallback(rv);
181 }
182
OnDataReceived(const char * data,int length)183 bool FlipStream::OnDataReceived(const char* data, int length) {
184 DCHECK_GE(length, 0);
185 LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
186 << stream_id_;
187
188 // If we don't have a response, then the SYN_REPLY did not come through.
189 // We cannot pass data up to the caller unless the reply headers have been
190 // received.
191 if (!response_->headers) {
192 OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
193 return false;
194 }
195
196 if (length > 0)
197 recv_bytes_ += length;
198 recv_last_byte_time_ = base::TimeTicks::Now();
199
200 // A zero-length read means that the stream is being closed.
201 if (!length) {
202 metrics_.StopStream();
203 download_finished_ = true;
204 OnClose(net::OK);
205 return true;
206 }
207
208 // Track our bandwidth.
209 metrics_.RecordBytes(length);
210
211 if (length > 0) {
212 // TODO(mbelshe): If read is pending, we should copy the data straight into
213 // the read buffer here. For now, we'll queue it always.
214 // TODO(mbelshe): We need to have some throttling on this. We shouldn't
215 // buffer an infinite amount of data.
216
217 IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
218 memcpy(io_buffer->data(), data, length);
219
220 response_body_.push_back(io_buffer);
221 }
222
223 // Note that data may be received for a FlipStream prior to the user calling
224 // ReadResponseBody(), therefore user_callback_ may be NULL. This may often
225 // happen for server initiated streams.
226 if (user_callback_) {
227 int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
228 CHECK(rv != ERR_IO_PENDING);
229 user_buffer_ = NULL;
230 user_buffer_len_ = 0;
231 DoCallback(rv);
232 }
233
234 return true;
235 }
236
OnWriteComplete(int status)237 void FlipStream::OnWriteComplete(int status) {
238 // TODO(mbelshe): Check for cancellation here. If we're cancelled, we
239 // should discontinue the DoLoop.
240
241 if (status > 0)
242 send_bytes_ += status;
243
244 DoLoop(status);
245 }
246
OnClose(int status)247 void FlipStream::OnClose(int status) {
248 response_complete_ = true;
249 response_status_ = status;
250 stream_id_ = 0;
251
252 if (user_callback_)
253 DoCallback(status);
254
255 UpdateHistograms();
256 }
257
DoLoop(int result)258 int FlipStream::DoLoop(int result) {
259 do {
260 State state = io_state_;
261 io_state_ = STATE_NONE;
262 switch (state) {
263 // State machine 1: Send headers and wait for response headers.
264 case STATE_SEND_HEADERS:
265 CHECK(result == OK);
266 LoadLog::BeginEvent(load_log_,
267 LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
268 result = DoSendHeaders();
269 break;
270 case STATE_SEND_HEADERS_COMPLETE:
271 LoadLog::EndEvent(load_log_,
272 LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
273 result = DoSendHeadersComplete(result);
274 break;
275 case STATE_SEND_BODY:
276 CHECK(result == OK);
277 LoadLog::BeginEvent(load_log_,
278 LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
279 result = DoSendBody();
280 break;
281 case STATE_SEND_BODY_COMPLETE:
282 LoadLog::EndEvent(load_log_,
283 LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
284 result = DoSendBodyComplete(result);
285 break;
286 case STATE_READ_HEADERS:
287 CHECK(result == OK);
288 LoadLog::BeginEvent(load_log_,
289 LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
290 result = DoReadHeaders();
291 break;
292 case STATE_READ_HEADERS_COMPLETE:
293 LoadLog::EndEvent(load_log_,
294 LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
295 result = DoReadHeadersComplete(result);
296 break;
297
298 // State machine 2: Read body.
299 // NOTE(willchan): Currently unused. Currently we handle this stuff in
300 // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc. Only reason
301 // to do this is for consistency with the Http code.
302 case STATE_READ_BODY:
303 LoadLog::BeginEvent(load_log_,
304 LoadLog::TYPE_FLIP_STREAM_READ_BODY);
305 result = DoReadBody();
306 break;
307 case STATE_READ_BODY_COMPLETE:
308 LoadLog::EndEvent(load_log_,
309 LoadLog::TYPE_FLIP_STREAM_READ_BODY);
310 result = DoReadBodyComplete(result);
311 break;
312 case STATE_DONE:
313 DCHECK(result != ERR_IO_PENDING);
314 break;
315 default:
316 NOTREACHED();
317 break;
318 }
319 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);
320
321 return result;
322 }
323
DoCallback(int rv)324 void FlipStream::DoCallback(int rv) {
325 CHECK(rv != ERR_IO_PENDING);
326 CHECK(user_callback_);
327
328 // Since Run may result in being called back, clear user_callback_ in advance.
329 CompletionCallback* c = user_callback_;
330 user_callback_ = NULL;
331 c->Run(rv);
332 }
333
DoSendHeaders()334 int FlipStream::DoSendHeaders() {
335 // The FlipSession will always call us back when the send is complete.
336 // TODO(willchan): This code makes the assumption that for the non-push stream
337 // case, the client code calls SendRequest() after creating the stream and
338 // before yielding back to the MessageLoop. This is true in the current code,
339 // but is not obvious from the headers. We should make the code handle
340 // SendRequest() being called after the SYN_REPLY has been received.
341 io_state_ = STATE_SEND_HEADERS_COMPLETE;
342 return ERR_IO_PENDING;
343 }
344
DoSendHeadersComplete(int result)345 int FlipStream::DoSendHeadersComplete(int result) {
346 if (result < 0)
347 return result;
348
349 CHECK(result > 0);
350
351 // There is no body, skip that state.
352 if (!request_body_stream_.get()) {
353 io_state_ = STATE_READ_HEADERS;
354 return OK;
355 }
356
357 io_state_ = STATE_SEND_BODY;
358 return OK;
359 }
360
361 // DoSendBody is called to send the optional body for the request. This call
362 // will also be called as each write of a chunk of the body completes.
DoSendBody()363 int FlipStream::DoSendBody() {
364 // If we're already in the STATE_SENDING_BODY state, then we've already
365 // sent a portion of the body. In that case, we need to first consume
366 // the bytes written in the body stream. Note that the bytes written is
367 // the number of bytes in the frame that were written, only consume the
368 // data portion, of course.
369 io_state_ = STATE_SEND_BODY_COMPLETE;
370 int buf_len = static_cast<int>(request_body_stream_->buf_len());
371 return session_->WriteStreamData(stream_id_,
372 request_body_stream_->buf(),
373 buf_len);
374 }
375
DoSendBodyComplete(int result)376 int FlipStream::DoSendBodyComplete(int result) {
377 if (result < 0)
378 return result;
379
380 CHECK(result != 0);
381
382 request_body_stream_->DidConsume(result);
383
384 if (request_body_stream_->position() < request_body_stream_->size())
385 io_state_ = STATE_SEND_BODY;
386 else
387 io_state_ = STATE_READ_HEADERS;
388
389 return OK;
390 }
391
DoReadHeaders()392 int FlipStream::DoReadHeaders() {
393 io_state_ = STATE_READ_HEADERS_COMPLETE;
394 return response_->headers ? OK : ERR_IO_PENDING;
395 }
396
DoReadHeadersComplete(int result)397 int FlipStream::DoReadHeadersComplete(int result) {
398 return result;
399 }
400
DoReadBody()401 int FlipStream::DoReadBody() {
402 // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
403 // makes sense.
404 return ERR_IO_PENDING;
405 }
406
DoReadBodyComplete(int result)407 int FlipStream::DoReadBodyComplete(int result) {
408 // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
409 // makes sense.
410 return ERR_IO_PENDING;
411 }
412
UpdateHistograms()413 void FlipStream::UpdateHistograms() {
414 if (histograms_recorded_)
415 return;
416
417 histograms_recorded_ = true;
418
419 // We need all timers to be filled in, otherwise metrics can be bogus.
420 if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
421 recv_last_byte_time_.is_null())
422 return;
423
424 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
425 recv_first_byte_time_ - send_time_);
426 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
427 recv_last_byte_time_ - recv_first_byte_time_);
428 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
429 recv_last_byte_time_ - send_time_);
430
431 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
432 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
433 }
434
435 } // namespace net
436