• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_session.h"
6 
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/memory/linked_ptr.h"
10 #include "base/message_loop.h"
11 #include "base/metrics/field_trial.h"
12 #include "base/metrics/stats_counters.h"
13 #include "base/stl_util-inl.h"
14 #include "base/string_number_conversions.h"
15 #include "base/string_util.h"
16 #include "base/stringprintf.h"
17 #include "base/time.h"
18 #include "base/utf_string_conversions.h"
19 #include "base/values.h"
20 #include "net/base/connection_type_histograms.h"
21 #include "net/base/net_log.h"
22 #include "net/base/net_util.h"
23 #include "net/http/http_network_session.h"
24 #include "net/socket/ssl_client_socket.h"
25 #include "net/spdy/spdy_frame_builder.h"
26 #include "net/spdy/spdy_http_utils.h"
27 #include "net/spdy/spdy_protocol.h"
28 #include "net/spdy/spdy_session_pool.h"
29 #include "net/spdy/spdy_settings_storage.h"
30 #include "net/spdy/spdy_stream.h"
31 
32 namespace net {
33 
NetLogSpdySynParameter(const linked_ptr<spdy::SpdyHeaderBlock> & headers,spdy::SpdyControlFlags flags,spdy::SpdyStreamId id,spdy::SpdyStreamId associated_stream)34 NetLogSpdySynParameter::NetLogSpdySynParameter(
35     const linked_ptr<spdy::SpdyHeaderBlock>& headers,
36     spdy::SpdyControlFlags flags,
37     spdy::SpdyStreamId id,
38     spdy::SpdyStreamId associated_stream)
39     : headers_(headers),
40       flags_(flags),
41       id_(id),
42       associated_stream_(associated_stream) {
43 }
44 
~NetLogSpdySynParameter()45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
46 }
47 
ToValue() const48 Value* NetLogSpdySynParameter::ToValue() const {
49   DictionaryValue* dict = new DictionaryValue();
50   ListValue* headers_list = new ListValue();
51   for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
52       it != headers_->end(); ++it) {
53     headers_list->Append(new StringValue(base::StringPrintf(
54         "%s: %s", it->first.c_str(), it->second.c_str())));
55   }
56   dict->SetInteger("flags", flags_);
57   dict->Set("headers", headers_list);
58   dict->SetInteger("id", id_);
59   if (associated_stream_)
60     dict->SetInteger("associated_stream", associated_stream_);
61   return dict;
62 }
63 
64 namespace {
65 
66 const int kReadBufferSize = 8 * 1024;
67 
68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
69  public:
NetLogSpdySessionParameter(const HostPortProxyPair & host_pair)70   NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
71       : host_pair_(host_pair) {}
ToValue() const72   virtual Value* ToValue() const {
73     DictionaryValue* dict = new DictionaryValue();
74     dict->Set("host", new StringValue(host_pair_.first.ToString()));
75     dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
76     return dict;
77   }
78  private:
79   const HostPortProxyPair host_pair_;
80   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
81 };
82 
83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
84  public:
NetLogSpdySettingsParameter(const spdy::SpdySettings & settings)85   explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
86       : settings_(settings) {}
87 
ToValue() const88   virtual Value* ToValue() const {
89     DictionaryValue* dict = new DictionaryValue();
90     ListValue* settings = new ListValue();
91     for (spdy::SpdySettings::const_iterator it = settings_.begin();
92          it != settings_.end(); ++it) {
93       settings->Append(new StringValue(
94           base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
95     }
96     dict->Set("settings", settings);
97     return dict;
98   }
99 
100  private:
~NetLogSpdySettingsParameter()101   ~NetLogSpdySettingsParameter() {}
102   const spdy::SpdySettings settings_;
103 
104   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
105 };
106 
107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
108  public:
NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)109   NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
110                                   int delta,
111                                   int window_size)
112       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
113 
ToValue() const114   virtual Value* ToValue() const {
115     DictionaryValue* dict = new DictionaryValue();
116     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
117     dict->SetInteger("delta", delta_);
118     dict->SetInteger("window_size", window_size_);
119     return dict;
120   }
121 
122  private:
~NetLogSpdyWindowUpdateParameter()123   ~NetLogSpdyWindowUpdateParameter() {}
124   const spdy::SpdyStreamId stream_id_;
125   const int delta_;
126   const int window_size_;
127 
128   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
129 };
130 
131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
132  public:
NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,int size,spdy::SpdyDataFlags flags)133   NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
134                           int size,
135                           spdy::SpdyDataFlags flags)
136       : stream_id_(stream_id), size_(size), flags_(flags) {}
137 
ToValue() const138   virtual Value* ToValue() const {
139     DictionaryValue* dict = new DictionaryValue();
140     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
141     dict->SetInteger("size", size_);
142     dict->SetInteger("flags", static_cast<int>(flags_));
143     return dict;
144   }
145 
146  private:
~NetLogSpdyDataParameter()147   ~NetLogSpdyDataParameter() {}
148   const spdy::SpdyStreamId stream_id_;
149   const int size_;
150   const spdy::SpdyDataFlags flags_;
151 
152   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
153 };
154 
155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
156  public:
NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id,int status)157   NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
158       : stream_id_(stream_id), status_(status) {}
159 
ToValue() const160   virtual Value* ToValue() const {
161     DictionaryValue* dict = new DictionaryValue();
162     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
163     dict->SetInteger("status", status_);
164     return dict;
165   }
166 
167  private:
~NetLogSpdyRstParameter()168   ~NetLogSpdyRstParameter() {}
169   const spdy::SpdyStreamId stream_id_;
170   const int status_;
171 
172   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
173 };
174 
175 class NetLogSpdyPingParameter : public NetLog::EventParameters {
176  public:
NetLogSpdyPingParameter(uint32 unique_id)177   explicit NetLogSpdyPingParameter(uint32 unique_id) : unique_id_(unique_id) {}
178 
ToValue() const179   virtual Value* ToValue() const {
180     DictionaryValue* dict = new DictionaryValue();
181     dict->SetInteger("unique_id", unique_id_);
182     return dict;
183   }
184 
185  private:
~NetLogSpdyPingParameter()186   ~NetLogSpdyPingParameter() {}
187   const uint32 unique_id_;
188 
189   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyPingParameter);
190 };
191 
192 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
193  public:
NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,int active_streams,int unclaimed_streams)194   NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
195                             int active_streams,
196                             int unclaimed_streams)
197       : last_stream_id_(last_stream_id),
198         active_streams_(active_streams),
199         unclaimed_streams_(unclaimed_streams) {}
200 
ToValue() const201   virtual Value* ToValue() const {
202     DictionaryValue* dict = new DictionaryValue();
203     dict->SetInteger("last_accepted_stream_id",
204                      static_cast<int>(last_stream_id_));
205     dict->SetInteger("active_streams", active_streams_);
206     dict->SetInteger("unclaimed_streams", unclaimed_streams_);
207     return dict;
208   }
209 
210  private:
~NetLogSpdyGoAwayParameter()211   ~NetLogSpdyGoAwayParameter() {}
212   const spdy::SpdyStreamId last_stream_id_;
213   const int active_streams_;
214   const int unclaimed_streams_;
215 
216   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
217 };
218 
219 }  // namespace
220 
221 // static
222 bool SpdySession::use_ssl_ = true;
223 
224 // static
225 bool SpdySession::use_flow_control_ = false;
226 
227 // static
228 size_t SpdySession::max_concurrent_stream_limit_ = 256;
229 
230 // static
231 bool SpdySession::enable_ping_based_connection_checking_ = true;
232 
233 // static
234 int SpdySession::connection_at_risk_of_loss_ms_ = 0;
235 
236 // static
237 int SpdySession::trailing_ping_delay_time_ms_ = 1000;
238 
239 // static
240 int SpdySession::hung_interval_ms_ = 10000;
241 
SpdySession(const HostPortProxyPair & host_port_proxy_pair,SpdySessionPool * spdy_session_pool,SpdySettingsStorage * spdy_settings,NetLog * net_log)242 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
243                          SpdySessionPool* spdy_session_pool,
244                          SpdySettingsStorage* spdy_settings,
245                          NetLog* net_log)
246     : ALLOW_THIS_IN_INITIALIZER_LIST(
247           read_callback_(this, &SpdySession::OnReadComplete)),
248       ALLOW_THIS_IN_INITIALIZER_LIST(
249           write_callback_(this, &SpdySession::OnWriteComplete)),
250       ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
251       host_port_proxy_pair_(host_port_proxy_pair),
252       spdy_session_pool_(spdy_session_pool),
253       spdy_settings_(spdy_settings),
254       connection_(new ClientSocketHandle),
255       read_buffer_(new IOBuffer(kReadBufferSize)),
256       read_pending_(false),
257       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
258       write_pending_(false),
259       delayed_write_pending_(false),
260       is_secure_(false),
261       certificate_error_code_(OK),
262       error_(OK),
263       state_(IDLE),
264       max_concurrent_streams_(kDefaultMaxConcurrentStreams),
265       streams_initiated_count_(0),
266       streams_pushed_count_(0),
267       streams_pushed_and_claimed_count_(0),
268       streams_abandoned_count_(0),
269       frames_received_(0),
270       bytes_received_(0),
271       sent_settings_(false),
272       received_settings_(false),
273       stalled_streams_(0),
274       pings_in_flight_(0),
275       next_ping_id_(1),
276       received_data_time_(base::TimeTicks::Now()),
277       trailing_ping_pending_(false),
278       check_ping_status_pending_(false),
279       need_to_send_ping_(false),
280       initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
281       initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
282       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
283   DCHECK(HttpStreamFactory::spdy_enabled());
284   net_log_.BeginEvent(
285       NetLog::TYPE_SPDY_SESSION,
286       make_scoped_refptr(
287           new NetLogSpdySessionParameter(host_port_proxy_pair_)));
288 
289   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
290 
291   spdy_framer_.set_visitor(this);
292 
293   SendSettings();
294 }
295 
~SpdySession()296 SpdySession::~SpdySession() {
297   if (state_ != CLOSED) {
298     state_ = CLOSED;
299 
300     // Cleanup all the streams.
301     CloseAllStreams(net::ERR_ABORTED);
302   }
303 
304   if (connection_->is_initialized()) {
305     // With Spdy we can't recycle sockets.
306     connection_->socket()->Disconnect();
307   }
308 
309   // Streams should all be gone now.
310   DCHECK_EQ(0u, num_active_streams());
311   DCHECK_EQ(0u, num_unclaimed_pushed_streams());
312 
313   DCHECK(pending_callback_map_.empty());
314 
315   RecordHistograms();
316 
317   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
318 }
319 
InitializeWithSocket(ClientSocketHandle * connection,bool is_secure,int certificate_error_code)320 net::Error SpdySession::InitializeWithSocket(
321     ClientSocketHandle* connection,
322     bool is_secure,
323     int certificate_error_code) {
324   base::StatsCounter spdy_sessions("spdy.sessions");
325   spdy_sessions.Increment();
326 
327   state_ = CONNECTED;
328   connection_.reset(connection);
329   is_secure_ = is_secure;
330   certificate_error_code_ = certificate_error_code;
331 
332   // Write out any data that we might have to send, such as the settings frame.
333   WriteSocketLater();
334   net::Error error = ReadSocket();
335   if (error == ERR_IO_PENDING)
336     return OK;
337   return error;
338 }
339 
VerifyDomainAuthentication(const std::string & domain)340 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
341   if (state_ != CONNECTED)
342     return false;
343 
344   SSLInfo ssl_info;
345   bool was_npn_negotiated;
346   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
347     return true;   // This is not a secure session, so all domains are okay.
348 
349   return ssl_info.cert->VerifyNameMatch(domain);
350 }
351 
GetPushStream(const GURL & url,scoped_refptr<SpdyStream> * stream,const BoundNetLog & stream_net_log)352 int SpdySession::GetPushStream(
353     const GURL& url,
354     scoped_refptr<SpdyStream>* stream,
355     const BoundNetLog& stream_net_log) {
356   CHECK_NE(state_, CLOSED);
357 
358   *stream = NULL;
359 
360   // Don't allow access to secure push streams over an unauthenticated, but
361   // encrypted SSL socket.
362   if (is_secure_ && certificate_error_code_ != OK &&
363       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
364     LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
365                << "unauthenticated session.";
366     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
367     return ERR_SPDY_PROTOCOL_ERROR;
368   }
369 
370   *stream = GetActivePushStream(url.spec());
371   if (stream->get()) {
372     DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
373     streams_pushed_and_claimed_count_++;
374     return OK;
375   }
376   return 0;
377 }
378 
CreateStream(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log,CompletionCallback * callback)379 int SpdySession::CreateStream(
380     const GURL& url,
381     RequestPriority priority,
382     scoped_refptr<SpdyStream>* spdy_stream,
383     const BoundNetLog& stream_net_log,
384     CompletionCallback* callback) {
385   if (!max_concurrent_streams_ ||
386       active_streams_.size() < max_concurrent_streams_) {
387     return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
388   }
389 
390   stalled_streams_++;
391   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
392   create_stream_queues_[priority].push(
393       PendingCreateStream(url, priority, spdy_stream,
394                           stream_net_log, callback));
395   return ERR_IO_PENDING;
396 }
397 
ProcessPendingCreateStreams()398 void SpdySession::ProcessPendingCreateStreams() {
399   while (!max_concurrent_streams_ ||
400          active_streams_.size() < max_concurrent_streams_) {
401     bool no_pending_create_streams = true;
402     for (int i = 0;i < NUM_PRIORITIES;++i) {
403       if (!create_stream_queues_[i].empty()) {
404         PendingCreateStream pending_create = create_stream_queues_[i].front();
405         create_stream_queues_[i].pop();
406         no_pending_create_streams = false;
407         int error = CreateStreamImpl(*pending_create.url,
408                                      pending_create.priority,
409                                      pending_create.spdy_stream,
410                                      *pending_create.stream_net_log);
411         scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
412         DCHECK(!ContainsKey(pending_callback_map_, stream));
413         pending_callback_map_[stream] =
414             CallbackResultPair(pending_create.callback, error);
415         MessageLoop::current()->PostTask(
416             FROM_HERE,
417             method_factory_.NewRunnableMethod(
418                 &SpdySession::InvokeUserStreamCreationCallback, stream));
419         break;
420       }
421     }
422     if (no_pending_create_streams)
423       return;  // there were no streams in any queue
424   }
425 }
426 
CancelPendingCreateStreams(const scoped_refptr<SpdyStream> * spdy_stream)427 void SpdySession::CancelPendingCreateStreams(
428     const scoped_refptr<SpdyStream>* spdy_stream) {
429   PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
430   if (it != pending_callback_map_.end()) {
431     pending_callback_map_.erase(it);
432     return;
433   }
434 
435   for (int i = 0;i < NUM_PRIORITIES;++i) {
436     PendingCreateStreamQueue tmp;
437     // Make a copy removing this trans
438     while (!create_stream_queues_[i].empty()) {
439       PendingCreateStream pending_create = create_stream_queues_[i].front();
440       create_stream_queues_[i].pop();
441       if (pending_create.spdy_stream != spdy_stream)
442         tmp.push(pending_create);
443     }
444     // Now copy it back
445     while (!tmp.empty()) {
446       create_stream_queues_[i].push(tmp.front());
447       tmp.pop();
448     }
449   }
450 }
451 
CreateStreamImpl(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log)452 int SpdySession::CreateStreamImpl(
453     const GURL& url,
454     RequestPriority priority,
455     scoped_refptr<SpdyStream>* spdy_stream,
456     const BoundNetLog& stream_net_log) {
457   // Make sure that we don't try to send https/wss over an unauthenticated, but
458   // encrypted SSL socket.
459   if (is_secure_ && certificate_error_code_ != OK &&
460       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
461     LOG(ERROR) << "Tried to create spdy stream for secure content over an "
462                << "unauthenticated session.";
463     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
464     return ERR_SPDY_PROTOCOL_ERROR;
465   }
466 
467   const std::string& path = url.PathForRequest();
468 
469   const spdy::SpdyStreamId stream_id = GetNewStreamId();
470 
471   *spdy_stream = new SpdyStream(this,
472                                 stream_id,
473                                 false,
474                                 stream_net_log);
475   const scoped_refptr<SpdyStream>& stream = *spdy_stream;
476 
477   stream->set_priority(priority);
478   stream->set_path(path);
479   stream->set_send_window_size(initial_send_window_size_);
480   stream->set_recv_window_size(initial_recv_window_size_);
481   ActivateStream(stream);
482 
483   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
484       static_cast<int>(priority), 0, 10, 11);
485 
486   // TODO(mbelshe): Optimize memory allocations
487   DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
488 
489   DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
490   return OK;
491 }
492 
WriteSynStream(spdy::SpdyStreamId stream_id,RequestPriority priority,spdy::SpdyControlFlags flags,const linked_ptr<spdy::SpdyHeaderBlock> & headers)493 int SpdySession::WriteSynStream(
494     spdy::SpdyStreamId stream_id,
495     RequestPriority priority,
496     spdy::SpdyControlFlags flags,
497     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
498   // Find our stream
499   if (!IsStreamActive(stream_id))
500     return ERR_INVALID_SPDY_STREAM;
501   const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
502   CHECK_EQ(stream->stream_id(), stream_id);
503 
504   SendPrefacePingIfNoneInFlight();
505 
506   scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
507       spdy_framer_.CreateSynStream(
508           stream_id, 0,
509           ConvertRequestPriorityToSpdyPriority(priority),
510           flags, false, headers.get()));
511   QueueFrame(syn_frame.get(), priority, stream);
512 
513   base::StatsCounter spdy_requests("spdy.requests");
514   spdy_requests.Increment();
515   streams_initiated_count_++;
516 
517   if (net_log().IsLoggingAllEvents()) {
518     net_log().AddEvent(
519         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
520         make_scoped_refptr(
521             new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
522   }
523 
524   // Some servers don't like too many pings, so we limit our current sending to
525   // no more than one ping for any syn sent.  To do this, we avoid ever setting
526   // this to true unless we send a syn (which we have just done).  This approach
527   // may change over time as servers change their responses to pings.
528   need_to_send_ping_ = true;
529 
530   return ERR_IO_PENDING;
531 }
532 
WriteStreamData(spdy::SpdyStreamId stream_id,net::IOBuffer * data,int len,spdy::SpdyDataFlags flags)533 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
534                                  net::IOBuffer* data, int len,
535                                  spdy::SpdyDataFlags flags) {
536   // Find our stream
537   DCHECK(IsStreamActive(stream_id));
538   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
539   CHECK_EQ(stream->stream_id(), stream_id);
540   if (!stream)
541     return ERR_INVALID_SPDY_STREAM;
542 
543   SendPrefacePingIfNoneInFlight();
544 
545   if (len > kMaxSpdyFrameChunkSize) {
546     len = kMaxSpdyFrameChunkSize;
547     flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
548   }
549 
550   // Obey send window size of the stream if flow control is enabled.
551   if (use_flow_control_) {
552     if (stream->send_window_size() <= 0) {
553       // Because we queue frames onto the session, it is possible that
554       // a stream was not flow controlled at the time it attempted the
555       // write, but when we go to fulfill the write, it is now flow
556       // controlled.  This is why we need the session to mark the stream
557       // as stalled - because only the session knows for sure when the
558       // stall occurs.
559       stream->set_stalled_by_flow_control(true);
560       net_log().AddEvent(
561           NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
562           make_scoped_refptr(
563               new NetLogIntegerParameter("stream_id", stream_id)));
564       return ERR_IO_PENDING;
565     }
566     int new_len = std::min(len, stream->send_window_size());
567     if (new_len < len) {
568       len = new_len;
569       flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
570     }
571     stream->DecreaseSendWindowSize(len);
572   }
573 
574   if (net_log().IsLoggingAllEvents()) {
575     net_log().AddEvent(
576         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
577         make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
578   }
579 
580   // TODO(mbelshe): reduce memory copies here.
581   scoped_ptr<spdy::SpdyDataFrame> frame(
582       spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
583   QueueFrame(frame.get(), stream->priority(), stream);
584   return ERR_IO_PENDING;
585 }
586 
CloseStream(spdy::SpdyStreamId stream_id,int status)587 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
588   // TODO(mbelshe): We should send a RST_STREAM control frame here
589   //                so that the server can cancel a large send.
590 
591   DeleteStream(stream_id, status);
592 }
593 
ResetStream(spdy::SpdyStreamId stream_id,spdy::SpdyStatusCodes status)594 void SpdySession::ResetStream(
595     spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
596 
597   net_log().AddEvent(
598       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
599       make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
600 
601   scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
602       spdy_framer_.CreateRstStream(stream_id, status));
603 
604   // Default to lowest priority unless we know otherwise.
605   int priority = 3;
606   if(IsStreamActive(stream_id)) {
607     scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
608     priority = stream->priority();
609   }
610   QueueFrame(rst_frame.get(), priority, NULL);
611   DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
612 }
613 
IsStreamActive(spdy::SpdyStreamId stream_id) const614 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
615   return ContainsKey(active_streams_, stream_id);
616 }
617 
GetLoadState() const618 LoadState SpdySession::GetLoadState() const {
619   // NOTE: The application only queries the LoadState via the
620   //       SpdyNetworkTransaction, and details are only needed when
621   //       we're in the process of connecting.
622 
623   // If we're connecting, defer to the connection to give us the actual
624   // LoadState.
625   if (state_ == CONNECTING)
626     return connection_->GetLoadState();
627 
628   // Just report that we're idle since the session could be doing
629   // many things concurrently.
630   return LOAD_STATE_IDLE;
631 }
632 
OnReadComplete(int bytes_read)633 void SpdySession::OnReadComplete(int bytes_read) {
634   // Parse a frame.  For now this code requires that the frame fit into our
635   // buffer (32KB).
636   // TODO(mbelshe): support arbitrarily large frames!
637 
638   read_pending_ = false;
639 
640   if (bytes_read <= 0) {
641     // Session is tearing down.
642     net::Error error = static_cast<net::Error>(bytes_read);
643     if (bytes_read == 0)
644       error = ERR_CONNECTION_CLOSED;
645     CloseSessionOnError(error, true);
646     return;
647   }
648 
649   bytes_received_ += bytes_read;
650 
651   received_data_time_ = base::TimeTicks::Now();
652 
653   // The SpdyFramer will use callbacks onto |this| as it parses frames.
654   // When errors occur, those callbacks can lead to teardown of all references
655   // to |this|, so maintain a reference to self during this call for safe
656   // cleanup.
657   scoped_refptr<SpdySession> self(this);
658 
659   char *data = read_buffer_->data();
660   while (bytes_read &&
661          spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
662     uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
663     bytes_read -= bytes_processed;
664     data += bytes_processed;
665     if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
666       spdy_framer_.Reset();
667   }
668 
669   if (state_ != CLOSED)
670     ReadSocket();
671 }
672 
OnWriteComplete(int result)673 void SpdySession::OnWriteComplete(int result) {
674   DCHECK(write_pending_);
675   DCHECK(in_flight_write_.size());
676 
677   write_pending_ = false;
678 
679   scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
680 
681   if (result >= 0) {
682     // It should not be possible to have written more bytes than our
683     // in_flight_write_.
684     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
685 
686     in_flight_write_.buffer()->DidConsume(result);
687 
688     // We only notify the stream when we've fully written the pending frame.
689     if (!in_flight_write_.buffer()->BytesRemaining()) {
690       if (stream) {
691         // Report the number of bytes written to the caller, but exclude the
692         // frame size overhead.  NOTE: if this frame was compressed the
693         // reported bytes written is the compressed size, not the original
694         // size.
695         if (result > 0) {
696           result = in_flight_write_.buffer()->size();
697           DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
698           result -= static_cast<int>(spdy::SpdyFrame::size());
699         }
700 
701         // It is possible that the stream was cancelled while we were writing
702         // to the socket.
703         if (!stream->cancelled())
704           stream->OnWriteComplete(result);
705       }
706 
707       // Cleanup the write which just completed.
708       in_flight_write_.release();
709     }
710 
711     // Write more data.  We're already in a continuation, so we can
712     // go ahead and write it immediately (without going back to the
713     // message loop).
714     WriteSocketLater();
715   } else {
716     in_flight_write_.release();
717 
718     // The stream is now errored.  Close it down.
719     CloseSessionOnError(static_cast<net::Error>(result), true);
720   }
721 }
722 
ReadSocket()723 net::Error SpdySession::ReadSocket() {
724   if (read_pending_)
725     return OK;
726 
727   if (state_ == CLOSED) {
728     NOTREACHED();
729     return ERR_UNEXPECTED;
730   }
731 
732   CHECK(connection_.get());
733   CHECK(connection_->socket());
734   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
735                                                kReadBufferSize,
736                                                &read_callback_);
737   switch (bytes_read) {
738     case 0:
739       // Socket is closed!
740       CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
741       return ERR_CONNECTION_CLOSED;
742     case net::ERR_IO_PENDING:
743       // Waiting for data.  Nothing to do now.
744       read_pending_ = true;
745       return ERR_IO_PENDING;
746     default:
747       // Data was read, process it.
748       // Schedule the work through the message loop to avoid recursive
749       // callbacks.
750       read_pending_ = true;
751       MessageLoop::current()->PostTask(
752           FROM_HERE,
753           method_factory_.NewRunnableMethod(
754               &SpdySession::OnReadComplete, bytes_read));
755       break;
756   }
757   return OK;
758 }
759 
WriteSocketLater()760 void SpdySession::WriteSocketLater() {
761   if (delayed_write_pending_)
762     return;
763 
764   if (state_ < CONNECTED)
765     return;
766 
767   delayed_write_pending_ = true;
768   MessageLoop::current()->PostTask(
769       FROM_HERE,
770       method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
771 }
772 
WriteSocket()773 void SpdySession::WriteSocket() {
774   // This function should only be called via WriteSocketLater.
775   DCHECK(delayed_write_pending_);
776   delayed_write_pending_ = false;
777 
778   // If the socket isn't connected yet, just wait; we'll get called
779   // again when the socket connection completes.  If the socket is
780   // closed, just return.
781   if (state_ < CONNECTED || state_ == CLOSED)
782     return;
783 
784   if (write_pending_)   // Another write is in progress still.
785     return;
786 
787   // Loop sending frames until we've sent everything or until the write
788   // returns error (or ERR_IO_PENDING).
789   while (in_flight_write_.buffer() || !queue_.empty()) {
790     if (!in_flight_write_.buffer()) {
791       // Grab the next SpdyFrame to send.
792       SpdyIOBuffer next_buffer = queue_.top();
793       queue_.pop();
794 
795       // We've deferred compression until just before we write it to the socket,
796       // which is now.  At this time, we don't compress our data frames.
797       spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
798       size_t size;
799       if (spdy_framer_.IsCompressible(uncompressed_frame)) {
800         scoped_ptr<spdy::SpdyFrame> compressed_frame(
801             spdy_framer_.CompressFrame(uncompressed_frame));
802         if (!compressed_frame.get()) {
803           LOG(ERROR) << "SPDY Compression failure";
804           CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
805           return;
806         }
807 
808         size = compressed_frame->length() + spdy::SpdyFrame::size();
809 
810         DCHECK_GT(size, 0u);
811 
812         // TODO(mbelshe): We have too much copying of data here.
813         IOBufferWithSize* buffer = new IOBufferWithSize(size);
814         memcpy(buffer->data(), compressed_frame->data(), size);
815 
816         // Attempt to send the frame.
817         in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
818       } else {
819         size = uncompressed_frame.length() + spdy::SpdyFrame::size();
820         in_flight_write_ = next_buffer;
821       }
822     } else {
823       DCHECK(in_flight_write_.buffer()->BytesRemaining());
824     }
825 
826     write_pending_ = true;
827     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
828         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
829     if (rv == net::ERR_IO_PENDING)
830       break;
831 
832     // We sent the frame successfully.
833     OnWriteComplete(rv);
834 
835     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
836     //                 as in an error state.
837     if (rv < 0)
838       break;
839   }
840 }
841 
CloseAllStreams(net::Error status)842 void SpdySession::CloseAllStreams(net::Error status) {
843   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
844   base::StatsCounter abandoned_push_streams(
845       "spdy.abandoned_push_streams");
846 
847   if (!active_streams_.empty())
848     abandoned_streams.Add(active_streams_.size());
849   if (!unclaimed_pushed_streams_.empty()) {
850     streams_abandoned_count_ += unclaimed_pushed_streams_.size();
851     abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
852     unclaimed_pushed_streams_.clear();
853   }
854 
855   for (int i = 0;i < NUM_PRIORITIES;++i) {
856     while (!create_stream_queues_[i].empty()) {
857       PendingCreateStream pending_create = create_stream_queues_[i].front();
858       create_stream_queues_[i].pop();
859       pending_create.callback->Run(ERR_ABORTED);
860     }
861   }
862 
863   while (!active_streams_.empty()) {
864     ActiveStreamMap::iterator it = active_streams_.begin();
865     const scoped_refptr<SpdyStream>& stream = it->second;
866     DCHECK(stream);
867     LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
868                  << "): " << stream->path();
869     DeleteStream(stream->stream_id(), status);
870   }
871 
872   // We also need to drain the queue.
873   while (queue_.size())
874     queue_.pop();
875 }
876 
GetNewStreamId()877 int SpdySession::GetNewStreamId() {
878   int id = stream_hi_water_mark_;
879   stream_hi_water_mark_ += 2;
880   if (stream_hi_water_mark_ > 0x7fff)
881     stream_hi_water_mark_ = 1;
882   return id;
883 }
884 
QueueFrame(spdy::SpdyFrame * frame,spdy::SpdyPriority priority,SpdyStream * stream)885 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
886                              spdy::SpdyPriority priority,
887                              SpdyStream* stream) {
888   int length = spdy::SpdyFrame::size() + frame->length();
889   IOBuffer* buffer = new IOBuffer(length);
890   memcpy(buffer->data(), frame->data(), length);
891   queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
892 
893   WriteSocketLater();
894 }
895 
CloseSessionOnError(net::Error err,bool remove_from_pool)896 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
897   // Closing all streams can have a side-effect of dropping the last reference
898   // to |this|.  Hold a reference through this function.
899   scoped_refptr<SpdySession> self(this);
900 
901   DCHECK_LT(err, OK);
902   net_log_.AddEvent(
903       NetLog::TYPE_SPDY_SESSION_CLOSE,
904       make_scoped_refptr(new NetLogIntegerParameter("status", err)));
905 
906   // Don't close twice.  This can occur because we can have both
907   // a read and a write outstanding, and each can complete with
908   // an error.
909   if (state_ != CLOSED) {
910     state_ = CLOSED;
911     error_ = err;
912     if (remove_from_pool)
913       RemoveFromPool();
914     CloseAllStreams(err);
915   }
916 }
917 
GetInfoAsValue() const918 Value* SpdySession::GetInfoAsValue() const {
919   DictionaryValue* dict = new DictionaryValue();
920 
921   dict->SetInteger("source_id", net_log_.source().id);
922 
923   dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
924   dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
925 
926   dict->SetInteger("active_streams", active_streams_.size());
927 
928   dict->SetInteger("unclaimed_pushed_streams",
929       unclaimed_pushed_streams_.size());
930 
931   dict->SetBoolean("is_secure", is_secure_);
932 
933   dict->SetInteger("error", error_);
934   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
935 
936   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
937   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
938   dict->SetInteger("streams_pushed_and_claimed_count",
939       streams_pushed_and_claimed_count_);
940   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
941   dict->SetInteger("frames_received", frames_received_);
942 
943   dict->SetBoolean("sent_settings", sent_settings_);
944   dict->SetBoolean("received_settings", received_settings_);
945   return dict;
946 }
947 
GetPeerAddress(AddressList * address) const948 int SpdySession::GetPeerAddress(AddressList* address) const {
949   if (!connection_->socket())
950     return ERR_SOCKET_NOT_CONNECTED;
951 
952   return connection_->socket()->GetPeerAddress(address);
953 }
954 
GetLocalAddress(IPEndPoint * address) const955 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
956   if (!connection_->socket())
957     return ERR_SOCKET_NOT_CONNECTED;
958 
959   return connection_->socket()->GetLocalAddress(address);
960 }
961 
ActivateStream(SpdyStream * stream)962 void SpdySession::ActivateStream(SpdyStream* stream) {
963   const spdy::SpdyStreamId id = stream->stream_id();
964   DCHECK(!IsStreamActive(id));
965 
966   active_streams_[id] = stream;
967 }
968 
DeleteStream(spdy::SpdyStreamId id,int status)969 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
970   // For push streams, if they are being deleted normally, we leave
971   // the stream in the unclaimed_pushed_streams_ list.  However, if
972   // the stream is errored out, clean it up entirely.
973   if (status != OK) {
974     PushedStreamMap::iterator it;
975     for (it = unclaimed_pushed_streams_.begin();
976          it != unclaimed_pushed_streams_.end(); ++it) {
977       scoped_refptr<SpdyStream> curr = it->second;
978       if (id == curr->stream_id()) {
979         unclaimed_pushed_streams_.erase(it);
980         break;
981       }
982     }
983   }
984 
985   // The stream might have been deleted.
986   ActiveStreamMap::iterator it2 = active_streams_.find(id);
987   if (it2 == active_streams_.end())
988     return;
989 
990   // If this is an active stream, call the callback.
991   const scoped_refptr<SpdyStream> stream(it2->second);
992   active_streams_.erase(it2);
993   if (stream)
994     stream->OnClose(status);
995   ProcessPendingCreateStreams();
996 }
997 
RemoveFromPool()998 void SpdySession::RemoveFromPool() {
999   if (spdy_session_pool_) {
1000     spdy_session_pool_->Remove(make_scoped_refptr(this));
1001     spdy_session_pool_ = NULL;
1002   }
1003 }
1004 
GetActivePushStream(const std::string & path)1005 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
1006     const std::string& path) {
1007   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1008 
1009   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
1010   if (it != unclaimed_pushed_streams_.end()) {
1011     net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
1012     scoped_refptr<SpdyStream> stream = it->second;
1013     unclaimed_pushed_streams_.erase(it);
1014     used_push_streams.Increment();
1015     return stream;
1016   }
1017   return NULL;
1018 }
1019 
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)1020 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
1021   if (is_secure_) {
1022     SSLClientSocket* ssl_socket =
1023         reinterpret_cast<SSLClientSocket*>(connection_->socket());
1024     ssl_socket->GetSSLInfo(ssl_info);
1025     *was_npn_negotiated = ssl_socket->was_npn_negotiated();
1026     return true;
1027   }
1028   return false;
1029 }
1030 
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)1031 bool SpdySession::GetSSLCertRequestInfo(
1032     SSLCertRequestInfo* cert_request_info) {
1033   if (is_secure_) {
1034     SSLClientSocket* ssl_socket =
1035         reinterpret_cast<SSLClientSocket*>(connection_->socket());
1036     ssl_socket->GetSSLCertRequestInfo(cert_request_info);
1037     return true;
1038   }
1039   return false;
1040 }
1041 
OnError(spdy::SpdyFramer * framer)1042 void SpdySession::OnError(spdy::SpdyFramer* framer) {
1043   CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1044 }
1045 
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1046 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1047                                     const char* data,
1048                                     size_t len) {
1049   if (net_log().IsLoggingAllEvents()) {
1050     net_log().AddEvent(
1051         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1052         make_scoped_refptr(new NetLogSpdyDataParameter(
1053             stream_id, len, spdy::SpdyDataFlags())));
1054   }
1055 
1056   if (!IsStreamActive(stream_id)) {
1057     // NOTE:  it may just be that the stream was cancelled.
1058     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
1059     return;
1060   }
1061 
1062   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1063   stream->OnDataReceived(data, len);
1064 }
1065 
Respond(const spdy::SpdyHeaderBlock & headers,const scoped_refptr<SpdyStream> stream)1066 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
1067                           const scoped_refptr<SpdyStream> stream) {
1068   int rv = OK;
1069   rv = stream->OnResponseReceived(headers);
1070   if (rv < 0) {
1071     DCHECK_NE(rv, ERR_IO_PENDING);
1072     const spdy::SpdyStreamId stream_id = stream->stream_id();
1073     DeleteStream(stream_id, rv);
1074     return false;
1075   }
1076   return true;
1077 }
1078 
OnSyn(const spdy::SpdySynStreamControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1079 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
1080                         const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1081   spdy::SpdyStreamId stream_id = frame.stream_id();
1082   spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
1083 
1084   if (net_log_.IsLoggingAllEvents()) {
1085     net_log_.AddEvent(
1086         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1087         make_scoped_refptr(new NetLogSpdySynParameter(
1088             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1089             stream_id, associated_stream_id)));
1090   }
1091 
1092   // Server-initiated streams should have even sequence numbers.
1093   if ((stream_id & 0x1) != 0) {
1094     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1095     return;
1096   }
1097 
1098   if (IsStreamActive(stream_id)) {
1099     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1100     return;
1101   }
1102 
1103   if (associated_stream_id == 0) {
1104     LOG(WARNING) << "Received invalid OnSyn associated stream id "
1105                  << associated_stream_id
1106                  << " for stream " << stream_id;
1107     ResetStream(stream_id, spdy::INVALID_STREAM);
1108     return;
1109   }
1110 
1111   streams_pushed_count_++;
1112 
1113   // TODO(mbelshe): DCHECK that this is a GET method?
1114 
1115   // Verify that the response had a URL for us.
1116   const std::string& url = ContainsKey(*headers, "url") ?
1117       headers->find("url")->second : "";
1118   if (url.empty()) {
1119     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1120     LOG(WARNING) << "Pushed stream did not contain a url.";
1121     return;
1122   }
1123 
1124   GURL gurl(url);
1125   if (!gurl.is_valid()) {
1126     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1127     LOG(WARNING) << "Pushed stream url was invalid: " << url;
1128     return;
1129   }
1130 
1131   // Verify we have a valid stream association.
1132   if (!IsStreamActive(associated_stream_id)) {
1133     LOG(WARNING) << "Received OnSyn with inactive associated stream "
1134                << associated_stream_id;
1135     ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
1136     return;
1137   }
1138 
1139   scoped_refptr<SpdyStream> associated_stream =
1140       active_streams_[associated_stream_id];
1141   GURL associated_url(associated_stream->GetUrl());
1142   if (associated_url.GetOrigin() != gurl.GetOrigin()) {
1143     LOG(WARNING) << "Rejected Cross Origin Push Stream "
1144                  << associated_stream_id;
1145     ResetStream(stream_id, spdy::REFUSED_STREAM);
1146     return;
1147   }
1148 
1149   // There should not be an existing pushed stream with the same path.
1150   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1151   if (it != unclaimed_pushed_streams_.end()) {
1152     LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
1153     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1154     return;
1155   }
1156 
1157   scoped_refptr<SpdyStream> stream(
1158       new SpdyStream(this, stream_id, true, net_log_));
1159 
1160   stream->set_path(gurl.PathForRequest());
1161 
1162   unclaimed_pushed_streams_[url] = stream;
1163 
1164   ActivateStream(stream);
1165   stream->set_response_received();
1166 
1167   // Parse the headers.
1168   if (!Respond(*headers, stream))
1169     return;
1170 
1171   base::StatsCounter push_requests("spdy.pushed_streams");
1172   push_requests.Increment();
1173 }
1174 
OnSynReply(const spdy::SpdySynReplyControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1175 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
1176                              const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1177   spdy::SpdyStreamId stream_id = frame.stream_id();
1178 
1179   bool valid_stream = IsStreamActive(stream_id);
1180   if (!valid_stream) {
1181     // NOTE:  it may just be that the stream was cancelled.
1182     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
1183     return;
1184   }
1185 
1186   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1187   CHECK_EQ(stream->stream_id(), stream_id);
1188   CHECK(!stream->cancelled());
1189 
1190   if (stream->response_received()) {
1191     LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
1192     CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
1193     return;
1194   }
1195   stream->set_response_received();
1196 
1197   if (net_log().IsLoggingAllEvents()) {
1198     net_log().AddEvent(
1199         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1200         make_scoped_refptr(new NetLogSpdySynParameter(
1201             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1202             stream_id, 0)));
1203   }
1204 
1205   Respond(*headers, stream);
1206 }
1207 
OnHeaders(const spdy::SpdyHeadersControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1208 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
1209                             const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1210   spdy::SpdyStreamId stream_id = frame.stream_id();
1211 
1212   bool valid_stream = IsStreamActive(stream_id);
1213   if (!valid_stream) {
1214     // NOTE:  it may just be that the stream was cancelled.
1215     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
1216     return;
1217   }
1218 
1219   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1220   CHECK_EQ(stream->stream_id(), stream_id);
1221   CHECK(!stream->cancelled());
1222 
1223   if (net_log().IsLoggingAllEvents()) {
1224     net_log().AddEvent(
1225         NetLog::TYPE_SPDY_SESSION_HEADERS,
1226         make_scoped_refptr(new NetLogSpdySynParameter(
1227             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1228             stream_id, 0)));
1229   }
1230 
1231   int rv = stream->OnHeaders(*headers);
1232   if (rv < 0) {
1233     DCHECK_NE(rv, ERR_IO_PENDING);
1234     const spdy::SpdyStreamId stream_id = stream->stream_id();
1235     DeleteStream(stream_id, rv);
1236   }
1237 }
1238 
OnControl(const spdy::SpdyControlFrame * frame)1239 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
1240   const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
1241   uint32 type = frame->type();
1242   if (type == spdy::SYN_STREAM ||
1243       type == spdy::SYN_REPLY ||
1244       type == spdy::HEADERS) {
1245     if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
1246       LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
1247       int stream_id = 0;
1248       if (type == spdy::SYN_STREAM) {
1249         stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
1250                      (frame))->stream_id();
1251       } else if (type == spdy::SYN_REPLY) {
1252         stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
1253                      (frame))->stream_id();
1254       } else if (type == spdy::HEADERS) {
1255         stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
1256                      (frame))->stream_id();
1257       }
1258       if(IsStreamActive(stream_id))
1259         ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1260       return;
1261     }
1262   }
1263 
1264   frames_received_++;
1265 
1266   switch (type) {
1267     case spdy::GOAWAY:
1268       OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
1269       break;
1270     case spdy::PING:
1271       OnPing(*reinterpret_cast<const spdy::SpdyPingControlFrame*>(frame));
1272       break;
1273     case spdy::SETTINGS:
1274       OnSettings(
1275           *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
1276       break;
1277     case spdy::RST_STREAM:
1278       OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
1279       break;
1280     case spdy::SYN_STREAM:
1281       OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
1282             headers);
1283       break;
1284     case spdy::HEADERS:
1285       OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
1286                 headers);
1287       break;
1288     case spdy::SYN_REPLY:
1289       OnSynReply(
1290           *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
1291           headers);
1292       break;
1293     case spdy::WINDOW_UPDATE:
1294       OnWindowUpdate(
1295           *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
1296       break;
1297     default:
1298       DCHECK(false);  // Error!
1299   }
1300 }
1301 
OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,const char * header_data,size_t len)1302 bool SpdySession::OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,
1303                                            const char* header_data,
1304                                            size_t len) {
1305   DCHECK(false);
1306   return false;
1307 }
1308 
OnDataFrameHeader(const spdy::SpdyDataFrame * frame)1309 void SpdySession::OnDataFrameHeader(const spdy::SpdyDataFrame* frame) {
1310   DCHECK(false);
1311 }
1312 
OnRst(const spdy::SpdyRstStreamControlFrame & frame)1313 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
1314   spdy::SpdyStreamId stream_id = frame.stream_id();
1315 
1316   net_log().AddEvent(
1317       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
1318       make_scoped_refptr(
1319           new NetLogSpdyRstParameter(stream_id, frame.status())));
1320 
1321   bool valid_stream = IsStreamActive(stream_id);
1322   if (!valid_stream) {
1323     // NOTE:  it may just be that the stream was cancelled.
1324     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1325     return;
1326   }
1327   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1328   CHECK_EQ(stream->stream_id(), stream_id);
1329   CHECK(!stream->cancelled());
1330 
1331   if (frame.status() == 0) {
1332     stream->OnDataReceived(NULL, 0);
1333   } else {
1334     LOG(ERROR) << "Spdy stream closed: " << frame.status();
1335     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1336     //                For now, it doesn't matter much - it is a protocol error.
1337     DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
1338   }
1339 }
1340 
OnGoAway(const spdy::SpdyGoAwayControlFrame & frame)1341 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
1342   net_log_.AddEvent(
1343       NetLog::TYPE_SPDY_SESSION_GOAWAY,
1344       make_scoped_refptr(
1345           new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
1346                                         active_streams_.size(),
1347                                         unclaimed_pushed_streams_.size())));
1348   RemoveFromPool();
1349   CloseAllStreams(net::ERR_ABORTED);
1350 
1351   // TODO(willchan): Cancel any streams that are past the GoAway frame's
1352   // |last_accepted_stream_id|.
1353 
1354   // Don't bother killing any streams that are still reading.  They'll either
1355   // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
1356   // closed.
1357 }
1358 
OnPing(const spdy::SpdyPingControlFrame & frame)1359 void SpdySession::OnPing(const spdy::SpdyPingControlFrame& frame) {
1360   net_log_.AddEvent(
1361       NetLog::TYPE_SPDY_SESSION_PING,
1362       make_scoped_refptr(new NetLogSpdyPingParameter(frame.unique_id())));
1363 
1364   // Send response to a PING from server.
1365   if (frame.unique_id() % 2 == 0) {
1366     WritePingFrame(frame.unique_id());
1367     return;
1368   }
1369 
1370   --pings_in_flight_;
1371   if (pings_in_flight_ < 0) {
1372     CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1373     return;
1374   }
1375 
1376   if (pings_in_flight_ > 0)
1377     return;
1378 
1379   if (!need_to_send_ping_)
1380     return;
1381 
1382   PlanToSendTrailingPing();
1383 }
1384 
OnSettings(const spdy::SpdySettingsControlFrame & frame)1385 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
1386   spdy::SpdySettings settings;
1387   if (spdy_framer_.ParseSettings(&frame, &settings)) {
1388     HandleSettings(settings);
1389     spdy_settings_->Set(host_port_pair(), settings);
1390   }
1391 
1392   received_settings_ = true;
1393 
1394   net_log_.AddEvent(
1395       NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1396       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1397 }
1398 
OnWindowUpdate(const spdy::SpdyWindowUpdateControlFrame & frame)1399 void SpdySession::OnWindowUpdate(
1400     const spdy::SpdyWindowUpdateControlFrame& frame) {
1401   spdy::SpdyStreamId stream_id = frame.stream_id();
1402   if (!IsStreamActive(stream_id)) {
1403     LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
1404     return;
1405   }
1406 
1407   int delta_window_size = static_cast<int>(frame.delta_window_size());
1408   if (delta_window_size < 1) {
1409     LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
1410                  << delta_window_size;
1411     ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
1412     return;
1413   }
1414 
1415   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1416   CHECK_EQ(stream->stream_id(), stream_id);
1417   CHECK(!stream->cancelled());
1418 
1419   if (use_flow_control_)
1420     stream->IncreaseSendWindowSize(delta_window_size);
1421 
1422   net_log_.AddEvent(
1423       NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
1424       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1425           stream_id, delta_window_size, stream->send_window_size())));
1426 }
1427 
SendWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1428 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
1429                                    int delta_window_size) {
1430   DCHECK(IsStreamActive(stream_id));
1431   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1432   CHECK_EQ(stream->stream_id(), stream_id);
1433 
1434   net_log_.AddEvent(
1435       NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
1436       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1437           stream_id, delta_window_size, stream->recv_window_size())));
1438 
1439   scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
1440       spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
1441   QueueFrame(window_update_frame.get(), stream->priority(), stream);
1442 }
1443 
1444 // Given a cwnd that we would have sent to the server, modify it based on the
1445 // field trial policy.
ApplyCwndFieldTrialPolicy(int cwnd)1446 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1447   base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1448   if (!trial) {
1449       LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1450       return cwnd;
1451   }
1452   if (trial->group_name() == "cwnd10")
1453     return 10;
1454   else if (trial->group_name() == "cwnd16")
1455     return 16;
1456   else if (trial->group_name() == "cwndMin16")
1457     return std::max(cwnd, 16);
1458   else if (trial->group_name() == "cwndMin10")
1459     return std::max(cwnd, 10);
1460   else if (trial->group_name() == "cwndDynamic")
1461     return cwnd;
1462   NOTREACHED();
1463   return cwnd;
1464 }
1465 
SendSettings()1466 void SpdySession::SendSettings() {
1467   // Note:  we're copying the settings here, so that we can potentially modify
1468   // the settings for the field trial.  When removing the field trial, make
1469   // this a reference to the const SpdySettings again.
1470   spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
1471   if (settings.empty())
1472     return;
1473 
1474   // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
1475   for (spdy::SpdySettings::iterator i = settings.begin(),
1476            end = settings.end(); i != end; ++i) {
1477     const uint32 id = i->first.id();
1478     const uint32 val = i->second;
1479     switch (id) {
1480       case spdy::SETTINGS_CURRENT_CWND:
1481         uint32 cwnd = 0;
1482         cwnd = ApplyCwndFieldTrialPolicy(val);
1483         UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
1484                                     cwnd,
1485                                     1, 200, 100);
1486         if (cwnd != val) {
1487           i->second = cwnd;
1488           i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
1489           spdy_settings_->Set(host_port_pair(), settings);
1490         }
1491         break;
1492     }
1493   }
1494 
1495   HandleSettings(settings);
1496 
1497   net_log_.AddEvent(
1498       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1499       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1500 
1501   // Create the SETTINGS frame and send it.
1502   scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
1503       spdy_framer_.CreateSettings(settings));
1504   sent_settings_ = true;
1505   QueueFrame(settings_frame.get(), 0, NULL);
1506 }
1507 
HandleSettings(const spdy::SpdySettings & settings)1508 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
1509   for (spdy::SpdySettings::const_iterator i = settings.begin(),
1510            end = settings.end(); i != end; ++i) {
1511     const uint32 id = i->first.id();
1512     const uint32 val = i->second;
1513     switch (id) {
1514       case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
1515         max_concurrent_streams_ = std::min(static_cast<size_t>(val),
1516                                            max_concurrent_stream_limit_);
1517         ProcessPendingCreateStreams();
1518         break;
1519     }
1520   }
1521 }
1522 
SendPrefacePingIfNoneInFlight()1523 void SpdySession::SendPrefacePingIfNoneInFlight() {
1524   if (pings_in_flight_ || trailing_ping_pending_ ||
1525       !enable_ping_based_connection_checking_)
1526     return;
1527 
1528   const base::TimeDelta kConnectionAtRiskOfLoss =
1529       base::TimeDelta::FromMilliseconds(connection_at_risk_of_loss_ms_);
1530 
1531   base::TimeTicks now = base::TimeTicks::Now();
1532   // If we haven't heard from server, then send a preface-PING.
1533   if ((now - received_data_time_) > kConnectionAtRiskOfLoss)
1534     SendPrefacePing();
1535 
1536   PlanToSendTrailingPing();
1537 }
1538 
SendPrefacePing()1539 void SpdySession::SendPrefacePing() {
1540   // TODO(rtenneti): Send preface pings when more servers support additional
1541   // pings.
1542   // WritePingFrame(next_ping_id_);
1543 }
1544 
PlanToSendTrailingPing()1545 void SpdySession::PlanToSendTrailingPing() {
1546   if (trailing_ping_pending_)
1547     return;
1548 
1549   trailing_ping_pending_ = true;
1550   MessageLoop::current()->PostDelayedTask(
1551       FROM_HERE,
1552       method_factory_.NewRunnableMethod(&SpdySession::SendTrailingPing),
1553       trailing_ping_delay_time_ms_);
1554 }
1555 
SendTrailingPing()1556 void SpdySession::SendTrailingPing() {
1557   DCHECK(trailing_ping_pending_);
1558   trailing_ping_pending_ = false;
1559   WritePingFrame(next_ping_id_);
1560 }
1561 
WritePingFrame(uint32 unique_id)1562 void SpdySession::WritePingFrame(uint32 unique_id) {
1563   scoped_ptr<spdy::SpdyPingControlFrame> ping_frame(
1564       spdy_framer_.CreatePingFrame(next_ping_id_));
1565   QueueFrame(ping_frame.get(), SPDY_PRIORITY_HIGHEST, NULL);
1566 
1567   if (net_log().IsLoggingAllEvents()) {
1568     net_log().AddEvent(
1569         NetLog::TYPE_SPDY_SESSION_PING,
1570         make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_)));
1571   }
1572   if (unique_id % 2 != 0) {
1573     next_ping_id_ += 2;
1574     ++pings_in_flight_;
1575     need_to_send_ping_ = false;
1576     PlanToCheckPingStatus();
1577   }
1578 }
1579 
PlanToCheckPingStatus()1580 void SpdySession::PlanToCheckPingStatus() {
1581   if (check_ping_status_pending_)
1582     return;
1583 
1584   check_ping_status_pending_ = true;
1585   MessageLoop::current()->PostDelayedTask(
1586       FROM_HERE,
1587       method_factory_.NewRunnableMethod(
1588           &SpdySession::CheckPingStatus, base::TimeTicks::Now()),
1589       hung_interval_ms_);
1590 }
1591 
CheckPingStatus(base::TimeTicks last_check_time)1592 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
1593   // Check if we got a response back for all PINGs we had sent.
1594   if (pings_in_flight_ == 0) {
1595     check_ping_status_pending_ = false;
1596     return;
1597   }
1598 
1599   DCHECK(check_ping_status_pending_);
1600 
1601   const base::TimeDelta kHungInterval =
1602       base::TimeDelta::FromMilliseconds(hung_interval_ms_);
1603 
1604   base::TimeTicks now = base::TimeTicks::Now();
1605   base::TimeDelta delay = kHungInterval - (now - received_data_time_);
1606 
1607   if (delay.InMilliseconds() < 0 || received_data_time_ < last_check_time) {
1608     DCHECK(now - received_data_time_ > kHungInterval);
1609     CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true);
1610     return;
1611   }
1612 
1613   // Check the status of connection after a delay.
1614   MessageLoop::current()->PostDelayedTask(
1615       FROM_HERE,
1616       method_factory_.NewRunnableMethod(&SpdySession::CheckPingStatus, now),
1617       delay.InMilliseconds());
1618 }
1619 
RecordHistograms()1620 void SpdySession::RecordHistograms() {
1621   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
1622                               streams_initiated_count_,
1623                               0, 300, 50);
1624   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
1625                               streams_pushed_count_,
1626                               0, 300, 50);
1627   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
1628                               streams_pushed_and_claimed_count_,
1629                               0, 300, 50);
1630   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
1631                               streams_abandoned_count_,
1632                               0, 300, 50);
1633   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
1634                             sent_settings_ ? 1 : 0, 2);
1635   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
1636                             received_settings_ ? 1 : 0, 2);
1637   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
1638                               stalled_streams_,
1639                               0, 300, 50);
1640   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
1641                             stalled_streams_ > 0 ? 1 : 0, 2);
1642 
1643   if (received_settings_) {
1644     // Enumerate the saved settings, and set histograms for it.
1645     const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
1646 
1647     spdy::SpdySettings::const_iterator it;
1648     for (it = settings.begin(); it != settings.end(); ++it) {
1649       const spdy::SpdySetting setting = *it;
1650       switch (setting.first.id()) {
1651         case spdy::SETTINGS_CURRENT_CWND:
1652           // Record several different histograms to see if cwnd converges
1653           // for larger volumes of data being sent.
1654           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
1655                                       setting.second,
1656                                       1, 200, 100);
1657           if (bytes_received_ > 10 * 1024) {
1658             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
1659                                         setting.second,
1660                                         1, 200, 100);
1661             if (bytes_received_ > 25 * 1024) {
1662               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
1663                                           setting.second,
1664                                           1, 200, 100);
1665               if (bytes_received_ > 50 * 1024) {
1666                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
1667                                             setting.second,
1668                                             1, 200, 100);
1669                 if (bytes_received_ > 100 * 1024) {
1670                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
1671                                               setting.second,
1672                                               1, 200, 100);
1673                 }
1674               }
1675             }
1676           }
1677           break;
1678         case spdy::SETTINGS_ROUND_TRIP_TIME:
1679           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
1680                                       setting.second,
1681                                       1, 1200, 100);
1682           break;
1683         case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
1684           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
1685                                       setting.second,
1686                                       1, 100, 50);
1687           break;
1688       }
1689     }
1690   }
1691 }
1692 
InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream> * stream)1693 void SpdySession::InvokeUserStreamCreationCallback(
1694     scoped_refptr<SpdyStream>* stream) {
1695   PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
1696 
1697   // Exit if the request has already been cancelled.
1698   if (it == pending_callback_map_.end())
1699     return;
1700 
1701   CompletionCallback* callback = it->second.callback;
1702   int result = it->second.result;
1703   pending_callback_map_.erase(it);
1704   callback->Run(result);
1705 }
1706 
1707 }  // namespace net
1708