• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_session.h"
6 
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/memory/linked_ptr.h"
10 #include "base/message_loop.h"
11 #include "base/metrics/field_trial.h"
12 #include "base/metrics/stats_counters.h"
13 #include "base/stl_util-inl.h"
14 #include "base/string_number_conversions.h"
15 #include "base/string_util.h"
16 #include "base/stringprintf.h"
17 #include "base/time.h"
18 #include "base/utf_string_conversions.h"
19 #include "base/values.h"
20 #include "net/base/connection_type_histograms.h"
21 #include "net/base/net_log.h"
22 #include "net/base/net_util.h"
23 #include "net/http/http_network_session.h"
24 #include "net/socket/ssl_client_socket.h"
25 #include "net/spdy/spdy_frame_builder.h"
26 #include "net/spdy/spdy_http_utils.h"
27 #include "net/spdy/spdy_protocol.h"
28 #include "net/spdy/spdy_session_pool.h"
29 #include "net/spdy/spdy_settings_storage.h"
30 #include "net/spdy/spdy_stream.h"
31 
32 namespace net {
33 
NetLogSpdySynParameter(const linked_ptr<spdy::SpdyHeaderBlock> & headers,spdy::SpdyControlFlags flags,spdy::SpdyStreamId id,spdy::SpdyStreamId associated_stream)34 NetLogSpdySynParameter::NetLogSpdySynParameter(
35     const linked_ptr<spdy::SpdyHeaderBlock>& headers,
36     spdy::SpdyControlFlags flags,
37     spdy::SpdyStreamId id,
38     spdy::SpdyStreamId associated_stream)
39     : headers_(headers),
40       flags_(flags),
41       id_(id),
42       associated_stream_(associated_stream) {
43 }
44 
~NetLogSpdySynParameter()45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
46 }
47 
ToValue() const48 Value* NetLogSpdySynParameter::ToValue() const {
49   DictionaryValue* dict = new DictionaryValue();
50   ListValue* headers_list = new ListValue();
51   for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
52       it != headers_->end(); ++it) {
53     headers_list->Append(new StringValue(base::StringPrintf(
54         "%s: %s", it->first.c_str(), it->second.c_str())));
55   }
56   dict->SetInteger("flags", flags_);
57   dict->Set("headers", headers_list);
58   dict->SetInteger("id", id_);
59   if (associated_stream_)
60     dict->SetInteger("associated_stream", associated_stream_);
61   return dict;
62 }
63 
64 namespace {
65 
66 const int kReadBufferSize = 8 * 1024;
67 
68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
69  public:
NetLogSpdySessionParameter(const HostPortProxyPair & host_pair)70   NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
71       : host_pair_(host_pair) {}
ToValue() const72   virtual Value* ToValue() const {
73     DictionaryValue* dict = new DictionaryValue();
74     dict->Set("host", new StringValue(host_pair_.first.ToString()));
75     dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
76     return dict;
77   }
78  private:
79   const HostPortProxyPair host_pair_;
80   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
81 };
82 
83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
84  public:
NetLogSpdySettingsParameter(const spdy::SpdySettings & settings)85   explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
86       : settings_(settings) {}
87 
ToValue() const88   virtual Value* ToValue() const {
89     DictionaryValue* dict = new DictionaryValue();
90     ListValue* settings = new ListValue();
91     for (spdy::SpdySettings::const_iterator it = settings_.begin();
92          it != settings_.end(); ++it) {
93       settings->Append(new StringValue(
94           base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
95     }
96     dict->Set("settings", settings);
97     return dict;
98   }
99 
100  private:
~NetLogSpdySettingsParameter()101   ~NetLogSpdySettingsParameter() {}
102   const spdy::SpdySettings settings_;
103 
104   DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
105 };
106 
107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
108  public:
NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)109   NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
110                                   int delta,
111                                   int window_size)
112       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
113 
ToValue() const114   virtual Value* ToValue() const {
115     DictionaryValue* dict = new DictionaryValue();
116     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
117     dict->SetInteger("delta", delta_);
118     dict->SetInteger("window_size", window_size_);
119     return dict;
120   }
121 
122  private:
~NetLogSpdyWindowUpdateParameter()123   ~NetLogSpdyWindowUpdateParameter() {}
124   const spdy::SpdyStreamId stream_id_;
125   const int delta_;
126   const int window_size_;
127 
128   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
129 };
130 
131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
132  public:
NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,int size,spdy::SpdyDataFlags flags)133   NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
134                           int size,
135                           spdy::SpdyDataFlags flags)
136       : stream_id_(stream_id), size_(size), flags_(flags) {}
137 
ToValue() const138   virtual Value* ToValue() const {
139     DictionaryValue* dict = new DictionaryValue();
140     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
141     dict->SetInteger("size", size_);
142     dict->SetInteger("flags", static_cast<int>(flags_));
143     return dict;
144   }
145 
146  private:
~NetLogSpdyDataParameter()147   ~NetLogSpdyDataParameter() {}
148   const spdy::SpdyStreamId stream_id_;
149   const int size_;
150   const spdy::SpdyDataFlags flags_;
151 
152   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
153 };
154 
155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
156  public:
NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id,int status)157   NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
158       : stream_id_(stream_id), status_(status) {}
159 
ToValue() const160   virtual Value* ToValue() const {
161     DictionaryValue* dict = new DictionaryValue();
162     dict->SetInteger("stream_id", static_cast<int>(stream_id_));
163     dict->SetInteger("status", status_);
164     return dict;
165   }
166 
167  private:
~NetLogSpdyRstParameter()168   ~NetLogSpdyRstParameter() {}
169   const spdy::SpdyStreamId stream_id_;
170   const int status_;
171 
172   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
173 };
174 
175 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
176  public:
NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,int active_streams,int unclaimed_streams)177   NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
178                             int active_streams,
179                             int unclaimed_streams)
180       : last_stream_id_(last_stream_id),
181         active_streams_(active_streams),
182         unclaimed_streams_(unclaimed_streams) {}
183 
ToValue() const184   virtual Value* ToValue() const {
185     DictionaryValue* dict = new DictionaryValue();
186     dict->SetInteger("last_accepted_stream_id",
187                      static_cast<int>(last_stream_id_));
188     dict->SetInteger("active_streams", active_streams_);
189     dict->SetInteger("unclaimed_streams", unclaimed_streams_);
190     return dict;
191   }
192 
193  private:
~NetLogSpdyGoAwayParameter()194   ~NetLogSpdyGoAwayParameter() {}
195   const spdy::SpdyStreamId last_stream_id_;
196   const int active_streams_;
197   const int unclaimed_streams_;
198 
199   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
200 };
201 
202 }  // namespace
203 
204 // static
205 bool SpdySession::use_ssl_ = true;
206 
207 // static
208 bool SpdySession::use_flow_control_ = false;
209 
210 // static
211 size_t SpdySession::max_concurrent_stream_limit_ = 256;
212 
SpdySession(const HostPortProxyPair & host_port_proxy_pair,SpdySessionPool * spdy_session_pool,SpdySettingsStorage * spdy_settings,NetLog * net_log)213 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
214                          SpdySessionPool* spdy_session_pool,
215                          SpdySettingsStorage* spdy_settings,
216                          NetLog* net_log)
217     : ALLOW_THIS_IN_INITIALIZER_LIST(
218           read_callback_(this, &SpdySession::OnReadComplete)),
219       ALLOW_THIS_IN_INITIALIZER_LIST(
220           write_callback_(this, &SpdySession::OnWriteComplete)),
221       ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
222       host_port_proxy_pair_(host_port_proxy_pair),
223       spdy_session_pool_(spdy_session_pool),
224       spdy_settings_(spdy_settings),
225       connection_(new ClientSocketHandle),
226       read_buffer_(new IOBuffer(kReadBufferSize)),
227       read_pending_(false),
228       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
229       write_pending_(false),
230       delayed_write_pending_(false),
231       is_secure_(false),
232       certificate_error_code_(OK),
233       error_(OK),
234       state_(IDLE),
235       max_concurrent_streams_(kDefaultMaxConcurrentStreams),
236       streams_initiated_count_(0),
237       streams_pushed_count_(0),
238       streams_pushed_and_claimed_count_(0),
239       streams_abandoned_count_(0),
240       frames_received_(0),
241       bytes_received_(0),
242       sent_settings_(false),
243       received_settings_(false),
244       stalled_streams_(0),
245       initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
246       initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
247       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
248   DCHECK(HttpStreamFactory::spdy_enabled());
249   net_log_.BeginEvent(
250       NetLog::TYPE_SPDY_SESSION,
251       make_scoped_refptr(
252           new NetLogSpdySessionParameter(host_port_proxy_pair_)));
253 
254   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
255 
256   spdy_framer_.set_visitor(this);
257 
258   SendSettings();
259 }
260 
~SpdySession()261 SpdySession::~SpdySession() {
262   if (state_ != CLOSED) {
263     state_ = CLOSED;
264 
265     // Cleanup all the streams.
266     CloseAllStreams(net::ERR_ABORTED);
267   }
268 
269   if (connection_->is_initialized()) {
270     // With Spdy we can't recycle sockets.
271     connection_->socket()->Disconnect();
272   }
273 
274   // Streams should all be gone now.
275   DCHECK_EQ(0u, num_active_streams());
276   DCHECK_EQ(0u, num_unclaimed_pushed_streams());
277 
278   DCHECK(pending_callback_map_.empty());
279 
280   RecordHistograms();
281 
282   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
283 }
284 
InitializeWithSocket(ClientSocketHandle * connection,bool is_secure,int certificate_error_code)285 net::Error SpdySession::InitializeWithSocket(
286     ClientSocketHandle* connection,
287     bool is_secure,
288     int certificate_error_code) {
289   base::StatsCounter spdy_sessions("spdy.sessions");
290   spdy_sessions.Increment();
291 
292   state_ = CONNECTED;
293   connection_.reset(connection);
294   is_secure_ = is_secure;
295   certificate_error_code_ = certificate_error_code;
296 
297   // Write out any data that we might have to send, such as the settings frame.
298   WriteSocketLater();
299   net::Error error = ReadSocket();
300   if (error == ERR_IO_PENDING)
301     return OK;
302   return error;
303 }
304 
VerifyDomainAuthentication(const std::string & domain)305 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
306   if (state_ != CONNECTED)
307     return false;
308 
309   SSLInfo ssl_info;
310   bool was_npn_negotiated;
311   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
312     return true;   // This is not a secure session, so all domains are okay.
313 
314   return ssl_info.cert->VerifyNameMatch(domain);
315 }
316 
GetPushStream(const GURL & url,scoped_refptr<SpdyStream> * stream,const BoundNetLog & stream_net_log)317 int SpdySession::GetPushStream(
318     const GURL& url,
319     scoped_refptr<SpdyStream>* stream,
320     const BoundNetLog& stream_net_log) {
321   CHECK_NE(state_, CLOSED);
322 
323   *stream = NULL;
324 
325   // Don't allow access to secure push streams over an unauthenticated, but
326   // encrypted SSL socket.
327   if (is_secure_ && certificate_error_code_ != OK &&
328       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
329     LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
330                << "unauthenticated session.";
331     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
332     return ERR_SPDY_PROTOCOL_ERROR;
333   }
334 
335   *stream = GetActivePushStream(url.spec());
336   if (stream->get()) {
337     DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
338     streams_pushed_and_claimed_count_++;
339     return OK;
340   }
341   return 0;
342 }
343 
CreateStream(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log,CompletionCallback * callback)344 int SpdySession::CreateStream(
345     const GURL& url,
346     RequestPriority priority,
347     scoped_refptr<SpdyStream>* spdy_stream,
348     const BoundNetLog& stream_net_log,
349     CompletionCallback* callback) {
350   if (!max_concurrent_streams_ ||
351       active_streams_.size() < max_concurrent_streams_) {
352     return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
353   }
354 
355   stalled_streams_++;
356   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
357   create_stream_queues_[priority].push(
358       PendingCreateStream(url, priority, spdy_stream,
359                           stream_net_log, callback));
360   return ERR_IO_PENDING;
361 }
362 
ProcessPendingCreateStreams()363 void SpdySession::ProcessPendingCreateStreams() {
364   while (!max_concurrent_streams_ ||
365          active_streams_.size() < max_concurrent_streams_) {
366     bool no_pending_create_streams = true;
367     for (int i = 0;i < NUM_PRIORITIES;++i) {
368       if (!create_stream_queues_[i].empty()) {
369         PendingCreateStream pending_create = create_stream_queues_[i].front();
370         create_stream_queues_[i].pop();
371         no_pending_create_streams = false;
372         int error = CreateStreamImpl(*pending_create.url,
373                                      pending_create.priority,
374                                      pending_create.spdy_stream,
375                                      *pending_create.stream_net_log);
376         scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
377         DCHECK(!ContainsKey(pending_callback_map_, stream));
378         pending_callback_map_[stream] =
379             CallbackResultPair(pending_create.callback, error);
380         MessageLoop::current()->PostTask(
381             FROM_HERE,
382             method_factory_.NewRunnableMethod(
383                 &SpdySession::InvokeUserStreamCreationCallback, stream));
384         break;
385       }
386     }
387     if (no_pending_create_streams)
388       return;  // there were no streams in any queue
389   }
390 }
391 
CancelPendingCreateStreams(const scoped_refptr<SpdyStream> * spdy_stream)392 void SpdySession::CancelPendingCreateStreams(
393     const scoped_refptr<SpdyStream>* spdy_stream) {
394   PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
395   if (it != pending_callback_map_.end()) {
396     pending_callback_map_.erase(it);
397     return;
398   }
399 
400   for (int i = 0;i < NUM_PRIORITIES;++i) {
401     PendingCreateStreamQueue tmp;
402     // Make a copy removing this trans
403     while (!create_stream_queues_[i].empty()) {
404       PendingCreateStream pending_create = create_stream_queues_[i].front();
405       create_stream_queues_[i].pop();
406       if (pending_create.spdy_stream != spdy_stream)
407         tmp.push(pending_create);
408     }
409     // Now copy it back
410     while (!tmp.empty()) {
411       create_stream_queues_[i].push(tmp.front());
412       tmp.pop();
413     }
414   }
415 }
416 
CreateStreamImpl(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log)417 int SpdySession::CreateStreamImpl(
418     const GURL& url,
419     RequestPriority priority,
420     scoped_refptr<SpdyStream>* spdy_stream,
421     const BoundNetLog& stream_net_log) {
422   // Make sure that we don't try to send https/wss over an unauthenticated, but
423   // encrypted SSL socket.
424   if (is_secure_ && certificate_error_code_ != OK &&
425       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
426     LOG(ERROR) << "Tried to create spdy stream for secure content over an "
427                << "unauthenticated session.";
428     CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
429     return ERR_SPDY_PROTOCOL_ERROR;
430   }
431 
432   const std::string& path = url.PathForRequest();
433 
434   const spdy::SpdyStreamId stream_id = GetNewStreamId();
435 
436   *spdy_stream = new SpdyStream(this,
437                                 stream_id,
438                                 false,
439                                 stream_net_log);
440   const scoped_refptr<SpdyStream>& stream = *spdy_stream;
441 
442   stream->set_priority(priority);
443   stream->set_path(path);
444   stream->set_send_window_size(initial_send_window_size_);
445   stream->set_recv_window_size(initial_recv_window_size_);
446   ActivateStream(stream);
447 
448   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
449       static_cast<int>(priority), 0, 10, 11);
450 
451   // TODO(mbelshe): Optimize memory allocations
452   DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
453 
454   DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
455   return OK;
456 }
457 
WriteSynStream(spdy::SpdyStreamId stream_id,RequestPriority priority,spdy::SpdyControlFlags flags,const linked_ptr<spdy::SpdyHeaderBlock> & headers)458 int SpdySession::WriteSynStream(
459     spdy::SpdyStreamId stream_id,
460     RequestPriority priority,
461     spdy::SpdyControlFlags flags,
462     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
463   // Find our stream
464   if (!IsStreamActive(stream_id))
465     return ERR_INVALID_SPDY_STREAM;
466   const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
467   CHECK_EQ(stream->stream_id(), stream_id);
468 
469   scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
470       spdy_framer_.CreateSynStream(
471           stream_id, 0,
472           ConvertRequestPriorityToSpdyPriority(priority),
473           flags, false, headers.get()));
474   QueueFrame(syn_frame.get(), priority, stream);
475 
476   base::StatsCounter spdy_requests("spdy.requests");
477   spdy_requests.Increment();
478   streams_initiated_count_++;
479 
480   if (net_log().IsLoggingAllEvents()) {
481     net_log().AddEvent(
482         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
483         make_scoped_refptr(
484             new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
485   }
486 
487   return ERR_IO_PENDING;
488 }
489 
WriteStreamData(spdy::SpdyStreamId stream_id,net::IOBuffer * data,int len,spdy::SpdyDataFlags flags)490 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
491                                  net::IOBuffer* data, int len,
492                                  spdy::SpdyDataFlags flags) {
493   // Find our stream
494   DCHECK(IsStreamActive(stream_id));
495   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
496   CHECK_EQ(stream->stream_id(), stream_id);
497   if (!stream)
498     return ERR_INVALID_SPDY_STREAM;
499 
500   if (len > kMaxSpdyFrameChunkSize) {
501     len = kMaxSpdyFrameChunkSize;
502     flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
503   }
504 
505   // Obey send window size of the stream if flow control is enabled.
506   if (use_flow_control_) {
507     if (stream->send_window_size() <= 0) {
508       // Because we queue frames onto the session, it is possible that
509       // a stream was not flow controlled at the time it attempted the
510       // write, but when we go to fulfill the write, it is now flow
511       // controlled.  This is why we need the session to mark the stream
512       // as stalled - because only the session knows for sure when the
513       // stall occurs.
514       stream->set_stalled_by_flow_control(true);
515       net_log().AddEvent(
516           NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
517           make_scoped_refptr(
518               new NetLogIntegerParameter("stream_id", stream_id)));
519       return ERR_IO_PENDING;
520     }
521     int new_len = std::min(len, stream->send_window_size());
522     if (new_len < len) {
523       len = new_len;
524       flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
525     }
526     stream->DecreaseSendWindowSize(len);
527   }
528 
529   if (net_log().IsLoggingAllEvents()) {
530     net_log().AddEvent(
531         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
532         make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
533   }
534 
535   // TODO(mbelshe): reduce memory copies here.
536   scoped_ptr<spdy::SpdyDataFrame> frame(
537       spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
538   QueueFrame(frame.get(), stream->priority(), stream);
539   return ERR_IO_PENDING;
540 }
541 
CloseStream(spdy::SpdyStreamId stream_id,int status)542 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
543   // TODO(mbelshe): We should send a RST_STREAM control frame here
544   //                so that the server can cancel a large send.
545 
546   DeleteStream(stream_id, status);
547 }
548 
ResetStream(spdy::SpdyStreamId stream_id,spdy::SpdyStatusCodes status)549 void SpdySession::ResetStream(
550     spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
551 
552   net_log().AddEvent(
553       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
554       make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
555 
556   scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
557       spdy_framer_.CreateRstStream(stream_id, status));
558 
559   // Default to lowest priority unless we know otherwise.
560   int priority = 3;
561   if(IsStreamActive(stream_id)) {
562     scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
563     priority = stream->priority();
564   }
565   QueueFrame(rst_frame.get(), priority, NULL);
566 
567   DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
568 }
569 
IsStreamActive(spdy::SpdyStreamId stream_id) const570 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
571   return ContainsKey(active_streams_, stream_id);
572 }
573 
GetLoadState() const574 LoadState SpdySession::GetLoadState() const {
575   // NOTE: The application only queries the LoadState via the
576   //       SpdyNetworkTransaction, and details are only needed when
577   //       we're in the process of connecting.
578 
579   // If we're connecting, defer to the connection to give us the actual
580   // LoadState.
581   if (state_ == CONNECTING)
582     return connection_->GetLoadState();
583 
584   // Just report that we're idle since the session could be doing
585   // many things concurrently.
586   return LOAD_STATE_IDLE;
587 }
588 
OnReadComplete(int bytes_read)589 void SpdySession::OnReadComplete(int bytes_read) {
590   // Parse a frame.  For now this code requires that the frame fit into our
591   // buffer (32KB).
592   // TODO(mbelshe): support arbitrarily large frames!
593 
594   read_pending_ = false;
595 
596   if (bytes_read <= 0) {
597     // Session is tearing down.
598     net::Error error = static_cast<net::Error>(bytes_read);
599     if (bytes_read == 0)
600       error = ERR_CONNECTION_CLOSED;
601     CloseSessionOnError(error, true);
602     return;
603   }
604 
605   bytes_received_ += bytes_read;
606 
607   // The SpdyFramer will use callbacks onto |this| as it parses frames.
608   // When errors occur, those callbacks can lead to teardown of all references
609   // to |this|, so maintain a reference to self during this call for safe
610   // cleanup.
611   scoped_refptr<SpdySession> self(this);
612 
613   char *data = read_buffer_->data();
614   while (bytes_read &&
615          spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
616     uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
617     bytes_read -= bytes_processed;
618     data += bytes_processed;
619     if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
620       spdy_framer_.Reset();
621   }
622 
623   if (state_ != CLOSED)
624     ReadSocket();
625 }
626 
OnWriteComplete(int result)627 void SpdySession::OnWriteComplete(int result) {
628   DCHECK(write_pending_);
629   DCHECK(in_flight_write_.size());
630 
631   write_pending_ = false;
632 
633   scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
634 
635   if (result >= 0) {
636     // It should not be possible to have written more bytes than our
637     // in_flight_write_.
638     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
639 
640     in_flight_write_.buffer()->DidConsume(result);
641 
642     // We only notify the stream when we've fully written the pending frame.
643     if (!in_flight_write_.buffer()->BytesRemaining()) {
644       if (stream) {
645         // Report the number of bytes written to the caller, but exclude the
646         // frame size overhead.  NOTE: if this frame was compressed the
647         // reported bytes written is the compressed size, not the original
648         // size.
649         if (result > 0) {
650           result = in_flight_write_.buffer()->size();
651           DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
652           result -= static_cast<int>(spdy::SpdyFrame::size());
653         }
654 
655         // It is possible that the stream was cancelled while we were writing
656         // to the socket.
657         if (!stream->cancelled())
658           stream->OnWriteComplete(result);
659       }
660 
661       // Cleanup the write which just completed.
662       in_flight_write_.release();
663     }
664 
665     // Write more data.  We're already in a continuation, so we can
666     // go ahead and write it immediately (without going back to the
667     // message loop).
668     WriteSocketLater();
669   } else {
670     in_flight_write_.release();
671 
672     // The stream is now errored.  Close it down.
673     CloseSessionOnError(static_cast<net::Error>(result), true);
674   }
675 }
676 
ReadSocket()677 net::Error SpdySession::ReadSocket() {
678   if (read_pending_)
679     return OK;
680 
681   if (state_ == CLOSED) {
682     NOTREACHED();
683     return ERR_UNEXPECTED;
684   }
685 
686   CHECK(connection_.get());
687   CHECK(connection_->socket());
688   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
689                                                kReadBufferSize,
690                                                &read_callback_);
691   switch (bytes_read) {
692     case 0:
693       // Socket is closed!
694       CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
695       return ERR_CONNECTION_CLOSED;
696     case net::ERR_IO_PENDING:
697       // Waiting for data.  Nothing to do now.
698       read_pending_ = true;
699       return ERR_IO_PENDING;
700     default:
701       // Data was read, process it.
702       // Schedule the work through the message loop to avoid recursive
703       // callbacks.
704       read_pending_ = true;
705       MessageLoop::current()->PostTask(
706           FROM_HERE,
707           method_factory_.NewRunnableMethod(
708               &SpdySession::OnReadComplete, bytes_read));
709       break;
710   }
711   return OK;
712 }
713 
WriteSocketLater()714 void SpdySession::WriteSocketLater() {
715   if (delayed_write_pending_)
716     return;
717 
718   if (state_ < CONNECTED)
719     return;
720 
721   delayed_write_pending_ = true;
722   MessageLoop::current()->PostTask(
723       FROM_HERE,
724       method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
725 }
726 
WriteSocket()727 void SpdySession::WriteSocket() {
728   // This function should only be called via WriteSocketLater.
729   DCHECK(delayed_write_pending_);
730   delayed_write_pending_ = false;
731 
732   // If the socket isn't connected yet, just wait; we'll get called
733   // again when the socket connection completes.  If the socket is
734   // closed, just return.
735   if (state_ < CONNECTED || state_ == CLOSED)
736     return;
737 
738   if (write_pending_)   // Another write is in progress still.
739     return;
740 
741   // Loop sending frames until we've sent everything or until the write
742   // returns error (or ERR_IO_PENDING).
743   while (in_flight_write_.buffer() || !queue_.empty()) {
744     if (!in_flight_write_.buffer()) {
745       // Grab the next SpdyFrame to send.
746       SpdyIOBuffer next_buffer = queue_.top();
747       queue_.pop();
748 
749       // We've deferred compression until just before we write it to the socket,
750       // which is now.  At this time, we don't compress our data frames.
751       spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
752       size_t size;
753       if (spdy_framer_.IsCompressible(uncompressed_frame)) {
754         scoped_ptr<spdy::SpdyFrame> compressed_frame(
755             spdy_framer_.CompressFrame(uncompressed_frame));
756         if (!compressed_frame.get()) {
757           LOG(ERROR) << "SPDY Compression failure";
758           CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
759           return;
760         }
761 
762         size = compressed_frame->length() + spdy::SpdyFrame::size();
763 
764         DCHECK_GT(size, 0u);
765 
766         // TODO(mbelshe): We have too much copying of data here.
767         IOBufferWithSize* buffer = new IOBufferWithSize(size);
768         memcpy(buffer->data(), compressed_frame->data(), size);
769 
770         // Attempt to send the frame.
771         in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
772       } else {
773         size = uncompressed_frame.length() + spdy::SpdyFrame::size();
774         in_flight_write_ = next_buffer;
775       }
776     } else {
777       DCHECK(in_flight_write_.buffer()->BytesRemaining());
778     }
779 
780     write_pending_ = true;
781     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
782         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
783     if (rv == net::ERR_IO_PENDING)
784       break;
785 
786     // We sent the frame successfully.
787     OnWriteComplete(rv);
788 
789     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
790     //                 as in an error state.
791     if (rv < 0)
792       break;
793   }
794 }
795 
CloseAllStreams(net::Error status)796 void SpdySession::CloseAllStreams(net::Error status) {
797   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
798   base::StatsCounter abandoned_push_streams(
799       "spdy.abandoned_push_streams");
800 
801   if (!active_streams_.empty())
802     abandoned_streams.Add(active_streams_.size());
803   if (!unclaimed_pushed_streams_.empty()) {
804     streams_abandoned_count_ += unclaimed_pushed_streams_.size();
805     abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
806     unclaimed_pushed_streams_.clear();
807   }
808 
809   for (int i = 0;i < NUM_PRIORITIES;++i) {
810     while (!create_stream_queues_[i].empty()) {
811       PendingCreateStream pending_create = create_stream_queues_[i].front();
812       create_stream_queues_[i].pop();
813       pending_create.callback->Run(ERR_ABORTED);
814     }
815   }
816 
817   while (!active_streams_.empty()) {
818     ActiveStreamMap::iterator it = active_streams_.begin();
819     const scoped_refptr<SpdyStream>& stream = it->second;
820     DCHECK(stream);
821     LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
822                  << "): " << stream->path();
823     DeleteStream(stream->stream_id(), status);
824   }
825 
826   // We also need to drain the queue.
827   while (queue_.size())
828     queue_.pop();
829 }
830 
GetNewStreamId()831 int SpdySession::GetNewStreamId() {
832   int id = stream_hi_water_mark_;
833   stream_hi_water_mark_ += 2;
834   if (stream_hi_water_mark_ > 0x7fff)
835     stream_hi_water_mark_ = 1;
836   return id;
837 }
838 
QueueFrame(spdy::SpdyFrame * frame,spdy::SpdyPriority priority,SpdyStream * stream)839 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
840                              spdy::SpdyPriority priority,
841                              SpdyStream* stream) {
842   int length = spdy::SpdyFrame::size() + frame->length();
843   IOBuffer* buffer = new IOBuffer(length);
844   memcpy(buffer->data(), frame->data(), length);
845   queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
846 
847   WriteSocketLater();
848 }
849 
CloseSessionOnError(net::Error err,bool remove_from_pool)850 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
851   // Closing all streams can have a side-effect of dropping the last reference
852   // to |this|.  Hold a reference through this function.
853   scoped_refptr<SpdySession> self(this);
854 
855   DCHECK_LT(err, OK);
856   net_log_.AddEvent(
857       NetLog::TYPE_SPDY_SESSION_CLOSE,
858       make_scoped_refptr(new NetLogIntegerParameter("status", err)));
859 
860   // Don't close twice.  This can occur because we can have both
861   // a read and a write outstanding, and each can complete with
862   // an error.
863   if (state_ != CLOSED) {
864     state_ = CLOSED;
865     error_ = err;
866     if (remove_from_pool)
867       RemoveFromPool();
868     CloseAllStreams(err);
869   }
870 }
871 
GetInfoAsValue() const872 Value* SpdySession::GetInfoAsValue() const {
873   DictionaryValue* dict = new DictionaryValue();
874 
875   dict->SetInteger("source_id", net_log_.source().id);
876 
877   dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
878   dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
879 
880   dict->SetInteger("active_streams", active_streams_.size());
881 
882   dict->SetInteger("unclaimed_pushed_streams",
883       unclaimed_pushed_streams_.size());
884 
885   dict->SetBoolean("is_secure", is_secure_);
886 
887   dict->SetInteger("error", error_);
888   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
889 
890   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
891   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
892   dict->SetInteger("streams_pushed_and_claimed_count",
893       streams_pushed_and_claimed_count_);
894   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
895   dict->SetInteger("frames_received", frames_received_);
896 
897   dict->SetBoolean("sent_settings", sent_settings_);
898   dict->SetBoolean("received_settings", received_settings_);
899   return dict;
900 }
901 
GetPeerAddress(AddressList * address) const902 int SpdySession::GetPeerAddress(AddressList* address) const {
903   if (!connection_->socket())
904     return ERR_SOCKET_NOT_CONNECTED;
905 
906   return connection_->socket()->GetPeerAddress(address);
907 }
908 
GetLocalAddress(IPEndPoint * address) const909 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
910   if (!connection_->socket())
911     return ERR_SOCKET_NOT_CONNECTED;
912 
913   return connection_->socket()->GetLocalAddress(address);
914 }
915 
ActivateStream(SpdyStream * stream)916 void SpdySession::ActivateStream(SpdyStream* stream) {
917   const spdy::SpdyStreamId id = stream->stream_id();
918   DCHECK(!IsStreamActive(id));
919 
920   active_streams_[id] = stream;
921 }
922 
DeleteStream(spdy::SpdyStreamId id,int status)923 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
924   // For push streams, if they are being deleted normally, we leave
925   // the stream in the unclaimed_pushed_streams_ list.  However, if
926   // the stream is errored out, clean it up entirely.
927   if (status != OK) {
928     PushedStreamMap::iterator it;
929     for (it = unclaimed_pushed_streams_.begin();
930          it != unclaimed_pushed_streams_.end(); ++it) {
931       scoped_refptr<SpdyStream> curr = it->second;
932       if (id == curr->stream_id()) {
933         unclaimed_pushed_streams_.erase(it);
934         break;
935       }
936     }
937   }
938 
939   // The stream might have been deleted.
940   ActiveStreamMap::iterator it2 = active_streams_.find(id);
941   if (it2 == active_streams_.end())
942     return;
943 
944   // If this is an active stream, call the callback.
945   const scoped_refptr<SpdyStream> stream(it2->second);
946   active_streams_.erase(it2);
947   if (stream)
948     stream->OnClose(status);
949   ProcessPendingCreateStreams();
950 }
951 
RemoveFromPool()952 void SpdySession::RemoveFromPool() {
953   if (spdy_session_pool_) {
954     spdy_session_pool_->Remove(make_scoped_refptr(this));
955     spdy_session_pool_ = NULL;
956   }
957 }
958 
GetActivePushStream(const std::string & path)959 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
960     const std::string& path) {
961   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
962 
963   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
964   if (it != unclaimed_pushed_streams_.end()) {
965     net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
966     scoped_refptr<SpdyStream> stream = it->second;
967     unclaimed_pushed_streams_.erase(it);
968     used_push_streams.Increment();
969     return stream;
970   }
971   return NULL;
972 }
973 
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)974 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
975   if (is_secure_) {
976     SSLClientSocket* ssl_socket =
977         reinterpret_cast<SSLClientSocket*>(connection_->socket());
978     ssl_socket->GetSSLInfo(ssl_info);
979     *was_npn_negotiated = ssl_socket->was_npn_negotiated();
980     return true;
981   }
982   return false;
983 }
984 
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)985 bool SpdySession::GetSSLCertRequestInfo(
986     SSLCertRequestInfo* cert_request_info) {
987   if (is_secure_) {
988     SSLClientSocket* ssl_socket =
989         reinterpret_cast<SSLClientSocket*>(connection_->socket());
990     ssl_socket->GetSSLCertRequestInfo(cert_request_info);
991     return true;
992   }
993   return false;
994 }
995 
OnError(spdy::SpdyFramer * framer)996 void SpdySession::OnError(spdy::SpdyFramer* framer) {
997   CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
998 }
999 
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1000 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1001                                     const char* data,
1002                                     size_t len) {
1003   if (net_log().IsLoggingAllEvents()) {
1004     net_log().AddEvent(
1005         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1006         make_scoped_refptr(new NetLogSpdyDataParameter(
1007             stream_id, len, spdy::SpdyDataFlags())));
1008   }
1009 
1010   if (!IsStreamActive(stream_id)) {
1011     // NOTE:  it may just be that the stream was cancelled.
1012     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
1013     return;
1014   }
1015 
1016   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1017   stream->OnDataReceived(data, len);
1018 }
1019 
Respond(const spdy::SpdyHeaderBlock & headers,const scoped_refptr<SpdyStream> stream)1020 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
1021                           const scoped_refptr<SpdyStream> stream) {
1022   int rv = OK;
1023   rv = stream->OnResponseReceived(headers);
1024   if (rv < 0) {
1025     DCHECK_NE(rv, ERR_IO_PENDING);
1026     const spdy::SpdyStreamId stream_id = stream->stream_id();
1027     DeleteStream(stream_id, rv);
1028     return false;
1029   }
1030   return true;
1031 }
1032 
OnSyn(const spdy::SpdySynStreamControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1033 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
1034                         const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1035   spdy::SpdyStreamId stream_id = frame.stream_id();
1036   spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
1037 
1038   if (net_log_.IsLoggingAllEvents()) {
1039     net_log_.AddEvent(
1040         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1041         make_scoped_refptr(new NetLogSpdySynParameter(
1042             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1043             stream_id, associated_stream_id)));
1044   }
1045 
1046   // Server-initiated streams should have even sequence numbers.
1047   if ((stream_id & 0x1) != 0) {
1048     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1049     return;
1050   }
1051 
1052   if (IsStreamActive(stream_id)) {
1053     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1054     return;
1055   }
1056 
1057   if (associated_stream_id == 0) {
1058     LOG(WARNING) << "Received invalid OnSyn associated stream id "
1059                  << associated_stream_id
1060                  << " for stream " << stream_id;
1061     ResetStream(stream_id, spdy::INVALID_STREAM);
1062     return;
1063   }
1064 
1065   streams_pushed_count_++;
1066 
1067   // TODO(mbelshe): DCHECK that this is a GET method?
1068 
1069   // Verify that the response had a URL for us.
1070   const std::string& url = ContainsKey(*headers, "url") ?
1071       headers->find("url")->second : "";
1072   if (url.empty()) {
1073     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1074     LOG(WARNING) << "Pushed stream did not contain a url.";
1075     return;
1076   }
1077 
1078   GURL gurl(url);
1079   if (!gurl.is_valid()) {
1080     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1081     LOG(WARNING) << "Pushed stream url was invalid: " << url;
1082     return;
1083   }
1084 
1085   // Verify we have a valid stream association.
1086   if (!IsStreamActive(associated_stream_id)) {
1087     LOG(WARNING) << "Received OnSyn with inactive associated stream "
1088                << associated_stream_id;
1089     ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
1090     return;
1091   }
1092 
1093   scoped_refptr<SpdyStream> associated_stream =
1094       active_streams_[associated_stream_id];
1095   GURL associated_url(associated_stream->GetUrl());
1096   if (associated_url.GetOrigin() != gurl.GetOrigin()) {
1097     LOG(WARNING) << "Rejected Cross Origin Push Stream "
1098                  << associated_stream_id;
1099     ResetStream(stream_id, spdy::REFUSED_STREAM);
1100     return;
1101   }
1102 
1103   // There should not be an existing pushed stream with the same path.
1104   PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1105   if (it != unclaimed_pushed_streams_.end()) {
1106     LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
1107     ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1108     return;
1109   }
1110 
1111   scoped_refptr<SpdyStream> stream(
1112       new SpdyStream(this, stream_id, true, net_log_));
1113 
1114   stream->set_path(gurl.PathForRequest());
1115 
1116   unclaimed_pushed_streams_[url] = stream;
1117 
1118   ActivateStream(stream);
1119   stream->set_response_received();
1120 
1121   // Parse the headers.
1122   if (!Respond(*headers, stream))
1123     return;
1124 
1125   base::StatsCounter push_requests("spdy.pushed_streams");
1126   push_requests.Increment();
1127 }
1128 
OnSynReply(const spdy::SpdySynReplyControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1129 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
1130                              const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1131   spdy::SpdyStreamId stream_id = frame.stream_id();
1132 
1133   bool valid_stream = IsStreamActive(stream_id);
1134   if (!valid_stream) {
1135     // NOTE:  it may just be that the stream was cancelled.
1136     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
1137     return;
1138   }
1139 
1140   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1141   CHECK_EQ(stream->stream_id(), stream_id);
1142   CHECK(!stream->cancelled());
1143 
1144   if (stream->response_received()) {
1145     LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
1146     CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
1147     return;
1148   }
1149   stream->set_response_received();
1150 
1151   if (net_log().IsLoggingAllEvents()) {
1152     net_log().AddEvent(
1153         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1154         make_scoped_refptr(new NetLogSpdySynParameter(
1155             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1156             stream_id, 0)));
1157   }
1158 
1159   Respond(*headers, stream);
1160 }
1161 
OnHeaders(const spdy::SpdyHeadersControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1162 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
1163                             const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1164   spdy::SpdyStreamId stream_id = frame.stream_id();
1165 
1166   bool valid_stream = IsStreamActive(stream_id);
1167   if (!valid_stream) {
1168     // NOTE:  it may just be that the stream was cancelled.
1169     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
1170     return;
1171   }
1172 
1173   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1174   CHECK_EQ(stream->stream_id(), stream_id);
1175   CHECK(!stream->cancelled());
1176 
1177   if (net_log().IsLoggingAllEvents()) {
1178     net_log().AddEvent(
1179         NetLog::TYPE_SPDY_SESSION_HEADERS,
1180         make_scoped_refptr(new NetLogSpdySynParameter(
1181             headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1182             stream_id, 0)));
1183   }
1184 
1185   int rv = stream->OnHeaders(*headers);
1186   if (rv < 0) {
1187     DCHECK_NE(rv, ERR_IO_PENDING);
1188     const spdy::SpdyStreamId stream_id = stream->stream_id();
1189     DeleteStream(stream_id, rv);
1190   }
1191 }
1192 
OnControl(const spdy::SpdyControlFrame * frame)1193 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
1194   const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
1195   uint32 type = frame->type();
1196   if (type == spdy::SYN_STREAM ||
1197       type == spdy::SYN_REPLY ||
1198       type == spdy::HEADERS) {
1199     if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
1200       LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
1201       int stream_id = 0;
1202       if (type == spdy::SYN_STREAM) {
1203         stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
1204                      (frame))->stream_id();
1205       } else if (type == spdy::SYN_REPLY) {
1206         stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
1207                      (frame))->stream_id();
1208       } else if (type == spdy::HEADERS) {
1209         stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
1210                      (frame))->stream_id();
1211       }
1212       if(IsStreamActive(stream_id))
1213         ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1214       return;
1215     }
1216   }
1217 
1218   frames_received_++;
1219 
1220   switch (type) {
1221     case spdy::GOAWAY:
1222       OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
1223       break;
1224     case spdy::SETTINGS:
1225       OnSettings(
1226           *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
1227       break;
1228     case spdy::RST_STREAM:
1229       OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
1230       break;
1231     case spdy::SYN_STREAM:
1232       OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
1233             headers);
1234       break;
1235     case spdy::HEADERS:
1236       OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
1237                 headers);
1238       break;
1239     case spdy::SYN_REPLY:
1240       OnSynReply(
1241           *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
1242           headers);
1243       break;
1244     case spdy::WINDOW_UPDATE:
1245       OnWindowUpdate(
1246           *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
1247       break;
1248     default:
1249       DCHECK(false);  // Error!
1250   }
1251 }
1252 
OnRst(const spdy::SpdyRstStreamControlFrame & frame)1253 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
1254   spdy::SpdyStreamId stream_id = frame.stream_id();
1255 
1256   net_log().AddEvent(
1257       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
1258       make_scoped_refptr(
1259           new NetLogSpdyRstParameter(stream_id, frame.status())));
1260 
1261   bool valid_stream = IsStreamActive(stream_id);
1262   if (!valid_stream) {
1263     // NOTE:  it may just be that the stream was cancelled.
1264     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1265     return;
1266   }
1267   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1268   CHECK_EQ(stream->stream_id(), stream_id);
1269   CHECK(!stream->cancelled());
1270 
1271   if (frame.status() == 0) {
1272     stream->OnDataReceived(NULL, 0);
1273   } else {
1274     LOG(ERROR) << "Spdy stream closed: " << frame.status();
1275     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1276     //                For now, it doesn't matter much - it is a protocol error.
1277     DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
1278   }
1279 }
1280 
OnGoAway(const spdy::SpdyGoAwayControlFrame & frame)1281 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
1282   net_log_.AddEvent(
1283       NetLog::TYPE_SPDY_SESSION_GOAWAY,
1284       make_scoped_refptr(
1285           new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
1286                                         active_streams_.size(),
1287                                         unclaimed_pushed_streams_.size())));
1288   RemoveFromPool();
1289   CloseAllStreams(net::ERR_ABORTED);
1290 
1291   // TODO(willchan): Cancel any streams that are past the GoAway frame's
1292   // |last_accepted_stream_id|.
1293 
1294   // Don't bother killing any streams that are still reading.  They'll either
1295   // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
1296   // closed.
1297 }
1298 
OnSettings(const spdy::SpdySettingsControlFrame & frame)1299 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
1300   spdy::SpdySettings settings;
1301   if (spdy_framer_.ParseSettings(&frame, &settings)) {
1302     HandleSettings(settings);
1303     spdy_settings_->Set(host_port_pair(), settings);
1304   }
1305 
1306   received_settings_ = true;
1307 
1308   net_log_.AddEvent(
1309       NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1310       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1311 }
1312 
OnWindowUpdate(const spdy::SpdyWindowUpdateControlFrame & frame)1313 void SpdySession::OnWindowUpdate(
1314     const spdy::SpdyWindowUpdateControlFrame& frame) {
1315   spdy::SpdyStreamId stream_id = frame.stream_id();
1316   if (!IsStreamActive(stream_id)) {
1317     LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
1318     return;
1319   }
1320 
1321   int delta_window_size = static_cast<int>(frame.delta_window_size());
1322   if (delta_window_size < 1) {
1323     LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
1324                  << delta_window_size;
1325     ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
1326     return;
1327   }
1328 
1329   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1330   CHECK_EQ(stream->stream_id(), stream_id);
1331   CHECK(!stream->cancelled());
1332 
1333   if (use_flow_control_)
1334     stream->IncreaseSendWindowSize(delta_window_size);
1335 
1336   net_log_.AddEvent(
1337       NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
1338       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1339           stream_id, delta_window_size, stream->send_window_size())));
1340 }
1341 
SendWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1342 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
1343                                    int delta_window_size) {
1344   DCHECK(IsStreamActive(stream_id));
1345   scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1346   CHECK_EQ(stream->stream_id(), stream_id);
1347 
1348   net_log_.AddEvent(
1349       NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
1350       make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1351           stream_id, delta_window_size, stream->recv_window_size())));
1352 
1353   scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
1354       spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
1355   QueueFrame(window_update_frame.get(), stream->priority(), stream);
1356 }
1357 
1358 // Given a cwnd that we would have sent to the server, modify it based on the
1359 // field trial policy.
ApplyCwndFieldTrialPolicy(int cwnd)1360 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1361   base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1362   if (!trial) {
1363       LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1364       return cwnd;
1365   }
1366   if (trial->group_name() == "cwnd10")
1367     return 10;
1368   else if (trial->group_name() == "cwnd16")
1369     return 16;
1370   else if (trial->group_name() == "cwndMin16")
1371     return std::max(cwnd, 16);
1372   else if (trial->group_name() == "cwndMin10")
1373     return std::max(cwnd, 10);
1374   else if (trial->group_name() == "cwndDynamic")
1375     return cwnd;
1376   NOTREACHED();
1377   return cwnd;
1378 }
1379 
SendSettings()1380 void SpdySession::SendSettings() {
1381   // Note:  we're copying the settings here, so that we can potentially modify
1382   // the settings for the field trial.  When removing the field trial, make
1383   // this a reference to the const SpdySettings again.
1384   spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
1385   if (settings.empty())
1386     return;
1387 
1388   // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
1389   for (spdy::SpdySettings::iterator i = settings.begin(),
1390            end = settings.end(); i != end; ++i) {
1391     const uint32 id = i->first.id();
1392     const uint32 val = i->second;
1393     switch (id) {
1394       case spdy::SETTINGS_CURRENT_CWND:
1395         uint32 cwnd = 0;
1396         cwnd = ApplyCwndFieldTrialPolicy(val);
1397         UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
1398                                     cwnd,
1399                                     1, 200, 100);
1400         if (cwnd != val) {
1401           i->second = cwnd;
1402           i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
1403           spdy_settings_->Set(host_port_pair(), settings);
1404         }
1405         break;
1406     }
1407   }
1408 
1409   HandleSettings(settings);
1410 
1411   net_log_.AddEvent(
1412       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1413       make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1414 
1415   // Create the SETTINGS frame and send it.
1416   scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
1417       spdy_framer_.CreateSettings(settings));
1418   sent_settings_ = true;
1419   QueueFrame(settings_frame.get(), 0, NULL);
1420 }
1421 
HandleSettings(const spdy::SpdySettings & settings)1422 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
1423   for (spdy::SpdySettings::const_iterator i = settings.begin(),
1424            end = settings.end(); i != end; ++i) {
1425     const uint32 id = i->first.id();
1426     const uint32 val = i->second;
1427     switch (id) {
1428       case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
1429         max_concurrent_streams_ = std::min(static_cast<size_t>(val),
1430                                            max_concurrent_stream_limit_);
1431         ProcessPendingCreateStreams();
1432         break;
1433     }
1434   }
1435 }
1436 
RecordHistograms()1437 void SpdySession::RecordHistograms() {
1438   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
1439                               streams_initiated_count_,
1440                               0, 300, 50);
1441   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
1442                               streams_pushed_count_,
1443                               0, 300, 50);
1444   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
1445                               streams_pushed_and_claimed_count_,
1446                               0, 300, 50);
1447   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
1448                               streams_abandoned_count_,
1449                               0, 300, 50);
1450   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
1451                             sent_settings_ ? 1 : 0, 2);
1452   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
1453                             received_settings_ ? 1 : 0, 2);
1454   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
1455                               stalled_streams_,
1456                               0, 300, 50);
1457   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
1458                             stalled_streams_ > 0 ? 1 : 0, 2);
1459 
1460   if (received_settings_) {
1461     // Enumerate the saved settings, and set histograms for it.
1462     const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
1463 
1464     spdy::SpdySettings::const_iterator it;
1465     for (it = settings.begin(); it != settings.end(); ++it) {
1466       const spdy::SpdySetting setting = *it;
1467       switch (setting.first.id()) {
1468         case spdy::SETTINGS_CURRENT_CWND:
1469           // Record several different histograms to see if cwnd converges
1470           // for larger volumes of data being sent.
1471           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
1472                                       setting.second,
1473                                       1, 200, 100);
1474           if (bytes_received_ > 10 * 1024) {
1475             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
1476                                         setting.second,
1477                                         1, 200, 100);
1478             if (bytes_received_ > 25 * 1024) {
1479               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
1480                                           setting.second,
1481                                           1, 200, 100);
1482               if (bytes_received_ > 50 * 1024) {
1483                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
1484                                             setting.second,
1485                                             1, 200, 100);
1486                 if (bytes_received_ > 100 * 1024) {
1487                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
1488                                               setting.second,
1489                                               1, 200, 100);
1490                 }
1491               }
1492             }
1493           }
1494           break;
1495         case spdy::SETTINGS_ROUND_TRIP_TIME:
1496           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
1497                                       setting.second,
1498                                       1, 1200, 100);
1499           break;
1500         case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
1501           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
1502                                       setting.second,
1503                                       1, 100, 50);
1504           break;
1505       }
1506     }
1507   }
1508 }
1509 
InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream> * stream)1510 void SpdySession::InvokeUserStreamCreationCallback(
1511     scoped_refptr<SpdyStream>* stream) {
1512   PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
1513 
1514   // Exit if the request has already been cancelled.
1515   if (it == pending_callback_map_.end())
1516     return;
1517 
1518   CompletionCallback* callback = it->second.callback;
1519   int result = it->second.result;
1520   pending_callback_map_.erase(it);
1521   callback->Run(result);
1522 }
1523 
1524 }  // namespace net
1525