1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/memory/linked_ptr.h"
10 #include "base/message_loop.h"
11 #include "base/metrics/field_trial.h"
12 #include "base/metrics/stats_counters.h"
13 #include "base/stl_util-inl.h"
14 #include "base/string_number_conversions.h"
15 #include "base/string_util.h"
16 #include "base/stringprintf.h"
17 #include "base/time.h"
18 #include "base/utf_string_conversions.h"
19 #include "base/values.h"
20 #include "net/base/connection_type_histograms.h"
21 #include "net/base/net_log.h"
22 #include "net/base/net_util.h"
23 #include "net/http/http_network_session.h"
24 #include "net/socket/ssl_client_socket.h"
25 #include "net/spdy/spdy_frame_builder.h"
26 #include "net/spdy/spdy_http_utils.h"
27 #include "net/spdy/spdy_protocol.h"
28 #include "net/spdy/spdy_session_pool.h"
29 #include "net/spdy/spdy_settings_storage.h"
30 #include "net/spdy/spdy_stream.h"
31
32 namespace net {
33
NetLogSpdySynParameter(const linked_ptr<spdy::SpdyHeaderBlock> & headers,spdy::SpdyControlFlags flags,spdy::SpdyStreamId id,spdy::SpdyStreamId associated_stream)34 NetLogSpdySynParameter::NetLogSpdySynParameter(
35 const linked_ptr<spdy::SpdyHeaderBlock>& headers,
36 spdy::SpdyControlFlags flags,
37 spdy::SpdyStreamId id,
38 spdy::SpdyStreamId associated_stream)
39 : headers_(headers),
40 flags_(flags),
41 id_(id),
42 associated_stream_(associated_stream) {
43 }
44
~NetLogSpdySynParameter()45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
46 }
47
ToValue() const48 Value* NetLogSpdySynParameter::ToValue() const {
49 DictionaryValue* dict = new DictionaryValue();
50 ListValue* headers_list = new ListValue();
51 for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
52 it != headers_->end(); ++it) {
53 headers_list->Append(new StringValue(base::StringPrintf(
54 "%s: %s", it->first.c_str(), it->second.c_str())));
55 }
56 dict->SetInteger("flags", flags_);
57 dict->Set("headers", headers_list);
58 dict->SetInteger("id", id_);
59 if (associated_stream_)
60 dict->SetInteger("associated_stream", associated_stream_);
61 return dict;
62 }
63
64 namespace {
65
66 const int kReadBufferSize = 8 * 1024;
67
68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
69 public:
NetLogSpdySessionParameter(const HostPortProxyPair & host_pair)70 NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
71 : host_pair_(host_pair) {}
ToValue() const72 virtual Value* ToValue() const {
73 DictionaryValue* dict = new DictionaryValue();
74 dict->Set("host", new StringValue(host_pair_.first.ToString()));
75 dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
76 return dict;
77 }
78 private:
79 const HostPortProxyPair host_pair_;
80 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
81 };
82
83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
84 public:
NetLogSpdySettingsParameter(const spdy::SpdySettings & settings)85 explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
86 : settings_(settings) {}
87
ToValue() const88 virtual Value* ToValue() const {
89 DictionaryValue* dict = new DictionaryValue();
90 ListValue* settings = new ListValue();
91 for (spdy::SpdySettings::const_iterator it = settings_.begin();
92 it != settings_.end(); ++it) {
93 settings->Append(new StringValue(
94 base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
95 }
96 dict->Set("settings", settings);
97 return dict;
98 }
99
100 private:
~NetLogSpdySettingsParameter()101 ~NetLogSpdySettingsParameter() {}
102 const spdy::SpdySettings settings_;
103
104 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
105 };
106
107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
108 public:
NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)109 NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
110 int delta,
111 int window_size)
112 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
113
ToValue() const114 virtual Value* ToValue() const {
115 DictionaryValue* dict = new DictionaryValue();
116 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
117 dict->SetInteger("delta", delta_);
118 dict->SetInteger("window_size", window_size_);
119 return dict;
120 }
121
122 private:
~NetLogSpdyWindowUpdateParameter()123 ~NetLogSpdyWindowUpdateParameter() {}
124 const spdy::SpdyStreamId stream_id_;
125 const int delta_;
126 const int window_size_;
127
128 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
129 };
130
131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
132 public:
NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,int size,spdy::SpdyDataFlags flags)133 NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
134 int size,
135 spdy::SpdyDataFlags flags)
136 : stream_id_(stream_id), size_(size), flags_(flags) {}
137
ToValue() const138 virtual Value* ToValue() const {
139 DictionaryValue* dict = new DictionaryValue();
140 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
141 dict->SetInteger("size", size_);
142 dict->SetInteger("flags", static_cast<int>(flags_));
143 return dict;
144 }
145
146 private:
~NetLogSpdyDataParameter()147 ~NetLogSpdyDataParameter() {}
148 const spdy::SpdyStreamId stream_id_;
149 const int size_;
150 const spdy::SpdyDataFlags flags_;
151
152 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
153 };
154
155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
156 public:
NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id,int status)157 NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
158 : stream_id_(stream_id), status_(status) {}
159
ToValue() const160 virtual Value* ToValue() const {
161 DictionaryValue* dict = new DictionaryValue();
162 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
163 dict->SetInteger("status", status_);
164 return dict;
165 }
166
167 private:
~NetLogSpdyRstParameter()168 ~NetLogSpdyRstParameter() {}
169 const spdy::SpdyStreamId stream_id_;
170 const int status_;
171
172 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
173 };
174
175 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
176 public:
NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,int active_streams,int unclaimed_streams)177 NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
178 int active_streams,
179 int unclaimed_streams)
180 : last_stream_id_(last_stream_id),
181 active_streams_(active_streams),
182 unclaimed_streams_(unclaimed_streams) {}
183
ToValue() const184 virtual Value* ToValue() const {
185 DictionaryValue* dict = new DictionaryValue();
186 dict->SetInteger("last_accepted_stream_id",
187 static_cast<int>(last_stream_id_));
188 dict->SetInteger("active_streams", active_streams_);
189 dict->SetInteger("unclaimed_streams", unclaimed_streams_);
190 return dict;
191 }
192
193 private:
~NetLogSpdyGoAwayParameter()194 ~NetLogSpdyGoAwayParameter() {}
195 const spdy::SpdyStreamId last_stream_id_;
196 const int active_streams_;
197 const int unclaimed_streams_;
198
199 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
200 };
201
202 } // namespace
203
204 // static
205 bool SpdySession::use_ssl_ = true;
206
207 // static
208 bool SpdySession::use_flow_control_ = false;
209
210 // static
211 size_t SpdySession::max_concurrent_stream_limit_ = 256;
212
SpdySession(const HostPortProxyPair & host_port_proxy_pair,SpdySessionPool * spdy_session_pool,SpdySettingsStorage * spdy_settings,NetLog * net_log)213 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
214 SpdySessionPool* spdy_session_pool,
215 SpdySettingsStorage* spdy_settings,
216 NetLog* net_log)
217 : ALLOW_THIS_IN_INITIALIZER_LIST(
218 read_callback_(this, &SpdySession::OnReadComplete)),
219 ALLOW_THIS_IN_INITIALIZER_LIST(
220 write_callback_(this, &SpdySession::OnWriteComplete)),
221 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
222 host_port_proxy_pair_(host_port_proxy_pair),
223 spdy_session_pool_(spdy_session_pool),
224 spdy_settings_(spdy_settings),
225 connection_(new ClientSocketHandle),
226 read_buffer_(new IOBuffer(kReadBufferSize)),
227 read_pending_(false),
228 stream_hi_water_mark_(1), // Always start at 1 for the first stream id.
229 write_pending_(false),
230 delayed_write_pending_(false),
231 is_secure_(false),
232 certificate_error_code_(OK),
233 error_(OK),
234 state_(IDLE),
235 max_concurrent_streams_(kDefaultMaxConcurrentStreams),
236 streams_initiated_count_(0),
237 streams_pushed_count_(0),
238 streams_pushed_and_claimed_count_(0),
239 streams_abandoned_count_(0),
240 frames_received_(0),
241 bytes_received_(0),
242 sent_settings_(false),
243 received_settings_(false),
244 stalled_streams_(0),
245 initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
246 initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
247 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
248 DCHECK(HttpStreamFactory::spdy_enabled());
249 net_log_.BeginEvent(
250 NetLog::TYPE_SPDY_SESSION,
251 make_scoped_refptr(
252 new NetLogSpdySessionParameter(host_port_proxy_pair_)));
253
254 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
255
256 spdy_framer_.set_visitor(this);
257
258 SendSettings();
259 }
260
~SpdySession()261 SpdySession::~SpdySession() {
262 if (state_ != CLOSED) {
263 state_ = CLOSED;
264
265 // Cleanup all the streams.
266 CloseAllStreams(net::ERR_ABORTED);
267 }
268
269 if (connection_->is_initialized()) {
270 // With Spdy we can't recycle sockets.
271 connection_->socket()->Disconnect();
272 }
273
274 // Streams should all be gone now.
275 DCHECK_EQ(0u, num_active_streams());
276 DCHECK_EQ(0u, num_unclaimed_pushed_streams());
277
278 DCHECK(pending_callback_map_.empty());
279
280 RecordHistograms();
281
282 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
283 }
284
InitializeWithSocket(ClientSocketHandle * connection,bool is_secure,int certificate_error_code)285 net::Error SpdySession::InitializeWithSocket(
286 ClientSocketHandle* connection,
287 bool is_secure,
288 int certificate_error_code) {
289 base::StatsCounter spdy_sessions("spdy.sessions");
290 spdy_sessions.Increment();
291
292 state_ = CONNECTED;
293 connection_.reset(connection);
294 is_secure_ = is_secure;
295 certificate_error_code_ = certificate_error_code;
296
297 // Write out any data that we might have to send, such as the settings frame.
298 WriteSocketLater();
299 net::Error error = ReadSocket();
300 if (error == ERR_IO_PENDING)
301 return OK;
302 return error;
303 }
304
VerifyDomainAuthentication(const std::string & domain)305 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
306 if (state_ != CONNECTED)
307 return false;
308
309 SSLInfo ssl_info;
310 bool was_npn_negotiated;
311 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
312 return true; // This is not a secure session, so all domains are okay.
313
314 return ssl_info.cert->VerifyNameMatch(domain);
315 }
316
GetPushStream(const GURL & url,scoped_refptr<SpdyStream> * stream,const BoundNetLog & stream_net_log)317 int SpdySession::GetPushStream(
318 const GURL& url,
319 scoped_refptr<SpdyStream>* stream,
320 const BoundNetLog& stream_net_log) {
321 CHECK_NE(state_, CLOSED);
322
323 *stream = NULL;
324
325 // Don't allow access to secure push streams over an unauthenticated, but
326 // encrypted SSL socket.
327 if (is_secure_ && certificate_error_code_ != OK &&
328 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
329 LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
330 << "unauthenticated session.";
331 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
332 return ERR_SPDY_PROTOCOL_ERROR;
333 }
334
335 *stream = GetActivePushStream(url.spec());
336 if (stream->get()) {
337 DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
338 streams_pushed_and_claimed_count_++;
339 return OK;
340 }
341 return 0;
342 }
343
CreateStream(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log,CompletionCallback * callback)344 int SpdySession::CreateStream(
345 const GURL& url,
346 RequestPriority priority,
347 scoped_refptr<SpdyStream>* spdy_stream,
348 const BoundNetLog& stream_net_log,
349 CompletionCallback* callback) {
350 if (!max_concurrent_streams_ ||
351 active_streams_.size() < max_concurrent_streams_) {
352 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
353 }
354
355 stalled_streams_++;
356 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
357 create_stream_queues_[priority].push(
358 PendingCreateStream(url, priority, spdy_stream,
359 stream_net_log, callback));
360 return ERR_IO_PENDING;
361 }
362
ProcessPendingCreateStreams()363 void SpdySession::ProcessPendingCreateStreams() {
364 while (!max_concurrent_streams_ ||
365 active_streams_.size() < max_concurrent_streams_) {
366 bool no_pending_create_streams = true;
367 for (int i = 0;i < NUM_PRIORITIES;++i) {
368 if (!create_stream_queues_[i].empty()) {
369 PendingCreateStream pending_create = create_stream_queues_[i].front();
370 create_stream_queues_[i].pop();
371 no_pending_create_streams = false;
372 int error = CreateStreamImpl(*pending_create.url,
373 pending_create.priority,
374 pending_create.spdy_stream,
375 *pending_create.stream_net_log);
376 scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
377 DCHECK(!ContainsKey(pending_callback_map_, stream));
378 pending_callback_map_[stream] =
379 CallbackResultPair(pending_create.callback, error);
380 MessageLoop::current()->PostTask(
381 FROM_HERE,
382 method_factory_.NewRunnableMethod(
383 &SpdySession::InvokeUserStreamCreationCallback, stream));
384 break;
385 }
386 }
387 if (no_pending_create_streams)
388 return; // there were no streams in any queue
389 }
390 }
391
CancelPendingCreateStreams(const scoped_refptr<SpdyStream> * spdy_stream)392 void SpdySession::CancelPendingCreateStreams(
393 const scoped_refptr<SpdyStream>* spdy_stream) {
394 PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
395 if (it != pending_callback_map_.end()) {
396 pending_callback_map_.erase(it);
397 return;
398 }
399
400 for (int i = 0;i < NUM_PRIORITIES;++i) {
401 PendingCreateStreamQueue tmp;
402 // Make a copy removing this trans
403 while (!create_stream_queues_[i].empty()) {
404 PendingCreateStream pending_create = create_stream_queues_[i].front();
405 create_stream_queues_[i].pop();
406 if (pending_create.spdy_stream != spdy_stream)
407 tmp.push(pending_create);
408 }
409 // Now copy it back
410 while (!tmp.empty()) {
411 create_stream_queues_[i].push(tmp.front());
412 tmp.pop();
413 }
414 }
415 }
416
CreateStreamImpl(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log)417 int SpdySession::CreateStreamImpl(
418 const GURL& url,
419 RequestPriority priority,
420 scoped_refptr<SpdyStream>* spdy_stream,
421 const BoundNetLog& stream_net_log) {
422 // Make sure that we don't try to send https/wss over an unauthenticated, but
423 // encrypted SSL socket.
424 if (is_secure_ && certificate_error_code_ != OK &&
425 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
426 LOG(ERROR) << "Tried to create spdy stream for secure content over an "
427 << "unauthenticated session.";
428 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
429 return ERR_SPDY_PROTOCOL_ERROR;
430 }
431
432 const std::string& path = url.PathForRequest();
433
434 const spdy::SpdyStreamId stream_id = GetNewStreamId();
435
436 *spdy_stream = new SpdyStream(this,
437 stream_id,
438 false,
439 stream_net_log);
440 const scoped_refptr<SpdyStream>& stream = *spdy_stream;
441
442 stream->set_priority(priority);
443 stream->set_path(path);
444 stream->set_send_window_size(initial_send_window_size_);
445 stream->set_recv_window_size(initial_recv_window_size_);
446 ActivateStream(stream);
447
448 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
449 static_cast<int>(priority), 0, 10, 11);
450
451 // TODO(mbelshe): Optimize memory allocations
452 DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
453
454 DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
455 return OK;
456 }
457
WriteSynStream(spdy::SpdyStreamId stream_id,RequestPriority priority,spdy::SpdyControlFlags flags,const linked_ptr<spdy::SpdyHeaderBlock> & headers)458 int SpdySession::WriteSynStream(
459 spdy::SpdyStreamId stream_id,
460 RequestPriority priority,
461 spdy::SpdyControlFlags flags,
462 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
463 // Find our stream
464 if (!IsStreamActive(stream_id))
465 return ERR_INVALID_SPDY_STREAM;
466 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
467 CHECK_EQ(stream->stream_id(), stream_id);
468
469 scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
470 spdy_framer_.CreateSynStream(
471 stream_id, 0,
472 ConvertRequestPriorityToSpdyPriority(priority),
473 flags, false, headers.get()));
474 QueueFrame(syn_frame.get(), priority, stream);
475
476 base::StatsCounter spdy_requests("spdy.requests");
477 spdy_requests.Increment();
478 streams_initiated_count_++;
479
480 if (net_log().IsLoggingAllEvents()) {
481 net_log().AddEvent(
482 NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
483 make_scoped_refptr(
484 new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
485 }
486
487 return ERR_IO_PENDING;
488 }
489
WriteStreamData(spdy::SpdyStreamId stream_id,net::IOBuffer * data,int len,spdy::SpdyDataFlags flags)490 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
491 net::IOBuffer* data, int len,
492 spdy::SpdyDataFlags flags) {
493 // Find our stream
494 DCHECK(IsStreamActive(stream_id));
495 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
496 CHECK_EQ(stream->stream_id(), stream_id);
497 if (!stream)
498 return ERR_INVALID_SPDY_STREAM;
499
500 if (len > kMaxSpdyFrameChunkSize) {
501 len = kMaxSpdyFrameChunkSize;
502 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
503 }
504
505 // Obey send window size of the stream if flow control is enabled.
506 if (use_flow_control_) {
507 if (stream->send_window_size() <= 0) {
508 // Because we queue frames onto the session, it is possible that
509 // a stream was not flow controlled at the time it attempted the
510 // write, but when we go to fulfill the write, it is now flow
511 // controlled. This is why we need the session to mark the stream
512 // as stalled - because only the session knows for sure when the
513 // stall occurs.
514 stream->set_stalled_by_flow_control(true);
515 net_log().AddEvent(
516 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
517 make_scoped_refptr(
518 new NetLogIntegerParameter("stream_id", stream_id)));
519 return ERR_IO_PENDING;
520 }
521 int new_len = std::min(len, stream->send_window_size());
522 if (new_len < len) {
523 len = new_len;
524 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
525 }
526 stream->DecreaseSendWindowSize(len);
527 }
528
529 if (net_log().IsLoggingAllEvents()) {
530 net_log().AddEvent(
531 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
532 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
533 }
534
535 // TODO(mbelshe): reduce memory copies here.
536 scoped_ptr<spdy::SpdyDataFrame> frame(
537 spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
538 QueueFrame(frame.get(), stream->priority(), stream);
539 return ERR_IO_PENDING;
540 }
541
CloseStream(spdy::SpdyStreamId stream_id,int status)542 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
543 // TODO(mbelshe): We should send a RST_STREAM control frame here
544 // so that the server can cancel a large send.
545
546 DeleteStream(stream_id, status);
547 }
548
ResetStream(spdy::SpdyStreamId stream_id,spdy::SpdyStatusCodes status)549 void SpdySession::ResetStream(
550 spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
551
552 net_log().AddEvent(
553 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
554 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
555
556 scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
557 spdy_framer_.CreateRstStream(stream_id, status));
558
559 // Default to lowest priority unless we know otherwise.
560 int priority = 3;
561 if(IsStreamActive(stream_id)) {
562 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
563 priority = stream->priority();
564 }
565 QueueFrame(rst_frame.get(), priority, NULL);
566
567 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
568 }
569
IsStreamActive(spdy::SpdyStreamId stream_id) const570 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
571 return ContainsKey(active_streams_, stream_id);
572 }
573
GetLoadState() const574 LoadState SpdySession::GetLoadState() const {
575 // NOTE: The application only queries the LoadState via the
576 // SpdyNetworkTransaction, and details are only needed when
577 // we're in the process of connecting.
578
579 // If we're connecting, defer to the connection to give us the actual
580 // LoadState.
581 if (state_ == CONNECTING)
582 return connection_->GetLoadState();
583
584 // Just report that we're idle since the session could be doing
585 // many things concurrently.
586 return LOAD_STATE_IDLE;
587 }
588
OnReadComplete(int bytes_read)589 void SpdySession::OnReadComplete(int bytes_read) {
590 // Parse a frame. For now this code requires that the frame fit into our
591 // buffer (32KB).
592 // TODO(mbelshe): support arbitrarily large frames!
593
594 read_pending_ = false;
595
596 if (bytes_read <= 0) {
597 // Session is tearing down.
598 net::Error error = static_cast<net::Error>(bytes_read);
599 if (bytes_read == 0)
600 error = ERR_CONNECTION_CLOSED;
601 CloseSessionOnError(error, true);
602 return;
603 }
604
605 bytes_received_ += bytes_read;
606
607 // The SpdyFramer will use callbacks onto |this| as it parses frames.
608 // When errors occur, those callbacks can lead to teardown of all references
609 // to |this|, so maintain a reference to self during this call for safe
610 // cleanup.
611 scoped_refptr<SpdySession> self(this);
612
613 char *data = read_buffer_->data();
614 while (bytes_read &&
615 spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
616 uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
617 bytes_read -= bytes_processed;
618 data += bytes_processed;
619 if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
620 spdy_framer_.Reset();
621 }
622
623 if (state_ != CLOSED)
624 ReadSocket();
625 }
626
OnWriteComplete(int result)627 void SpdySession::OnWriteComplete(int result) {
628 DCHECK(write_pending_);
629 DCHECK(in_flight_write_.size());
630
631 write_pending_ = false;
632
633 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
634
635 if (result >= 0) {
636 // It should not be possible to have written more bytes than our
637 // in_flight_write_.
638 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
639
640 in_flight_write_.buffer()->DidConsume(result);
641
642 // We only notify the stream when we've fully written the pending frame.
643 if (!in_flight_write_.buffer()->BytesRemaining()) {
644 if (stream) {
645 // Report the number of bytes written to the caller, but exclude the
646 // frame size overhead. NOTE: if this frame was compressed the
647 // reported bytes written is the compressed size, not the original
648 // size.
649 if (result > 0) {
650 result = in_flight_write_.buffer()->size();
651 DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
652 result -= static_cast<int>(spdy::SpdyFrame::size());
653 }
654
655 // It is possible that the stream was cancelled while we were writing
656 // to the socket.
657 if (!stream->cancelled())
658 stream->OnWriteComplete(result);
659 }
660
661 // Cleanup the write which just completed.
662 in_flight_write_.release();
663 }
664
665 // Write more data. We're already in a continuation, so we can
666 // go ahead and write it immediately (without going back to the
667 // message loop).
668 WriteSocketLater();
669 } else {
670 in_flight_write_.release();
671
672 // The stream is now errored. Close it down.
673 CloseSessionOnError(static_cast<net::Error>(result), true);
674 }
675 }
676
ReadSocket()677 net::Error SpdySession::ReadSocket() {
678 if (read_pending_)
679 return OK;
680
681 if (state_ == CLOSED) {
682 NOTREACHED();
683 return ERR_UNEXPECTED;
684 }
685
686 CHECK(connection_.get());
687 CHECK(connection_->socket());
688 int bytes_read = connection_->socket()->Read(read_buffer_.get(),
689 kReadBufferSize,
690 &read_callback_);
691 switch (bytes_read) {
692 case 0:
693 // Socket is closed!
694 CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
695 return ERR_CONNECTION_CLOSED;
696 case net::ERR_IO_PENDING:
697 // Waiting for data. Nothing to do now.
698 read_pending_ = true;
699 return ERR_IO_PENDING;
700 default:
701 // Data was read, process it.
702 // Schedule the work through the message loop to avoid recursive
703 // callbacks.
704 read_pending_ = true;
705 MessageLoop::current()->PostTask(
706 FROM_HERE,
707 method_factory_.NewRunnableMethod(
708 &SpdySession::OnReadComplete, bytes_read));
709 break;
710 }
711 return OK;
712 }
713
WriteSocketLater()714 void SpdySession::WriteSocketLater() {
715 if (delayed_write_pending_)
716 return;
717
718 if (state_ < CONNECTED)
719 return;
720
721 delayed_write_pending_ = true;
722 MessageLoop::current()->PostTask(
723 FROM_HERE,
724 method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
725 }
726
WriteSocket()727 void SpdySession::WriteSocket() {
728 // This function should only be called via WriteSocketLater.
729 DCHECK(delayed_write_pending_);
730 delayed_write_pending_ = false;
731
732 // If the socket isn't connected yet, just wait; we'll get called
733 // again when the socket connection completes. If the socket is
734 // closed, just return.
735 if (state_ < CONNECTED || state_ == CLOSED)
736 return;
737
738 if (write_pending_) // Another write is in progress still.
739 return;
740
741 // Loop sending frames until we've sent everything or until the write
742 // returns error (or ERR_IO_PENDING).
743 while (in_flight_write_.buffer() || !queue_.empty()) {
744 if (!in_flight_write_.buffer()) {
745 // Grab the next SpdyFrame to send.
746 SpdyIOBuffer next_buffer = queue_.top();
747 queue_.pop();
748
749 // We've deferred compression until just before we write it to the socket,
750 // which is now. At this time, we don't compress our data frames.
751 spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
752 size_t size;
753 if (spdy_framer_.IsCompressible(uncompressed_frame)) {
754 scoped_ptr<spdy::SpdyFrame> compressed_frame(
755 spdy_framer_.CompressFrame(uncompressed_frame));
756 if (!compressed_frame.get()) {
757 LOG(ERROR) << "SPDY Compression failure";
758 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
759 return;
760 }
761
762 size = compressed_frame->length() + spdy::SpdyFrame::size();
763
764 DCHECK_GT(size, 0u);
765
766 // TODO(mbelshe): We have too much copying of data here.
767 IOBufferWithSize* buffer = new IOBufferWithSize(size);
768 memcpy(buffer->data(), compressed_frame->data(), size);
769
770 // Attempt to send the frame.
771 in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
772 } else {
773 size = uncompressed_frame.length() + spdy::SpdyFrame::size();
774 in_flight_write_ = next_buffer;
775 }
776 } else {
777 DCHECK(in_flight_write_.buffer()->BytesRemaining());
778 }
779
780 write_pending_ = true;
781 int rv = connection_->socket()->Write(in_flight_write_.buffer(),
782 in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
783 if (rv == net::ERR_IO_PENDING)
784 break;
785
786 // We sent the frame successfully.
787 OnWriteComplete(rv);
788
789 // TODO(mbelshe): Test this error case. Maybe we should mark the socket
790 // as in an error state.
791 if (rv < 0)
792 break;
793 }
794 }
795
CloseAllStreams(net::Error status)796 void SpdySession::CloseAllStreams(net::Error status) {
797 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
798 base::StatsCounter abandoned_push_streams(
799 "spdy.abandoned_push_streams");
800
801 if (!active_streams_.empty())
802 abandoned_streams.Add(active_streams_.size());
803 if (!unclaimed_pushed_streams_.empty()) {
804 streams_abandoned_count_ += unclaimed_pushed_streams_.size();
805 abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
806 unclaimed_pushed_streams_.clear();
807 }
808
809 for (int i = 0;i < NUM_PRIORITIES;++i) {
810 while (!create_stream_queues_[i].empty()) {
811 PendingCreateStream pending_create = create_stream_queues_[i].front();
812 create_stream_queues_[i].pop();
813 pending_create.callback->Run(ERR_ABORTED);
814 }
815 }
816
817 while (!active_streams_.empty()) {
818 ActiveStreamMap::iterator it = active_streams_.begin();
819 const scoped_refptr<SpdyStream>& stream = it->second;
820 DCHECK(stream);
821 LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
822 << "): " << stream->path();
823 DeleteStream(stream->stream_id(), status);
824 }
825
826 // We also need to drain the queue.
827 while (queue_.size())
828 queue_.pop();
829 }
830
GetNewStreamId()831 int SpdySession::GetNewStreamId() {
832 int id = stream_hi_water_mark_;
833 stream_hi_water_mark_ += 2;
834 if (stream_hi_water_mark_ > 0x7fff)
835 stream_hi_water_mark_ = 1;
836 return id;
837 }
838
QueueFrame(spdy::SpdyFrame * frame,spdy::SpdyPriority priority,SpdyStream * stream)839 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
840 spdy::SpdyPriority priority,
841 SpdyStream* stream) {
842 int length = spdy::SpdyFrame::size() + frame->length();
843 IOBuffer* buffer = new IOBuffer(length);
844 memcpy(buffer->data(), frame->data(), length);
845 queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
846
847 WriteSocketLater();
848 }
849
CloseSessionOnError(net::Error err,bool remove_from_pool)850 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
851 // Closing all streams can have a side-effect of dropping the last reference
852 // to |this|. Hold a reference through this function.
853 scoped_refptr<SpdySession> self(this);
854
855 DCHECK_LT(err, OK);
856 net_log_.AddEvent(
857 NetLog::TYPE_SPDY_SESSION_CLOSE,
858 make_scoped_refptr(new NetLogIntegerParameter("status", err)));
859
860 // Don't close twice. This can occur because we can have both
861 // a read and a write outstanding, and each can complete with
862 // an error.
863 if (state_ != CLOSED) {
864 state_ = CLOSED;
865 error_ = err;
866 if (remove_from_pool)
867 RemoveFromPool();
868 CloseAllStreams(err);
869 }
870 }
871
GetInfoAsValue() const872 Value* SpdySession::GetInfoAsValue() const {
873 DictionaryValue* dict = new DictionaryValue();
874
875 dict->SetInteger("source_id", net_log_.source().id);
876
877 dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
878 dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
879
880 dict->SetInteger("active_streams", active_streams_.size());
881
882 dict->SetInteger("unclaimed_pushed_streams",
883 unclaimed_pushed_streams_.size());
884
885 dict->SetBoolean("is_secure", is_secure_);
886
887 dict->SetInteger("error", error_);
888 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
889
890 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
891 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
892 dict->SetInteger("streams_pushed_and_claimed_count",
893 streams_pushed_and_claimed_count_);
894 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
895 dict->SetInteger("frames_received", frames_received_);
896
897 dict->SetBoolean("sent_settings", sent_settings_);
898 dict->SetBoolean("received_settings", received_settings_);
899 return dict;
900 }
901
GetPeerAddress(AddressList * address) const902 int SpdySession::GetPeerAddress(AddressList* address) const {
903 if (!connection_->socket())
904 return ERR_SOCKET_NOT_CONNECTED;
905
906 return connection_->socket()->GetPeerAddress(address);
907 }
908
GetLocalAddress(IPEndPoint * address) const909 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
910 if (!connection_->socket())
911 return ERR_SOCKET_NOT_CONNECTED;
912
913 return connection_->socket()->GetLocalAddress(address);
914 }
915
ActivateStream(SpdyStream * stream)916 void SpdySession::ActivateStream(SpdyStream* stream) {
917 const spdy::SpdyStreamId id = stream->stream_id();
918 DCHECK(!IsStreamActive(id));
919
920 active_streams_[id] = stream;
921 }
922
DeleteStream(spdy::SpdyStreamId id,int status)923 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
924 // For push streams, if they are being deleted normally, we leave
925 // the stream in the unclaimed_pushed_streams_ list. However, if
926 // the stream is errored out, clean it up entirely.
927 if (status != OK) {
928 PushedStreamMap::iterator it;
929 for (it = unclaimed_pushed_streams_.begin();
930 it != unclaimed_pushed_streams_.end(); ++it) {
931 scoped_refptr<SpdyStream> curr = it->second;
932 if (id == curr->stream_id()) {
933 unclaimed_pushed_streams_.erase(it);
934 break;
935 }
936 }
937 }
938
939 // The stream might have been deleted.
940 ActiveStreamMap::iterator it2 = active_streams_.find(id);
941 if (it2 == active_streams_.end())
942 return;
943
944 // If this is an active stream, call the callback.
945 const scoped_refptr<SpdyStream> stream(it2->second);
946 active_streams_.erase(it2);
947 if (stream)
948 stream->OnClose(status);
949 ProcessPendingCreateStreams();
950 }
951
RemoveFromPool()952 void SpdySession::RemoveFromPool() {
953 if (spdy_session_pool_) {
954 spdy_session_pool_->Remove(make_scoped_refptr(this));
955 spdy_session_pool_ = NULL;
956 }
957 }
958
GetActivePushStream(const std::string & path)959 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
960 const std::string& path) {
961 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
962
963 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
964 if (it != unclaimed_pushed_streams_.end()) {
965 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
966 scoped_refptr<SpdyStream> stream = it->second;
967 unclaimed_pushed_streams_.erase(it);
968 used_push_streams.Increment();
969 return stream;
970 }
971 return NULL;
972 }
973
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)974 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
975 if (is_secure_) {
976 SSLClientSocket* ssl_socket =
977 reinterpret_cast<SSLClientSocket*>(connection_->socket());
978 ssl_socket->GetSSLInfo(ssl_info);
979 *was_npn_negotiated = ssl_socket->was_npn_negotiated();
980 return true;
981 }
982 return false;
983 }
984
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)985 bool SpdySession::GetSSLCertRequestInfo(
986 SSLCertRequestInfo* cert_request_info) {
987 if (is_secure_) {
988 SSLClientSocket* ssl_socket =
989 reinterpret_cast<SSLClientSocket*>(connection_->socket());
990 ssl_socket->GetSSLCertRequestInfo(cert_request_info);
991 return true;
992 }
993 return false;
994 }
995
OnError(spdy::SpdyFramer * framer)996 void SpdySession::OnError(spdy::SpdyFramer* framer) {
997 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
998 }
999
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1000 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1001 const char* data,
1002 size_t len) {
1003 if (net_log().IsLoggingAllEvents()) {
1004 net_log().AddEvent(
1005 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1006 make_scoped_refptr(new NetLogSpdyDataParameter(
1007 stream_id, len, spdy::SpdyDataFlags())));
1008 }
1009
1010 if (!IsStreamActive(stream_id)) {
1011 // NOTE: it may just be that the stream was cancelled.
1012 LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
1013 return;
1014 }
1015
1016 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1017 stream->OnDataReceived(data, len);
1018 }
1019
Respond(const spdy::SpdyHeaderBlock & headers,const scoped_refptr<SpdyStream> stream)1020 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
1021 const scoped_refptr<SpdyStream> stream) {
1022 int rv = OK;
1023 rv = stream->OnResponseReceived(headers);
1024 if (rv < 0) {
1025 DCHECK_NE(rv, ERR_IO_PENDING);
1026 const spdy::SpdyStreamId stream_id = stream->stream_id();
1027 DeleteStream(stream_id, rv);
1028 return false;
1029 }
1030 return true;
1031 }
1032
OnSyn(const spdy::SpdySynStreamControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1033 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
1034 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1035 spdy::SpdyStreamId stream_id = frame.stream_id();
1036 spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
1037
1038 if (net_log_.IsLoggingAllEvents()) {
1039 net_log_.AddEvent(
1040 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1041 make_scoped_refptr(new NetLogSpdySynParameter(
1042 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1043 stream_id, associated_stream_id)));
1044 }
1045
1046 // Server-initiated streams should have even sequence numbers.
1047 if ((stream_id & 0x1) != 0) {
1048 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1049 return;
1050 }
1051
1052 if (IsStreamActive(stream_id)) {
1053 LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1054 return;
1055 }
1056
1057 if (associated_stream_id == 0) {
1058 LOG(WARNING) << "Received invalid OnSyn associated stream id "
1059 << associated_stream_id
1060 << " for stream " << stream_id;
1061 ResetStream(stream_id, spdy::INVALID_STREAM);
1062 return;
1063 }
1064
1065 streams_pushed_count_++;
1066
1067 // TODO(mbelshe): DCHECK that this is a GET method?
1068
1069 // Verify that the response had a URL for us.
1070 const std::string& url = ContainsKey(*headers, "url") ?
1071 headers->find("url")->second : "";
1072 if (url.empty()) {
1073 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1074 LOG(WARNING) << "Pushed stream did not contain a url.";
1075 return;
1076 }
1077
1078 GURL gurl(url);
1079 if (!gurl.is_valid()) {
1080 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1081 LOG(WARNING) << "Pushed stream url was invalid: " << url;
1082 return;
1083 }
1084
1085 // Verify we have a valid stream association.
1086 if (!IsStreamActive(associated_stream_id)) {
1087 LOG(WARNING) << "Received OnSyn with inactive associated stream "
1088 << associated_stream_id;
1089 ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
1090 return;
1091 }
1092
1093 scoped_refptr<SpdyStream> associated_stream =
1094 active_streams_[associated_stream_id];
1095 GURL associated_url(associated_stream->GetUrl());
1096 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
1097 LOG(WARNING) << "Rejected Cross Origin Push Stream "
1098 << associated_stream_id;
1099 ResetStream(stream_id, spdy::REFUSED_STREAM);
1100 return;
1101 }
1102
1103 // There should not be an existing pushed stream with the same path.
1104 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1105 if (it != unclaimed_pushed_streams_.end()) {
1106 LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
1107 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1108 return;
1109 }
1110
1111 scoped_refptr<SpdyStream> stream(
1112 new SpdyStream(this, stream_id, true, net_log_));
1113
1114 stream->set_path(gurl.PathForRequest());
1115
1116 unclaimed_pushed_streams_[url] = stream;
1117
1118 ActivateStream(stream);
1119 stream->set_response_received();
1120
1121 // Parse the headers.
1122 if (!Respond(*headers, stream))
1123 return;
1124
1125 base::StatsCounter push_requests("spdy.pushed_streams");
1126 push_requests.Increment();
1127 }
1128
OnSynReply(const spdy::SpdySynReplyControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1129 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
1130 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1131 spdy::SpdyStreamId stream_id = frame.stream_id();
1132
1133 bool valid_stream = IsStreamActive(stream_id);
1134 if (!valid_stream) {
1135 // NOTE: it may just be that the stream was cancelled.
1136 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
1137 return;
1138 }
1139
1140 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1141 CHECK_EQ(stream->stream_id(), stream_id);
1142 CHECK(!stream->cancelled());
1143
1144 if (stream->response_received()) {
1145 LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
1146 CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
1147 return;
1148 }
1149 stream->set_response_received();
1150
1151 if (net_log().IsLoggingAllEvents()) {
1152 net_log().AddEvent(
1153 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1154 make_scoped_refptr(new NetLogSpdySynParameter(
1155 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1156 stream_id, 0)));
1157 }
1158
1159 Respond(*headers, stream);
1160 }
1161
OnHeaders(const spdy::SpdyHeadersControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1162 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
1163 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1164 spdy::SpdyStreamId stream_id = frame.stream_id();
1165
1166 bool valid_stream = IsStreamActive(stream_id);
1167 if (!valid_stream) {
1168 // NOTE: it may just be that the stream was cancelled.
1169 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
1170 return;
1171 }
1172
1173 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1174 CHECK_EQ(stream->stream_id(), stream_id);
1175 CHECK(!stream->cancelled());
1176
1177 if (net_log().IsLoggingAllEvents()) {
1178 net_log().AddEvent(
1179 NetLog::TYPE_SPDY_SESSION_HEADERS,
1180 make_scoped_refptr(new NetLogSpdySynParameter(
1181 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1182 stream_id, 0)));
1183 }
1184
1185 int rv = stream->OnHeaders(*headers);
1186 if (rv < 0) {
1187 DCHECK_NE(rv, ERR_IO_PENDING);
1188 const spdy::SpdyStreamId stream_id = stream->stream_id();
1189 DeleteStream(stream_id, rv);
1190 }
1191 }
1192
OnControl(const spdy::SpdyControlFrame * frame)1193 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
1194 const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
1195 uint32 type = frame->type();
1196 if (type == spdy::SYN_STREAM ||
1197 type == spdy::SYN_REPLY ||
1198 type == spdy::HEADERS) {
1199 if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
1200 LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
1201 int stream_id = 0;
1202 if (type == spdy::SYN_STREAM) {
1203 stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
1204 (frame))->stream_id();
1205 } else if (type == spdy::SYN_REPLY) {
1206 stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
1207 (frame))->stream_id();
1208 } else if (type == spdy::HEADERS) {
1209 stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
1210 (frame))->stream_id();
1211 }
1212 if(IsStreamActive(stream_id))
1213 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1214 return;
1215 }
1216 }
1217
1218 frames_received_++;
1219
1220 switch (type) {
1221 case spdy::GOAWAY:
1222 OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
1223 break;
1224 case spdy::SETTINGS:
1225 OnSettings(
1226 *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
1227 break;
1228 case spdy::RST_STREAM:
1229 OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
1230 break;
1231 case spdy::SYN_STREAM:
1232 OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
1233 headers);
1234 break;
1235 case spdy::HEADERS:
1236 OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
1237 headers);
1238 break;
1239 case spdy::SYN_REPLY:
1240 OnSynReply(
1241 *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
1242 headers);
1243 break;
1244 case spdy::WINDOW_UPDATE:
1245 OnWindowUpdate(
1246 *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
1247 break;
1248 default:
1249 DCHECK(false); // Error!
1250 }
1251 }
1252
OnRst(const spdy::SpdyRstStreamControlFrame & frame)1253 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
1254 spdy::SpdyStreamId stream_id = frame.stream_id();
1255
1256 net_log().AddEvent(
1257 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
1258 make_scoped_refptr(
1259 new NetLogSpdyRstParameter(stream_id, frame.status())));
1260
1261 bool valid_stream = IsStreamActive(stream_id);
1262 if (!valid_stream) {
1263 // NOTE: it may just be that the stream was cancelled.
1264 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1265 return;
1266 }
1267 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1268 CHECK_EQ(stream->stream_id(), stream_id);
1269 CHECK(!stream->cancelled());
1270
1271 if (frame.status() == 0) {
1272 stream->OnDataReceived(NULL, 0);
1273 } else {
1274 LOG(ERROR) << "Spdy stream closed: " << frame.status();
1275 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1276 // For now, it doesn't matter much - it is a protocol error.
1277 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
1278 }
1279 }
1280
OnGoAway(const spdy::SpdyGoAwayControlFrame & frame)1281 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
1282 net_log_.AddEvent(
1283 NetLog::TYPE_SPDY_SESSION_GOAWAY,
1284 make_scoped_refptr(
1285 new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
1286 active_streams_.size(),
1287 unclaimed_pushed_streams_.size())));
1288 RemoveFromPool();
1289 CloseAllStreams(net::ERR_ABORTED);
1290
1291 // TODO(willchan): Cancel any streams that are past the GoAway frame's
1292 // |last_accepted_stream_id|.
1293
1294 // Don't bother killing any streams that are still reading. They'll either
1295 // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
1296 // closed.
1297 }
1298
OnSettings(const spdy::SpdySettingsControlFrame & frame)1299 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
1300 spdy::SpdySettings settings;
1301 if (spdy_framer_.ParseSettings(&frame, &settings)) {
1302 HandleSettings(settings);
1303 spdy_settings_->Set(host_port_pair(), settings);
1304 }
1305
1306 received_settings_ = true;
1307
1308 net_log_.AddEvent(
1309 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1310 make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1311 }
1312
OnWindowUpdate(const spdy::SpdyWindowUpdateControlFrame & frame)1313 void SpdySession::OnWindowUpdate(
1314 const spdy::SpdyWindowUpdateControlFrame& frame) {
1315 spdy::SpdyStreamId stream_id = frame.stream_id();
1316 if (!IsStreamActive(stream_id)) {
1317 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
1318 return;
1319 }
1320
1321 int delta_window_size = static_cast<int>(frame.delta_window_size());
1322 if (delta_window_size < 1) {
1323 LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
1324 << delta_window_size;
1325 ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
1326 return;
1327 }
1328
1329 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1330 CHECK_EQ(stream->stream_id(), stream_id);
1331 CHECK(!stream->cancelled());
1332
1333 if (use_flow_control_)
1334 stream->IncreaseSendWindowSize(delta_window_size);
1335
1336 net_log_.AddEvent(
1337 NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
1338 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1339 stream_id, delta_window_size, stream->send_window_size())));
1340 }
1341
SendWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1342 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
1343 int delta_window_size) {
1344 DCHECK(IsStreamActive(stream_id));
1345 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1346 CHECK_EQ(stream->stream_id(), stream_id);
1347
1348 net_log_.AddEvent(
1349 NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
1350 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1351 stream_id, delta_window_size, stream->recv_window_size())));
1352
1353 scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
1354 spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
1355 QueueFrame(window_update_frame.get(), stream->priority(), stream);
1356 }
1357
1358 // Given a cwnd that we would have sent to the server, modify it based on the
1359 // field trial policy.
ApplyCwndFieldTrialPolicy(int cwnd)1360 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1361 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1362 if (!trial) {
1363 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1364 return cwnd;
1365 }
1366 if (trial->group_name() == "cwnd10")
1367 return 10;
1368 else if (trial->group_name() == "cwnd16")
1369 return 16;
1370 else if (trial->group_name() == "cwndMin16")
1371 return std::max(cwnd, 16);
1372 else if (trial->group_name() == "cwndMin10")
1373 return std::max(cwnd, 10);
1374 else if (trial->group_name() == "cwndDynamic")
1375 return cwnd;
1376 NOTREACHED();
1377 return cwnd;
1378 }
1379
SendSettings()1380 void SpdySession::SendSettings() {
1381 // Note: we're copying the settings here, so that we can potentially modify
1382 // the settings for the field trial. When removing the field trial, make
1383 // this a reference to the const SpdySettings again.
1384 spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
1385 if (settings.empty())
1386 return;
1387
1388 // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
1389 for (spdy::SpdySettings::iterator i = settings.begin(),
1390 end = settings.end(); i != end; ++i) {
1391 const uint32 id = i->first.id();
1392 const uint32 val = i->second;
1393 switch (id) {
1394 case spdy::SETTINGS_CURRENT_CWND:
1395 uint32 cwnd = 0;
1396 cwnd = ApplyCwndFieldTrialPolicy(val);
1397 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
1398 cwnd,
1399 1, 200, 100);
1400 if (cwnd != val) {
1401 i->second = cwnd;
1402 i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
1403 spdy_settings_->Set(host_port_pair(), settings);
1404 }
1405 break;
1406 }
1407 }
1408
1409 HandleSettings(settings);
1410
1411 net_log_.AddEvent(
1412 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1413 make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1414
1415 // Create the SETTINGS frame and send it.
1416 scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
1417 spdy_framer_.CreateSettings(settings));
1418 sent_settings_ = true;
1419 QueueFrame(settings_frame.get(), 0, NULL);
1420 }
1421
HandleSettings(const spdy::SpdySettings & settings)1422 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
1423 for (spdy::SpdySettings::const_iterator i = settings.begin(),
1424 end = settings.end(); i != end; ++i) {
1425 const uint32 id = i->first.id();
1426 const uint32 val = i->second;
1427 switch (id) {
1428 case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
1429 max_concurrent_streams_ = std::min(static_cast<size_t>(val),
1430 max_concurrent_stream_limit_);
1431 ProcessPendingCreateStreams();
1432 break;
1433 }
1434 }
1435 }
1436
RecordHistograms()1437 void SpdySession::RecordHistograms() {
1438 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
1439 streams_initiated_count_,
1440 0, 300, 50);
1441 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
1442 streams_pushed_count_,
1443 0, 300, 50);
1444 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
1445 streams_pushed_and_claimed_count_,
1446 0, 300, 50);
1447 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
1448 streams_abandoned_count_,
1449 0, 300, 50);
1450 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
1451 sent_settings_ ? 1 : 0, 2);
1452 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
1453 received_settings_ ? 1 : 0, 2);
1454 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
1455 stalled_streams_,
1456 0, 300, 50);
1457 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
1458 stalled_streams_ > 0 ? 1 : 0, 2);
1459
1460 if (received_settings_) {
1461 // Enumerate the saved settings, and set histograms for it.
1462 const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
1463
1464 spdy::SpdySettings::const_iterator it;
1465 for (it = settings.begin(); it != settings.end(); ++it) {
1466 const spdy::SpdySetting setting = *it;
1467 switch (setting.first.id()) {
1468 case spdy::SETTINGS_CURRENT_CWND:
1469 // Record several different histograms to see if cwnd converges
1470 // for larger volumes of data being sent.
1471 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
1472 setting.second,
1473 1, 200, 100);
1474 if (bytes_received_ > 10 * 1024) {
1475 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
1476 setting.second,
1477 1, 200, 100);
1478 if (bytes_received_ > 25 * 1024) {
1479 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
1480 setting.second,
1481 1, 200, 100);
1482 if (bytes_received_ > 50 * 1024) {
1483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
1484 setting.second,
1485 1, 200, 100);
1486 if (bytes_received_ > 100 * 1024) {
1487 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
1488 setting.second,
1489 1, 200, 100);
1490 }
1491 }
1492 }
1493 }
1494 break;
1495 case spdy::SETTINGS_ROUND_TRIP_TIME:
1496 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
1497 setting.second,
1498 1, 1200, 100);
1499 break;
1500 case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
1501 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
1502 setting.second,
1503 1, 100, 50);
1504 break;
1505 }
1506 }
1507 }
1508 }
1509
InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream> * stream)1510 void SpdySession::InvokeUserStreamCreationCallback(
1511 scoped_refptr<SpdyStream>* stream) {
1512 PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
1513
1514 // Exit if the request has already been cancelled.
1515 if (it == pending_callback_map_.end())
1516 return;
1517
1518 CompletionCallback* callback = it->second.callback;
1519 int result = it->second.result;
1520 pending_callback_map_.erase(it);
1521 callback->Run(result);
1522 }
1523
1524 } // namespace net
1525