1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_pipelined_connection_impl.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/stl_util.h"
11 #include "base/values.h"
12 #include "net/base/io_buffer.h"
13 #include "net/http/http_pipelined_stream.h"
14 #include "net/http/http_request_info.h"
15 #include "net/http/http_response_body_drainer.h"
16 #include "net/http/http_response_headers.h"
17 #include "net/http/http_stream_parser.h"
18 #include "net/http/http_version.h"
19 #include "net/socket/client_socket_handle.h"
20
21 namespace net {
22
23 namespace {
24
NetLogReceivedHeadersCallback(const NetLog::Source & source,const std::string * feedback,NetLog::LogLevel)25 base::Value* NetLogReceivedHeadersCallback(const NetLog::Source& source,
26 const std::string* feedback,
27 NetLog::LogLevel /* log_level */) {
28 base::DictionaryValue* dict = new base::DictionaryValue;
29 source.AddToEventParameters(dict);
30 dict->SetString("feedback", *feedback);
31 return dict;
32 }
33
NetLogStreamClosedCallback(const NetLog::Source & source,bool not_reusable,NetLog::LogLevel)34 base::Value* NetLogStreamClosedCallback(const NetLog::Source& source,
35 bool not_reusable,
36 NetLog::LogLevel /* log_level */) {
37 base::DictionaryValue* dict = new base::DictionaryValue;
38 source.AddToEventParameters(dict);
39 dict->SetBoolean("not_reusable", not_reusable);
40 return dict;
41 }
42
NetLogHostPortPairCallback(const HostPortPair * host_port_pair,NetLog::LogLevel)43 base::Value* NetLogHostPortPairCallback(const HostPortPair* host_port_pair,
44 NetLog::LogLevel /* log_level */) {
45 base::DictionaryValue* dict = new base::DictionaryValue;
46 dict->SetString("host_and_port", host_port_pair->ToString());
47 return dict;
48 }
49
50 } // anonymous namespace
51
52 HttpPipelinedConnection*
CreateNewPipeline(ClientSocketHandle * connection,HttpPipelinedConnection::Delegate * delegate,const HostPortPair & origin,const SSLConfig & used_ssl_config,const ProxyInfo & used_proxy_info,const BoundNetLog & net_log,bool was_npn_negotiated,NextProto protocol_negotiated)53 HttpPipelinedConnectionImpl::Factory::CreateNewPipeline(
54 ClientSocketHandle* connection,
55 HttpPipelinedConnection::Delegate* delegate,
56 const HostPortPair& origin,
57 const SSLConfig& used_ssl_config,
58 const ProxyInfo& used_proxy_info,
59 const BoundNetLog& net_log,
60 bool was_npn_negotiated,
61 NextProto protocol_negotiated) {
62 return new HttpPipelinedConnectionImpl(connection, delegate, origin,
63 used_ssl_config, used_proxy_info,
64 net_log, was_npn_negotiated,
65 protocol_negotiated);
66 }
67
HttpPipelinedConnectionImpl(ClientSocketHandle * connection,HttpPipelinedConnection::Delegate * delegate,const HostPortPair & origin,const SSLConfig & used_ssl_config,const ProxyInfo & used_proxy_info,const BoundNetLog & net_log,bool was_npn_negotiated,NextProto protocol_negotiated)68 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
69 ClientSocketHandle* connection,
70 HttpPipelinedConnection::Delegate* delegate,
71 const HostPortPair& origin,
72 const SSLConfig& used_ssl_config,
73 const ProxyInfo& used_proxy_info,
74 const BoundNetLog& net_log,
75 bool was_npn_negotiated,
76 NextProto protocol_negotiated)
77 : delegate_(delegate),
78 connection_(connection),
79 used_ssl_config_(used_ssl_config),
80 used_proxy_info_(used_proxy_info),
81 net_log_(BoundNetLog::Make(net_log.net_log(),
82 NetLog::SOURCE_HTTP_PIPELINED_CONNECTION)),
83 was_npn_negotiated_(was_npn_negotiated),
84 protocol_negotiated_(protocol_negotiated),
85 read_buf_(new GrowableIOBuffer()),
86 next_pipeline_id_(1),
87 active_(false),
88 usable_(true),
89 completed_one_request_(false),
90 weak_factory_(this),
91 send_next_state_(SEND_STATE_NONE),
92 send_still_on_call_stack_(false),
93 read_next_state_(READ_STATE_NONE),
94 active_read_id_(0),
95 read_still_on_call_stack_(false) {
96 CHECK(connection_.get());
97 net_log_.BeginEvent(
98 NetLog::TYPE_HTTP_PIPELINED_CONNECTION,
99 base::Bind(&NetLogHostPortPairCallback, &origin));
100 }
101
~HttpPipelinedConnectionImpl()102 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
103 CHECK_EQ(depth(), 0);
104 CHECK(stream_info_map_.empty());
105 CHECK(pending_send_request_queue_.empty());
106 CHECK(request_order_.empty());
107 CHECK_EQ(send_next_state_, SEND_STATE_NONE);
108 CHECK_EQ(read_next_state_, READ_STATE_NONE);
109 CHECK(!active_send_request_.get());
110 CHECK(!active_read_id_);
111 if (!usable_) {
112 connection_->socket()->Disconnect();
113 }
114 connection_->Reset();
115 net_log_.EndEvent(NetLog::TYPE_HTTP_PIPELINED_CONNECTION);
116 }
117
CreateNewStream()118 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
119 int pipeline_id = next_pipeline_id_++;
120 CHECK(pipeline_id);
121 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
122 stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo()));
123 return stream;
124 }
125
InitializeParser(int pipeline_id,const HttpRequestInfo * request,const BoundNetLog & net_log)126 void HttpPipelinedConnectionImpl::InitializeParser(
127 int pipeline_id,
128 const HttpRequestInfo* request,
129 const BoundNetLog& net_log) {
130 CHECK(ContainsKey(stream_info_map_, pipeline_id));
131 CHECK(!stream_info_map_[pipeline_id].parser.get());
132 stream_info_map_[pipeline_id].state = STREAM_BOUND;
133 stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser(
134 connection_.get(), request, read_buf_.get(), net_log));
135 stream_info_map_[pipeline_id].source = net_log.source();
136
137 // In case our first stream doesn't SendRequest() immediately, we should still
138 // allow others to use this pipeline.
139 if (pipeline_id == 1) {
140 base::MessageLoop::current()->PostTask(
141 FROM_HERE,
142 base::Bind(&HttpPipelinedConnectionImpl::ActivatePipeline,
143 weak_factory_.GetWeakPtr()));
144 }
145 }
146
ActivatePipeline()147 void HttpPipelinedConnectionImpl::ActivatePipeline() {
148 if (!active_) {
149 active_ = true;
150 delegate_->OnPipelineHasCapacity(this);
151 }
152 }
153
OnStreamDeleted(int pipeline_id)154 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
155 CHECK(ContainsKey(stream_info_map_, pipeline_id));
156 Close(pipeline_id, false);
157
158 if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
159 stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
160 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
161 CHECK(stream_info_map_[pipeline_id].parser.get());
162 stream_info_map_[pipeline_id].parser.reset();
163 }
164 CHECK(!stream_info_map_[pipeline_id].parser.get());
165 stream_info_map_.erase(pipeline_id);
166
167 delegate_->OnPipelineHasCapacity(this);
168 }
169
SendRequest(int pipeline_id,const std::string & request_line,const HttpRequestHeaders & headers,HttpResponseInfo * response,const CompletionCallback & callback)170 int HttpPipelinedConnectionImpl::SendRequest(
171 int pipeline_id,
172 const std::string& request_line,
173 const HttpRequestHeaders& headers,
174 HttpResponseInfo* response,
175 const CompletionCallback& callback) {
176 CHECK(ContainsKey(stream_info_map_, pipeline_id));
177 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
178 if (!usable_) {
179 return ERR_PIPELINE_EVICTION;
180 }
181
182 PendingSendRequest* send_request = new PendingSendRequest;
183 send_request->pipeline_id = pipeline_id;
184 send_request->request_line = request_line;
185 send_request->headers = headers;
186 send_request->response = response;
187 send_request->callback = callback;
188 pending_send_request_queue_.push(send_request);
189
190 int rv;
191 if (send_next_state_ == SEND_STATE_NONE) {
192 send_next_state_ = SEND_STATE_START_IMMEDIATELY;
193 rv = DoSendRequestLoop(OK);
194 } else {
195 rv = ERR_IO_PENDING;
196 }
197 ActivatePipeline();
198 return rv;
199 }
200
DoSendRequestLoop(int result)201 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
202 int rv = result;
203 do {
204 SendRequestState state = send_next_state_;
205 send_next_state_ = SEND_STATE_NONE;
206 switch (state) {
207 case SEND_STATE_START_IMMEDIATELY:
208 rv = DoStartRequestImmediately(rv);
209 break;
210 case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
211 rv = DoStartNextDeferredRequest(rv);
212 break;
213 case SEND_STATE_SEND_ACTIVE_REQUEST:
214 rv = DoSendActiveRequest(rv);
215 break;
216 case SEND_STATE_COMPLETE:
217 rv = DoSendComplete(rv);
218 break;
219 case SEND_STATE_EVICT_PENDING_REQUESTS:
220 rv = DoEvictPendingSendRequests(rv);
221 break;
222 default:
223 CHECK(false) << "bad send state: " << state;
224 rv = ERR_FAILED;
225 break;
226 }
227 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
228 send_still_on_call_stack_ = false;
229 return rv;
230 }
231
OnSendIOCallback(int result)232 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
233 CHECK(active_send_request_.get());
234 DoSendRequestLoop(result);
235 }
236
DoStartRequestImmediately(int result)237 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
238 CHECK(!active_send_request_.get());
239 CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
240 // If SendRequest() completes synchronously, then we need to return the value
241 // directly to the caller. |send_still_on_call_stack_| will track this.
242 // Otherwise, asynchronous completions will notify the caller via callback.
243 send_still_on_call_stack_ = true;
244 active_send_request_.reset(pending_send_request_queue_.front());
245 pending_send_request_queue_.pop();
246 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
247 return OK;
248 }
249
DoStartNextDeferredRequest(int result)250 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
251 CHECK(!send_still_on_call_stack_);
252 CHECK(!active_send_request_.get());
253
254 while (!pending_send_request_queue_.empty()) {
255 scoped_ptr<PendingSendRequest> next_request(
256 pending_send_request_queue_.front());
257 pending_send_request_queue_.pop();
258 CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
259 if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
260 active_send_request_.reset(next_request.release());
261 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
262 return OK;
263 }
264 }
265
266 send_next_state_ = SEND_STATE_NONE;
267 return OK;
268 }
269
DoSendActiveRequest(int result)270 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
271 CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
272 int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
273 SendRequest(active_send_request_->request_line,
274 active_send_request_->headers,
275 active_send_request_->response,
276 base::Bind(&HttpPipelinedConnectionImpl::OnSendIOCallback,
277 base::Unretained(this)));
278 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
279 send_next_state_ = SEND_STATE_COMPLETE;
280 return rv;
281 }
282
DoSendComplete(int result)283 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
284 CHECK(active_send_request_.get());
285 CHECK_EQ(STREAM_SENDING,
286 stream_info_map_[active_send_request_->pipeline_id].state);
287
288 request_order_.push(active_send_request_->pipeline_id);
289 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
290 net_log_.AddEvent(
291 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_SENT_REQUEST,
292 stream_info_map_[active_send_request_->pipeline_id].source.
293 ToEventParametersCallback());
294
295 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
296 result = ERR_PIPELINE_EVICTION;
297 }
298 if (result < OK) {
299 usable_ = false;
300 }
301
302 if (!send_still_on_call_stack_) {
303 QueueUserCallback(active_send_request_->pipeline_id,
304 active_send_request_->callback, result, FROM_HERE);
305 }
306
307 active_send_request_.reset();
308
309 if (send_still_on_call_stack_) {
310 // It should be impossible for another request to appear on the queue while
311 // this send was on the call stack.
312 CHECK(pending_send_request_queue_.empty());
313 send_next_state_ = SEND_STATE_NONE;
314 } else if (!usable_) {
315 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
316 } else {
317 send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
318 }
319
320 return result;
321 }
322
DoEvictPendingSendRequests(int result)323 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
324 while (!pending_send_request_queue_.empty()) {
325 scoped_ptr<PendingSendRequest> evicted_send(
326 pending_send_request_queue_.front());
327 pending_send_request_queue_.pop();
328 if (ContainsKey(stream_info_map_, evicted_send->pipeline_id) &&
329 stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
330 evicted_send->callback.Run(ERR_PIPELINE_EVICTION);
331 }
332 }
333 send_next_state_ = SEND_STATE_NONE;
334 return result;
335 }
336
ReadResponseHeaders(int pipeline_id,const CompletionCallback & callback)337 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
338 int pipeline_id, const CompletionCallback& callback) {
339 CHECK(ContainsKey(stream_info_map_, pipeline_id));
340 CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
341 CHECK(stream_info_map_[pipeline_id].read_headers_callback.is_null());
342
343 if (!usable_)
344 return ERR_PIPELINE_EVICTION;
345
346 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
347 stream_info_map_[pipeline_id].read_headers_callback = callback;
348 if (read_next_state_ == READ_STATE_NONE &&
349 pipeline_id == request_order_.front()) {
350 read_next_state_ = READ_STATE_START_IMMEDIATELY;
351 return DoReadHeadersLoop(OK);
352 }
353 return ERR_IO_PENDING;
354 }
355
StartNextDeferredRead()356 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
357 if (read_next_state_ == READ_STATE_NONE) {
358 read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
359 DoReadHeadersLoop(OK);
360 }
361 }
362
DoReadHeadersLoop(int result)363 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
364 int rv = result;
365 do {
366 ReadHeadersState state = read_next_state_;
367 read_next_state_ = READ_STATE_NONE;
368 switch (state) {
369 case READ_STATE_START_IMMEDIATELY:
370 rv = DoStartReadImmediately(rv);
371 break;
372 case READ_STATE_START_NEXT_DEFERRED_READ:
373 rv = DoStartNextDeferredRead(rv);
374 break;
375 case READ_STATE_READ_HEADERS:
376 rv = DoReadHeaders(rv);
377 break;
378 case READ_STATE_READ_HEADERS_COMPLETE:
379 rv = DoReadHeadersComplete(rv);
380 break;
381 case READ_STATE_WAITING_FOR_CLOSE:
382 // This is a holding state. We return instead of continuing to run hte
383 // loop. The state will advance when the stream calls Close().
384 rv = DoReadWaitForClose(rv);
385 read_still_on_call_stack_ = false;
386 return rv;
387 case READ_STATE_STREAM_CLOSED:
388 rv = DoReadStreamClosed();
389 break;
390 case READ_STATE_EVICT_PENDING_READS:
391 rv = DoEvictPendingReadHeaders(rv);
392 break;
393 case READ_STATE_NONE:
394 break;
395 default:
396 CHECK(false) << "bad read state";
397 rv = ERR_FAILED;
398 break;
399 }
400 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
401 read_still_on_call_stack_ = false;
402 return rv;
403 }
404
OnReadIOCallback(int result)405 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
406 DoReadHeadersLoop(result);
407 }
408
DoStartReadImmediately(int result)409 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
410 CHECK(!active_read_id_);
411 CHECK(!read_still_on_call_stack_);
412 CHECK(!request_order_.empty());
413 // If ReadResponseHeaders() completes synchronously, then we need to return
414 // the value directly to the caller. |read_still_on_call_stack_| will track
415 // this. Otherwise, asynchronous completions will notify the caller via
416 // callback.
417 read_still_on_call_stack_ = true;
418 read_next_state_ = READ_STATE_READ_HEADERS;
419 active_read_id_ = request_order_.front();
420 request_order_.pop();
421 return OK;
422 }
423
DoStartNextDeferredRead(int result)424 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
425 CHECK(!active_read_id_);
426 CHECK(!read_still_on_call_stack_);
427
428 if (request_order_.empty()) {
429 read_next_state_ = READ_STATE_NONE;
430 return OK;
431 }
432
433 int next_id = request_order_.front();
434 CHECK(ContainsKey(stream_info_map_, next_id));
435 switch (stream_info_map_[next_id].state) {
436 case STREAM_READ_PENDING:
437 read_next_state_ = READ_STATE_READ_HEADERS;
438 active_read_id_ = next_id;
439 request_order_.pop();
440 break;
441
442 case STREAM_CLOSED:
443 // Since nobody will read whatever data is on the pipeline associated with
444 // this closed request, we must shut down the rest of the pipeline.
445 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
446 break;
447
448 case STREAM_SENT:
449 read_next_state_ = READ_STATE_NONE;
450 break;
451
452 default:
453 CHECK(false) << "Unexpected read state: "
454 << stream_info_map_[next_id].state;
455 }
456
457 return OK;
458 }
459
DoReadHeaders(int result)460 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
461 CHECK(active_read_id_);
462 CHECK(ContainsKey(stream_info_map_, active_read_id_));
463 CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
464 stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
465 int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
466 base::Bind(&HttpPipelinedConnectionImpl::OnReadIOCallback,
467 base::Unretained(this)));
468 read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
469 return rv;
470 }
471
DoReadHeadersComplete(int result)472 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
473 CHECK(active_read_id_);
474 CHECK(ContainsKey(stream_info_map_, active_read_id_));
475 CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
476
477 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
478 if (result < OK) {
479 if (completed_one_request_ &&
480 (result == ERR_CONNECTION_CLOSED ||
481 result == ERR_EMPTY_RESPONSE ||
482 result == ERR_SOCKET_NOT_CONNECTED)) {
483 // These usually indicate that pipelining failed on the server side. In
484 // that case, we should retry without pipelining.
485 result = ERR_PIPELINE_EVICTION;
486 }
487 usable_ = false;
488 }
489
490 CheckHeadersForPipelineCompatibility(active_read_id_, result);
491
492 if (!read_still_on_call_stack_) {
493 QueueUserCallback(active_read_id_,
494 stream_info_map_[active_read_id_].read_headers_callback,
495 result, FROM_HERE);
496 }
497
498 return result;
499 }
500
DoReadWaitForClose(int result)501 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
502 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
503 return result;
504 }
505
DoReadStreamClosed()506 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
507 CHECK(active_read_id_);
508 CHECK(ContainsKey(stream_info_map_, active_read_id_));
509 CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
510 active_read_id_ = 0;
511 if (!usable_) {
512 // TODO(simonjam): Don't wait this long to evict.
513 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
514 return OK;
515 }
516 completed_one_request_ = true;
517 base::MessageLoop::current()->PostTask(
518 FROM_HERE,
519 base::Bind(&HttpPipelinedConnectionImpl::StartNextDeferredRead,
520 weak_factory_.GetWeakPtr()));
521 read_next_state_ = READ_STATE_NONE;
522 return OK;
523 }
524
DoEvictPendingReadHeaders(int result)525 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
526 while (!request_order_.empty()) {
527 int evicted_id = request_order_.front();
528 request_order_.pop();
529 if (!ContainsKey(stream_info_map_, evicted_id)) {
530 continue;
531 }
532 if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
533 stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
534 stream_info_map_[evicted_id].read_headers_callback.Run(
535 ERR_PIPELINE_EVICTION);
536 }
537 }
538 read_next_state_ = READ_STATE_NONE;
539 return result;
540 }
541
Close(int pipeline_id,bool not_reusable)542 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
543 bool not_reusable) {
544 CHECK(ContainsKey(stream_info_map_, pipeline_id));
545 net_log_.AddEvent(
546 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_STREAM_CLOSED,
547 base::Bind(&NetLogStreamClosedCallback,
548 stream_info_map_[pipeline_id].source, not_reusable));
549 switch (stream_info_map_[pipeline_id].state) {
550 case STREAM_CREATED:
551 stream_info_map_[pipeline_id].state = STREAM_UNUSED;
552 break;
553
554 case STREAM_BOUND:
555 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
556 break;
557
558 case STREAM_SENDING:
559 usable_ = false;
560 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
561 active_send_request_.reset();
562 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
563 DoSendRequestLoop(OK);
564 break;
565
566 case STREAM_SENT:
567 case STREAM_READ_PENDING:
568 usable_ = false;
569 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
570 if (!request_order_.empty() &&
571 pipeline_id == request_order_.front() &&
572 read_next_state_ == READ_STATE_NONE) {
573 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
574 DoReadHeadersLoop(OK);
575 }
576 break;
577
578 case STREAM_ACTIVE:
579 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
580 if (not_reusable) {
581 usable_ = false;
582 }
583 read_next_state_ = READ_STATE_STREAM_CLOSED;
584 DoReadHeadersLoop(OK);
585 break;
586
587 case STREAM_READ_EVICTED:
588 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
589 break;
590
591 case STREAM_CLOSED:
592 case STREAM_UNUSED:
593 // TODO(simonjam): Why is Close() sometimes called twice?
594 break;
595
596 default:
597 CHECK(false);
598 break;
599 }
600 }
601
ReadResponseBody(int pipeline_id,IOBuffer * buf,int buf_len,const CompletionCallback & callback)602 int HttpPipelinedConnectionImpl::ReadResponseBody(
603 int pipeline_id, IOBuffer* buf, int buf_len,
604 const CompletionCallback& callback) {
605 CHECK(ContainsKey(stream_info_map_, pipeline_id));
606 CHECK_EQ(active_read_id_, pipeline_id);
607 CHECK(stream_info_map_[pipeline_id].parser.get());
608 return stream_info_map_[pipeline_id].parser->ReadResponseBody(
609 buf, buf_len, callback);
610 }
611
GetUploadProgress(int pipeline_id) const612 UploadProgress HttpPipelinedConnectionImpl::GetUploadProgress(
613 int pipeline_id) const {
614 CHECK(ContainsKey(stream_info_map_, pipeline_id));
615 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
616 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
617 }
618
GetResponseInfo(int pipeline_id)619 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
620 int pipeline_id) {
621 CHECK(ContainsKey(stream_info_map_, pipeline_id));
622 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
623 return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo();
624 }
625
IsResponseBodyComplete(int pipeline_id) const626 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
627 int pipeline_id) const {
628 CHECK(ContainsKey(stream_info_map_, pipeline_id));
629 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
630 return stream_info_map_.find(pipeline_id)->second.parser->
631 IsResponseBodyComplete();
632 }
633
CanFindEndOfResponse(int pipeline_id) const634 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
635 CHECK(ContainsKey(stream_info_map_, pipeline_id));
636 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
637 return stream_info_map_.find(pipeline_id)->second.parser->
638 CanFindEndOfResponse();
639 }
640
IsConnectionReused(int pipeline_id) const641 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
642 CHECK(ContainsKey(stream_info_map_, pipeline_id));
643 if (pipeline_id > 1) {
644 return true;
645 }
646 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
647 return connection_->is_reused() ||
648 reuse_type == ClientSocketHandle::UNUSED_IDLE;
649 }
650
SetConnectionReused(int pipeline_id)651 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
652 CHECK(ContainsKey(stream_info_map_, pipeline_id));
653 connection_->set_is_reused(true);
654 }
655
GetTotalReceivedBytes(int pipeline_id) const656 int64 HttpPipelinedConnectionImpl::GetTotalReceivedBytes(
657 int pipeline_id) const {
658 CHECK(ContainsKey(stream_info_map_, pipeline_id));
659 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
660 return stream_info_map_.find(pipeline_id)->second.parser->received_bytes();
661 }
662
GetLoadTimingInfo(int pipeline_id,LoadTimingInfo * load_timing_info) const663 bool HttpPipelinedConnectionImpl::GetLoadTimingInfo(
664 int pipeline_id, LoadTimingInfo* load_timing_info) const {
665 return connection_->GetLoadTimingInfo(IsConnectionReused(pipeline_id),
666 load_timing_info);
667 }
668
GetSSLInfo(int pipeline_id,SSLInfo * ssl_info)669 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
670 SSLInfo* ssl_info) {
671 CHECK(ContainsKey(stream_info_map_, pipeline_id));
672 CHECK(stream_info_map_[pipeline_id].parser.get());
673 stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info);
674 }
675
GetSSLCertRequestInfo(int pipeline_id,SSLCertRequestInfo * cert_request_info)676 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
677 int pipeline_id,
678 SSLCertRequestInfo* cert_request_info) {
679 CHECK(ContainsKey(stream_info_map_, pipeline_id));
680 CHECK(stream_info_map_[pipeline_id].parser.get());
681 stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
682 cert_request_info);
683 }
684
Drain(HttpPipelinedStream * stream,HttpNetworkSession * session)685 void HttpPipelinedConnectionImpl::Drain(HttpPipelinedStream* stream,
686 HttpNetworkSession* session) {
687 HttpResponseHeaders* headers = stream->GetResponseInfo()->headers.get();
688 if (!stream->CanFindEndOfResponse() || headers->IsChunkEncoded() ||
689 !usable_) {
690 // TODO(simonjam): Drain chunk-encoded responses if they're relatively
691 // common.
692 stream->Close(true);
693 delete stream;
694 return;
695 }
696 HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(stream);
697 drainer->StartWithSize(session, headers->GetContentLength());
698 // |drainer| will delete itself when done.
699 }
700
CheckHeadersForPipelineCompatibility(int pipeline_id,int result)701 void HttpPipelinedConnectionImpl::CheckHeadersForPipelineCompatibility(
702 int pipeline_id,
703 int result) {
704 if (result < OK) {
705 switch (result) {
706 // TODO(simonjam): Ignoring specific errors like this may not work.
707 // Collect metrics to see if this code is useful.
708 case ERR_ABORTED:
709 case ERR_INTERNET_DISCONNECTED:
710 case ERR_NETWORK_CHANGED:
711 // These errors are no fault of the server.
712 break;
713
714 default:
715 ReportPipelineFeedback(pipeline_id, PIPELINE_SOCKET_ERROR);
716 break;
717 }
718 return;
719 }
720 HttpResponseInfo* info = GetResponseInfo(pipeline_id);
721 const HttpVersion required_version(1, 1);
722 if (info->headers->GetParsedHttpVersion() < required_version) {
723 ReportPipelineFeedback(pipeline_id, OLD_HTTP_VERSION);
724 return;
725 }
726 if (!info->headers->IsKeepAlive() || !CanFindEndOfResponse(pipeline_id)) {
727 usable_ = false;
728 ReportPipelineFeedback(pipeline_id, MUST_CLOSE_CONNECTION);
729 return;
730 }
731 if (info->headers->HasHeader(
732 HttpAuth::GetChallengeHeaderName(HttpAuth::AUTH_SERVER))) {
733 ReportPipelineFeedback(pipeline_id, AUTHENTICATION_REQUIRED);
734 return;
735 }
736 ReportPipelineFeedback(pipeline_id, OK);
737 }
738
ReportPipelineFeedback(int pipeline_id,Feedback feedback)739 void HttpPipelinedConnectionImpl::ReportPipelineFeedback(int pipeline_id,
740 Feedback feedback) {
741 std::string feedback_str;
742 switch (feedback) {
743 case OK:
744 feedback_str = "OK";
745 break;
746
747 case PIPELINE_SOCKET_ERROR:
748 feedback_str = "PIPELINE_SOCKET_ERROR";
749 break;
750
751 case OLD_HTTP_VERSION:
752 feedback_str = "OLD_HTTP_VERSION";
753 break;
754
755 case MUST_CLOSE_CONNECTION:
756 feedback_str = "MUST_CLOSE_CONNECTION";
757 break;
758
759 case AUTHENTICATION_REQUIRED:
760 feedback_str = "AUTHENTICATION_REQUIRED";
761 break;
762
763 default:
764 NOTREACHED();
765 feedback_str = "UNKNOWN";
766 break;
767 }
768 net_log_.AddEvent(
769 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_RECEIVED_HEADERS,
770 base::Bind(&NetLogReceivedHeadersCallback,
771 stream_info_map_[pipeline_id].source, &feedback_str));
772 delegate_->OnPipelineFeedback(this, feedback);
773 }
774
QueueUserCallback(int pipeline_id,const CompletionCallback & callback,int rv,const tracked_objects::Location & from_here)775 void HttpPipelinedConnectionImpl::QueueUserCallback(
776 int pipeline_id, const CompletionCallback& callback, int rv,
777 const tracked_objects::Location& from_here) {
778 CHECK(stream_info_map_[pipeline_id].pending_user_callback.is_null());
779 stream_info_map_[pipeline_id].pending_user_callback = callback;
780 base::MessageLoop::current()->PostTask(
781 from_here,
782 base::Bind(&HttpPipelinedConnectionImpl::FireUserCallback,
783 weak_factory_.GetWeakPtr(), pipeline_id, rv));
784 }
785
FireUserCallback(int pipeline_id,int result)786 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
787 int result) {
788 if (ContainsKey(stream_info_map_, pipeline_id)) {
789 CHECK(!stream_info_map_[pipeline_id].pending_user_callback.is_null());
790 CompletionCallback callback =
791 stream_info_map_[pipeline_id].pending_user_callback;
792 stream_info_map_[pipeline_id].pending_user_callback.Reset();
793 callback.Run(result);
794 }
795 }
796
depth() const797 int HttpPipelinedConnectionImpl::depth() const {
798 return stream_info_map_.size();
799 }
800
usable() const801 bool HttpPipelinedConnectionImpl::usable() const {
802 return usable_;
803 }
804
active() const805 bool HttpPipelinedConnectionImpl::active() const {
806 return active_;
807 }
808
used_ssl_config() const809 const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const {
810 return used_ssl_config_;
811 }
812
used_proxy_info() const813 const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const {
814 return used_proxy_info_;
815 }
816
net_log() const817 const BoundNetLog& HttpPipelinedConnectionImpl::net_log() const {
818 return net_log_;
819 }
820
was_npn_negotiated() const821 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
822 return was_npn_negotiated_;
823 }
824
protocol_negotiated() const825 NextProto HttpPipelinedConnectionImpl::protocol_negotiated()
826 const {
827 return protocol_negotiated_;
828 }
829
PendingSendRequest()830 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest()
831 : pipeline_id(0),
832 response(NULL) {
833 }
834
~PendingSendRequest()835 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
836 }
837
StreamInfo()838 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
839 : state(STREAM_CREATED) {
840 }
841
~StreamInfo()842 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
843 }
844
845 } // namespace net
846