• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/http/http_pipelined_connection_impl.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/stl_util.h"
11 #include "base/values.h"
12 #include "net/base/io_buffer.h"
13 #include "net/http/http_pipelined_stream.h"
14 #include "net/http/http_request_info.h"
15 #include "net/http/http_response_body_drainer.h"
16 #include "net/http/http_response_headers.h"
17 #include "net/http/http_stream_parser.h"
18 #include "net/http/http_version.h"
19 #include "net/socket/client_socket_handle.h"
20 
21 namespace net {
22 
23 namespace {
24 
NetLogReceivedHeadersCallback(const NetLog::Source & source,const std::string * feedback,NetLog::LogLevel)25 base::Value* NetLogReceivedHeadersCallback(const NetLog::Source& source,
26                                            const std::string* feedback,
27                                            NetLog::LogLevel /* log_level */) {
28   base::DictionaryValue* dict = new base::DictionaryValue;
29   source.AddToEventParameters(dict);
30   dict->SetString("feedback", *feedback);
31   return dict;
32 }
33 
NetLogStreamClosedCallback(const NetLog::Source & source,bool not_reusable,NetLog::LogLevel)34 base::Value* NetLogStreamClosedCallback(const NetLog::Source& source,
35                                         bool not_reusable,
36                                         NetLog::LogLevel /* log_level */) {
37   base::DictionaryValue* dict = new base::DictionaryValue;
38   source.AddToEventParameters(dict);
39   dict->SetBoolean("not_reusable", not_reusable);
40   return dict;
41 }
42 
NetLogHostPortPairCallback(const HostPortPair * host_port_pair,NetLog::LogLevel)43 base::Value* NetLogHostPortPairCallback(const HostPortPair* host_port_pair,
44                                         NetLog::LogLevel /* log_level */) {
45   base::DictionaryValue* dict = new base::DictionaryValue;
46   dict->SetString("host_and_port", host_port_pair->ToString());
47   return dict;
48 }
49 
50 }  // anonymous namespace
51 
52 HttpPipelinedConnection*
CreateNewPipeline(ClientSocketHandle * connection,HttpPipelinedConnection::Delegate * delegate,const HostPortPair & origin,const SSLConfig & used_ssl_config,const ProxyInfo & used_proxy_info,const BoundNetLog & net_log,bool was_npn_negotiated,NextProto protocol_negotiated)53 HttpPipelinedConnectionImpl::Factory::CreateNewPipeline(
54     ClientSocketHandle* connection,
55     HttpPipelinedConnection::Delegate* delegate,
56     const HostPortPair& origin,
57     const SSLConfig& used_ssl_config,
58     const ProxyInfo& used_proxy_info,
59     const BoundNetLog& net_log,
60     bool was_npn_negotiated,
61     NextProto protocol_negotiated) {
62   return new HttpPipelinedConnectionImpl(connection, delegate, origin,
63                                          used_ssl_config, used_proxy_info,
64                                          net_log, was_npn_negotiated,
65                                          protocol_negotiated);
66 }
67 
HttpPipelinedConnectionImpl(ClientSocketHandle * connection,HttpPipelinedConnection::Delegate * delegate,const HostPortPair & origin,const SSLConfig & used_ssl_config,const ProxyInfo & used_proxy_info,const BoundNetLog & net_log,bool was_npn_negotiated,NextProto protocol_negotiated)68 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
69     ClientSocketHandle* connection,
70     HttpPipelinedConnection::Delegate* delegate,
71     const HostPortPair& origin,
72     const SSLConfig& used_ssl_config,
73     const ProxyInfo& used_proxy_info,
74     const BoundNetLog& net_log,
75     bool was_npn_negotiated,
76     NextProto protocol_negotiated)
77     : delegate_(delegate),
78       connection_(connection),
79       used_ssl_config_(used_ssl_config),
80       used_proxy_info_(used_proxy_info),
81       net_log_(BoundNetLog::Make(net_log.net_log(),
82                                  NetLog::SOURCE_HTTP_PIPELINED_CONNECTION)),
83       was_npn_negotiated_(was_npn_negotiated),
84       protocol_negotiated_(protocol_negotiated),
85       read_buf_(new GrowableIOBuffer()),
86       next_pipeline_id_(1),
87       active_(false),
88       usable_(true),
89       completed_one_request_(false),
90       weak_factory_(this),
91       send_next_state_(SEND_STATE_NONE),
92       send_still_on_call_stack_(false),
93       read_next_state_(READ_STATE_NONE),
94       active_read_id_(0),
95       read_still_on_call_stack_(false) {
96   CHECK(connection_.get());
97   net_log_.BeginEvent(
98       NetLog::TYPE_HTTP_PIPELINED_CONNECTION,
99       base::Bind(&NetLogHostPortPairCallback, &origin));
100 }
101 
~HttpPipelinedConnectionImpl()102 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
103   CHECK_EQ(depth(), 0);
104   CHECK(stream_info_map_.empty());
105   CHECK(pending_send_request_queue_.empty());
106   CHECK(request_order_.empty());
107   CHECK_EQ(send_next_state_, SEND_STATE_NONE);
108   CHECK_EQ(read_next_state_, READ_STATE_NONE);
109   CHECK(!active_send_request_.get());
110   CHECK(!active_read_id_);
111   if (!usable_) {
112     connection_->socket()->Disconnect();
113   }
114   connection_->Reset();
115   net_log_.EndEvent(NetLog::TYPE_HTTP_PIPELINED_CONNECTION);
116 }
117 
CreateNewStream()118 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
119   int pipeline_id = next_pipeline_id_++;
120   CHECK(pipeline_id);
121   HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
122   stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo()));
123   return stream;
124 }
125 
InitializeParser(int pipeline_id,const HttpRequestInfo * request,const BoundNetLog & net_log)126 void HttpPipelinedConnectionImpl::InitializeParser(
127     int pipeline_id,
128     const HttpRequestInfo* request,
129     const BoundNetLog& net_log) {
130   CHECK(ContainsKey(stream_info_map_, pipeline_id));
131   CHECK(!stream_info_map_[pipeline_id].parser.get());
132   stream_info_map_[pipeline_id].state = STREAM_BOUND;
133   stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser(
134       connection_.get(), request, read_buf_.get(), net_log));
135   stream_info_map_[pipeline_id].source = net_log.source();
136 
137   // In case our first stream doesn't SendRequest() immediately, we should still
138   // allow others to use this pipeline.
139   if (pipeline_id == 1) {
140     base::MessageLoop::current()->PostTask(
141         FROM_HERE,
142         base::Bind(&HttpPipelinedConnectionImpl::ActivatePipeline,
143                    weak_factory_.GetWeakPtr()));
144   }
145 }
146 
ActivatePipeline()147 void HttpPipelinedConnectionImpl::ActivatePipeline() {
148   if (!active_) {
149     active_ = true;
150     delegate_->OnPipelineHasCapacity(this);
151   }
152 }
153 
OnStreamDeleted(int pipeline_id)154 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
155   CHECK(ContainsKey(stream_info_map_, pipeline_id));
156   Close(pipeline_id, false);
157 
158   if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
159       stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
160     CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
161     CHECK(stream_info_map_[pipeline_id].parser.get());
162     stream_info_map_[pipeline_id].parser.reset();
163   }
164   CHECK(!stream_info_map_[pipeline_id].parser.get());
165   stream_info_map_.erase(pipeline_id);
166 
167   delegate_->OnPipelineHasCapacity(this);
168 }
169 
SendRequest(int pipeline_id,const std::string & request_line,const HttpRequestHeaders & headers,HttpResponseInfo * response,const CompletionCallback & callback)170 int HttpPipelinedConnectionImpl::SendRequest(
171     int pipeline_id,
172     const std::string& request_line,
173     const HttpRequestHeaders& headers,
174     HttpResponseInfo* response,
175     const CompletionCallback& callback) {
176   CHECK(ContainsKey(stream_info_map_, pipeline_id));
177   CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
178   if (!usable_) {
179     return ERR_PIPELINE_EVICTION;
180   }
181 
182   PendingSendRequest* send_request = new PendingSendRequest;
183   send_request->pipeline_id = pipeline_id;
184   send_request->request_line = request_line;
185   send_request->headers = headers;
186   send_request->response = response;
187   send_request->callback = callback;
188   pending_send_request_queue_.push(send_request);
189 
190   int rv;
191   if (send_next_state_ == SEND_STATE_NONE) {
192     send_next_state_ = SEND_STATE_START_IMMEDIATELY;
193     rv = DoSendRequestLoop(OK);
194   } else {
195     rv = ERR_IO_PENDING;
196   }
197   ActivatePipeline();
198   return rv;
199 }
200 
DoSendRequestLoop(int result)201 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
202   int rv = result;
203   do {
204     SendRequestState state = send_next_state_;
205     send_next_state_ = SEND_STATE_NONE;
206     switch (state) {
207       case SEND_STATE_START_IMMEDIATELY:
208         rv = DoStartRequestImmediately(rv);
209         break;
210       case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
211         rv = DoStartNextDeferredRequest(rv);
212         break;
213       case SEND_STATE_SEND_ACTIVE_REQUEST:
214         rv = DoSendActiveRequest(rv);
215         break;
216       case SEND_STATE_COMPLETE:
217         rv = DoSendComplete(rv);
218         break;
219       case SEND_STATE_EVICT_PENDING_REQUESTS:
220         rv = DoEvictPendingSendRequests(rv);
221         break;
222       default:
223         CHECK(false) << "bad send state: " << state;
224         rv = ERR_FAILED;
225         break;
226     }
227   } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
228   send_still_on_call_stack_ = false;
229   return rv;
230 }
231 
OnSendIOCallback(int result)232 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
233   CHECK(active_send_request_.get());
234   DoSendRequestLoop(result);
235 }
236 
DoStartRequestImmediately(int result)237 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
238   CHECK(!active_send_request_.get());
239   CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
240   // If SendRequest() completes synchronously, then we need to return the value
241   // directly to the caller. |send_still_on_call_stack_| will track this.
242   // Otherwise, asynchronous completions will notify the caller via callback.
243   send_still_on_call_stack_ = true;
244   active_send_request_.reset(pending_send_request_queue_.front());
245   pending_send_request_queue_.pop();
246   send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
247   return OK;
248 }
249 
DoStartNextDeferredRequest(int result)250 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
251   CHECK(!send_still_on_call_stack_);
252   CHECK(!active_send_request_.get());
253 
254   while (!pending_send_request_queue_.empty()) {
255     scoped_ptr<PendingSendRequest> next_request(
256         pending_send_request_queue_.front());
257     pending_send_request_queue_.pop();
258     CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
259     if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
260       active_send_request_.reset(next_request.release());
261       send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
262       return OK;
263     }
264   }
265 
266   send_next_state_ = SEND_STATE_NONE;
267   return OK;
268 }
269 
DoSendActiveRequest(int result)270 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
271   CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
272   int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
273       SendRequest(active_send_request_->request_line,
274                   active_send_request_->headers,
275                   active_send_request_->response,
276                   base::Bind(&HttpPipelinedConnectionImpl::OnSendIOCallback,
277                              base::Unretained(this)));
278   stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
279   send_next_state_ = SEND_STATE_COMPLETE;
280   return rv;
281 }
282 
DoSendComplete(int result)283 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
284   CHECK(active_send_request_.get());
285   CHECK_EQ(STREAM_SENDING,
286            stream_info_map_[active_send_request_->pipeline_id].state);
287 
288   request_order_.push(active_send_request_->pipeline_id);
289   stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
290   net_log_.AddEvent(
291       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_SENT_REQUEST,
292       stream_info_map_[active_send_request_->pipeline_id].source.
293           ToEventParametersCallback());
294 
295   if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
296     result = ERR_PIPELINE_EVICTION;
297   }
298   if (result < OK) {
299     usable_ = false;
300   }
301 
302   if (!send_still_on_call_stack_) {
303     QueueUserCallback(active_send_request_->pipeline_id,
304                       active_send_request_->callback, result, FROM_HERE);
305   }
306 
307   active_send_request_.reset();
308 
309   if (send_still_on_call_stack_) {
310     // It should be impossible for another request to appear on the queue while
311     // this send was on the call stack.
312     CHECK(pending_send_request_queue_.empty());
313     send_next_state_ = SEND_STATE_NONE;
314   } else if (!usable_) {
315     send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
316   } else {
317     send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
318   }
319 
320   return result;
321 }
322 
DoEvictPendingSendRequests(int result)323 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
324   while (!pending_send_request_queue_.empty()) {
325     scoped_ptr<PendingSendRequest> evicted_send(
326         pending_send_request_queue_.front());
327     pending_send_request_queue_.pop();
328     if (ContainsKey(stream_info_map_, evicted_send->pipeline_id) &&
329         stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
330       evicted_send->callback.Run(ERR_PIPELINE_EVICTION);
331     }
332   }
333   send_next_state_ = SEND_STATE_NONE;
334   return result;
335 }
336 
ReadResponseHeaders(int pipeline_id,const CompletionCallback & callback)337 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
338     int pipeline_id, const CompletionCallback& callback) {
339   CHECK(ContainsKey(stream_info_map_, pipeline_id));
340   CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
341   CHECK(stream_info_map_[pipeline_id].read_headers_callback.is_null());
342 
343   if (!usable_)
344     return ERR_PIPELINE_EVICTION;
345 
346   stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
347   stream_info_map_[pipeline_id].read_headers_callback = callback;
348   if (read_next_state_ == READ_STATE_NONE &&
349       pipeline_id == request_order_.front()) {
350     read_next_state_ = READ_STATE_START_IMMEDIATELY;
351     return DoReadHeadersLoop(OK);
352   }
353   return ERR_IO_PENDING;
354 }
355 
StartNextDeferredRead()356 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
357   if (read_next_state_ == READ_STATE_NONE) {
358     read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
359     DoReadHeadersLoop(OK);
360   }
361 }
362 
DoReadHeadersLoop(int result)363 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
364   int rv = result;
365   do {
366     ReadHeadersState state = read_next_state_;
367     read_next_state_ = READ_STATE_NONE;
368     switch (state) {
369       case READ_STATE_START_IMMEDIATELY:
370         rv = DoStartReadImmediately(rv);
371         break;
372       case READ_STATE_START_NEXT_DEFERRED_READ:
373         rv = DoStartNextDeferredRead(rv);
374         break;
375       case READ_STATE_READ_HEADERS:
376         rv = DoReadHeaders(rv);
377         break;
378       case READ_STATE_READ_HEADERS_COMPLETE:
379         rv = DoReadHeadersComplete(rv);
380         break;
381       case READ_STATE_WAITING_FOR_CLOSE:
382         // This is a holding state. We return instead of continuing to run hte
383         // loop. The state will advance when the stream calls Close().
384         rv = DoReadWaitForClose(rv);
385         read_still_on_call_stack_ = false;
386         return rv;
387       case READ_STATE_STREAM_CLOSED:
388         rv = DoReadStreamClosed();
389         break;
390       case READ_STATE_EVICT_PENDING_READS:
391         rv = DoEvictPendingReadHeaders(rv);
392         break;
393       case READ_STATE_NONE:
394         break;
395       default:
396         CHECK(false) << "bad read state";
397         rv = ERR_FAILED;
398         break;
399     }
400   } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
401   read_still_on_call_stack_ = false;
402   return rv;
403 }
404 
OnReadIOCallback(int result)405 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
406   DoReadHeadersLoop(result);
407 }
408 
DoStartReadImmediately(int result)409 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
410   CHECK(!active_read_id_);
411   CHECK(!read_still_on_call_stack_);
412   CHECK(!request_order_.empty());
413   // If ReadResponseHeaders() completes synchronously, then we need to return
414   // the value directly to the caller. |read_still_on_call_stack_| will track
415   // this. Otherwise, asynchronous completions will notify the caller via
416   // callback.
417   read_still_on_call_stack_ = true;
418   read_next_state_ = READ_STATE_READ_HEADERS;
419   active_read_id_ = request_order_.front();
420   request_order_.pop();
421   return OK;
422 }
423 
DoStartNextDeferredRead(int result)424 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
425   CHECK(!active_read_id_);
426   CHECK(!read_still_on_call_stack_);
427 
428   if (request_order_.empty()) {
429     read_next_state_ = READ_STATE_NONE;
430     return OK;
431   }
432 
433   int next_id = request_order_.front();
434   CHECK(ContainsKey(stream_info_map_, next_id));
435   switch (stream_info_map_[next_id].state) {
436     case STREAM_READ_PENDING:
437       read_next_state_ = READ_STATE_READ_HEADERS;
438       active_read_id_ = next_id;
439       request_order_.pop();
440       break;
441 
442     case STREAM_CLOSED:
443       // Since nobody will read whatever data is on the pipeline associated with
444       // this closed request, we must shut down the rest of the pipeline.
445       read_next_state_ = READ_STATE_EVICT_PENDING_READS;
446       break;
447 
448     case STREAM_SENT:
449       read_next_state_ = READ_STATE_NONE;
450       break;
451 
452     default:
453       CHECK(false) << "Unexpected read state: "
454                    << stream_info_map_[next_id].state;
455   }
456 
457   return OK;
458 }
459 
DoReadHeaders(int result)460 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
461   CHECK(active_read_id_);
462   CHECK(ContainsKey(stream_info_map_, active_read_id_));
463   CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
464   stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
465   int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
466       base::Bind(&HttpPipelinedConnectionImpl::OnReadIOCallback,
467                  base::Unretained(this)));
468   read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
469   return rv;
470 }
471 
DoReadHeadersComplete(int result)472 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
473   CHECK(active_read_id_);
474   CHECK(ContainsKey(stream_info_map_, active_read_id_));
475   CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
476 
477   read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
478   if (result < OK) {
479     if (completed_one_request_ &&
480         (result == ERR_CONNECTION_CLOSED ||
481          result == ERR_EMPTY_RESPONSE ||
482          result == ERR_SOCKET_NOT_CONNECTED)) {
483       // These usually indicate that pipelining failed on the server side. In
484       // that case, we should retry without pipelining.
485       result = ERR_PIPELINE_EVICTION;
486     }
487     usable_ = false;
488   }
489 
490   CheckHeadersForPipelineCompatibility(active_read_id_, result);
491 
492   if (!read_still_on_call_stack_) {
493     QueueUserCallback(active_read_id_,
494                       stream_info_map_[active_read_id_].read_headers_callback,
495                       result, FROM_HERE);
496   }
497 
498   return result;
499 }
500 
DoReadWaitForClose(int result)501 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
502   read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
503   return result;
504 }
505 
DoReadStreamClosed()506 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
507   CHECK(active_read_id_);
508   CHECK(ContainsKey(stream_info_map_, active_read_id_));
509   CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
510   active_read_id_ = 0;
511   if (!usable_) {
512     // TODO(simonjam): Don't wait this long to evict.
513     read_next_state_ = READ_STATE_EVICT_PENDING_READS;
514     return OK;
515   }
516   completed_one_request_ = true;
517   base::MessageLoop::current()->PostTask(
518       FROM_HERE,
519       base::Bind(&HttpPipelinedConnectionImpl::StartNextDeferredRead,
520                  weak_factory_.GetWeakPtr()));
521   read_next_state_ = READ_STATE_NONE;
522   return OK;
523 }
524 
DoEvictPendingReadHeaders(int result)525 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
526   while (!request_order_.empty()) {
527     int evicted_id = request_order_.front();
528     request_order_.pop();
529     if (!ContainsKey(stream_info_map_, evicted_id)) {
530       continue;
531     }
532     if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
533       stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
534       stream_info_map_[evicted_id].read_headers_callback.Run(
535           ERR_PIPELINE_EVICTION);
536     }
537   }
538   read_next_state_ = READ_STATE_NONE;
539   return result;
540 }
541 
Close(int pipeline_id,bool not_reusable)542 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
543                                         bool not_reusable) {
544   CHECK(ContainsKey(stream_info_map_, pipeline_id));
545   net_log_.AddEvent(
546       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_STREAM_CLOSED,
547       base::Bind(&NetLogStreamClosedCallback,
548                  stream_info_map_[pipeline_id].source, not_reusable));
549   switch (stream_info_map_[pipeline_id].state) {
550     case STREAM_CREATED:
551       stream_info_map_[pipeline_id].state = STREAM_UNUSED;
552       break;
553 
554     case STREAM_BOUND:
555       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
556       break;
557 
558     case STREAM_SENDING:
559       usable_ = false;
560       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
561       active_send_request_.reset();
562       send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
563       DoSendRequestLoop(OK);
564       break;
565 
566     case STREAM_SENT:
567     case STREAM_READ_PENDING:
568       usable_ = false;
569       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
570       if (!request_order_.empty() &&
571           pipeline_id == request_order_.front() &&
572           read_next_state_ == READ_STATE_NONE) {
573         read_next_state_ = READ_STATE_EVICT_PENDING_READS;
574         DoReadHeadersLoop(OK);
575       }
576       break;
577 
578     case STREAM_ACTIVE:
579       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
580       if (not_reusable) {
581         usable_ = false;
582       }
583       read_next_state_ = READ_STATE_STREAM_CLOSED;
584       DoReadHeadersLoop(OK);
585       break;
586 
587     case STREAM_READ_EVICTED:
588       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
589       break;
590 
591     case STREAM_CLOSED:
592     case STREAM_UNUSED:
593       // TODO(simonjam): Why is Close() sometimes called twice?
594       break;
595 
596     default:
597       CHECK(false);
598       break;
599   }
600 }
601 
ReadResponseBody(int pipeline_id,IOBuffer * buf,int buf_len,const CompletionCallback & callback)602 int HttpPipelinedConnectionImpl::ReadResponseBody(
603     int pipeline_id, IOBuffer* buf, int buf_len,
604     const CompletionCallback& callback) {
605   CHECK(ContainsKey(stream_info_map_, pipeline_id));
606   CHECK_EQ(active_read_id_, pipeline_id);
607   CHECK(stream_info_map_[pipeline_id].parser.get());
608   return stream_info_map_[pipeline_id].parser->ReadResponseBody(
609       buf, buf_len, callback);
610 }
611 
GetUploadProgress(int pipeline_id) const612 UploadProgress HttpPipelinedConnectionImpl::GetUploadProgress(
613     int pipeline_id) const {
614   CHECK(ContainsKey(stream_info_map_, pipeline_id));
615   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
616   return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
617 }
618 
GetResponseInfo(int pipeline_id)619 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
620     int pipeline_id) {
621   CHECK(ContainsKey(stream_info_map_, pipeline_id));
622   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
623   return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo();
624 }
625 
IsResponseBodyComplete(int pipeline_id) const626 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
627     int pipeline_id) const {
628   CHECK(ContainsKey(stream_info_map_, pipeline_id));
629   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
630   return stream_info_map_.find(pipeline_id)->second.parser->
631       IsResponseBodyComplete();
632 }
633 
CanFindEndOfResponse(int pipeline_id) const634 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
635   CHECK(ContainsKey(stream_info_map_, pipeline_id));
636   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
637   return stream_info_map_.find(pipeline_id)->second.parser->
638       CanFindEndOfResponse();
639 }
640 
IsConnectionReused(int pipeline_id) const641 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
642   CHECK(ContainsKey(stream_info_map_, pipeline_id));
643   if (pipeline_id > 1) {
644     return true;
645   }
646   ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
647   return connection_->is_reused() ||
648          reuse_type == ClientSocketHandle::UNUSED_IDLE;
649 }
650 
SetConnectionReused(int pipeline_id)651 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
652   CHECK(ContainsKey(stream_info_map_, pipeline_id));
653   connection_->set_is_reused(true);
654 }
655 
GetTotalReceivedBytes(int pipeline_id) const656 int64 HttpPipelinedConnectionImpl::GetTotalReceivedBytes(
657     int pipeline_id) const {
658   CHECK(ContainsKey(stream_info_map_, pipeline_id));
659   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
660   return stream_info_map_.find(pipeline_id)->second.parser->received_bytes();
661 }
662 
GetLoadTimingInfo(int pipeline_id,LoadTimingInfo * load_timing_info) const663 bool HttpPipelinedConnectionImpl::GetLoadTimingInfo(
664     int pipeline_id, LoadTimingInfo* load_timing_info) const {
665   return connection_->GetLoadTimingInfo(IsConnectionReused(pipeline_id),
666                                         load_timing_info);
667 }
668 
GetSSLInfo(int pipeline_id,SSLInfo * ssl_info)669 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
670                                              SSLInfo* ssl_info) {
671   CHECK(ContainsKey(stream_info_map_, pipeline_id));
672   CHECK(stream_info_map_[pipeline_id].parser.get());
673   stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info);
674 }
675 
GetSSLCertRequestInfo(int pipeline_id,SSLCertRequestInfo * cert_request_info)676 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
677     int pipeline_id,
678     SSLCertRequestInfo* cert_request_info) {
679   CHECK(ContainsKey(stream_info_map_, pipeline_id));
680   CHECK(stream_info_map_[pipeline_id].parser.get());
681   stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
682       cert_request_info);
683 }
684 
Drain(HttpPipelinedStream * stream,HttpNetworkSession * session)685 void HttpPipelinedConnectionImpl::Drain(HttpPipelinedStream* stream,
686                                         HttpNetworkSession* session) {
687   HttpResponseHeaders* headers = stream->GetResponseInfo()->headers.get();
688   if (!stream->CanFindEndOfResponse() || headers->IsChunkEncoded() ||
689       !usable_) {
690     // TODO(simonjam): Drain chunk-encoded responses if they're relatively
691     // common.
692     stream->Close(true);
693     delete stream;
694     return;
695   }
696   HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(stream);
697   drainer->StartWithSize(session, headers->GetContentLength());
698   // |drainer| will delete itself when done.
699 }
700 
CheckHeadersForPipelineCompatibility(int pipeline_id,int result)701 void HttpPipelinedConnectionImpl::CheckHeadersForPipelineCompatibility(
702     int pipeline_id,
703     int result) {
704   if (result < OK) {
705     switch (result) {
706       // TODO(simonjam): Ignoring specific errors like this may not work.
707       // Collect metrics to see if this code is useful.
708       case ERR_ABORTED:
709       case ERR_INTERNET_DISCONNECTED:
710       case ERR_NETWORK_CHANGED:
711         // These errors are no fault of the server.
712         break;
713 
714       default:
715         ReportPipelineFeedback(pipeline_id, PIPELINE_SOCKET_ERROR);
716         break;
717     }
718     return;
719   }
720   HttpResponseInfo* info = GetResponseInfo(pipeline_id);
721   const HttpVersion required_version(1, 1);
722   if (info->headers->GetParsedHttpVersion() < required_version) {
723     ReportPipelineFeedback(pipeline_id, OLD_HTTP_VERSION);
724     return;
725   }
726   if (!info->headers->IsKeepAlive() || !CanFindEndOfResponse(pipeline_id)) {
727     usable_ = false;
728     ReportPipelineFeedback(pipeline_id, MUST_CLOSE_CONNECTION);
729     return;
730   }
731   if (info->headers->HasHeader(
732       HttpAuth::GetChallengeHeaderName(HttpAuth::AUTH_SERVER))) {
733     ReportPipelineFeedback(pipeline_id, AUTHENTICATION_REQUIRED);
734     return;
735   }
736   ReportPipelineFeedback(pipeline_id, OK);
737 }
738 
ReportPipelineFeedback(int pipeline_id,Feedback feedback)739 void HttpPipelinedConnectionImpl::ReportPipelineFeedback(int pipeline_id,
740                                                          Feedback feedback) {
741   std::string feedback_str;
742   switch (feedback) {
743     case OK:
744       feedback_str = "OK";
745       break;
746 
747     case PIPELINE_SOCKET_ERROR:
748       feedback_str = "PIPELINE_SOCKET_ERROR";
749       break;
750 
751     case OLD_HTTP_VERSION:
752       feedback_str = "OLD_HTTP_VERSION";
753       break;
754 
755     case MUST_CLOSE_CONNECTION:
756       feedback_str = "MUST_CLOSE_CONNECTION";
757       break;
758 
759     case AUTHENTICATION_REQUIRED:
760       feedback_str = "AUTHENTICATION_REQUIRED";
761       break;
762 
763     default:
764       NOTREACHED();
765       feedback_str = "UNKNOWN";
766       break;
767   }
768   net_log_.AddEvent(
769       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_RECEIVED_HEADERS,
770       base::Bind(&NetLogReceivedHeadersCallback,
771                  stream_info_map_[pipeline_id].source, &feedback_str));
772   delegate_->OnPipelineFeedback(this, feedback);
773 }
774 
QueueUserCallback(int pipeline_id,const CompletionCallback & callback,int rv,const tracked_objects::Location & from_here)775 void HttpPipelinedConnectionImpl::QueueUserCallback(
776     int pipeline_id, const CompletionCallback& callback, int rv,
777     const tracked_objects::Location& from_here) {
778   CHECK(stream_info_map_[pipeline_id].pending_user_callback.is_null());
779   stream_info_map_[pipeline_id].pending_user_callback = callback;
780   base::MessageLoop::current()->PostTask(
781       from_here,
782       base::Bind(&HttpPipelinedConnectionImpl::FireUserCallback,
783                  weak_factory_.GetWeakPtr(), pipeline_id, rv));
784 }
785 
FireUserCallback(int pipeline_id,int result)786 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
787                                                    int result) {
788   if (ContainsKey(stream_info_map_, pipeline_id)) {
789     CHECK(!stream_info_map_[pipeline_id].pending_user_callback.is_null());
790     CompletionCallback callback =
791         stream_info_map_[pipeline_id].pending_user_callback;
792     stream_info_map_[pipeline_id].pending_user_callback.Reset();
793     callback.Run(result);
794   }
795 }
796 
depth() const797 int HttpPipelinedConnectionImpl::depth() const {
798   return stream_info_map_.size();
799 }
800 
usable() const801 bool HttpPipelinedConnectionImpl::usable() const {
802   return usable_;
803 }
804 
active() const805 bool HttpPipelinedConnectionImpl::active() const {
806   return active_;
807 }
808 
used_ssl_config() const809 const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const {
810   return used_ssl_config_;
811 }
812 
used_proxy_info() const813 const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const {
814   return used_proxy_info_;
815 }
816 
net_log() const817 const BoundNetLog& HttpPipelinedConnectionImpl::net_log() const {
818   return net_log_;
819 }
820 
was_npn_negotiated() const821 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
822   return was_npn_negotiated_;
823 }
824 
protocol_negotiated() const825 NextProto HttpPipelinedConnectionImpl::protocol_negotiated()
826     const {
827   return protocol_negotiated_;
828 }
829 
PendingSendRequest()830 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest()
831     : pipeline_id(0),
832       response(NULL) {
833 }
834 
~PendingSendRequest()835 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
836 }
837 
StreamInfo()838 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
839     : state(STREAM_CREATED) {
840 }
841 
~StreamInfo()842 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
843 }
844 
845 }  // namespace net
846