1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/memory/linked_ptr.h"
10 #include "base/message_loop.h"
11 #include "base/metrics/field_trial.h"
12 #include "base/metrics/stats_counters.h"
13 #include "base/stl_util-inl.h"
14 #include "base/string_number_conversions.h"
15 #include "base/string_util.h"
16 #include "base/stringprintf.h"
17 #include "base/time.h"
18 #include "base/utf_string_conversions.h"
19 #include "base/values.h"
20 #include "net/base/connection_type_histograms.h"
21 #include "net/base/net_log.h"
22 #include "net/base/net_util.h"
23 #include "net/http/http_network_session.h"
24 #include "net/socket/ssl_client_socket.h"
25 #include "net/spdy/spdy_frame_builder.h"
26 #include "net/spdy/spdy_http_utils.h"
27 #include "net/spdy/spdy_protocol.h"
28 #include "net/spdy/spdy_session_pool.h"
29 #include "net/spdy/spdy_settings_storage.h"
30 #include "net/spdy/spdy_stream.h"
31
32 namespace net {
33
NetLogSpdySynParameter(const linked_ptr<spdy::SpdyHeaderBlock> & headers,spdy::SpdyControlFlags flags,spdy::SpdyStreamId id,spdy::SpdyStreamId associated_stream)34 NetLogSpdySynParameter::NetLogSpdySynParameter(
35 const linked_ptr<spdy::SpdyHeaderBlock>& headers,
36 spdy::SpdyControlFlags flags,
37 spdy::SpdyStreamId id,
38 spdy::SpdyStreamId associated_stream)
39 : headers_(headers),
40 flags_(flags),
41 id_(id),
42 associated_stream_(associated_stream) {
43 }
44
~NetLogSpdySynParameter()45 NetLogSpdySynParameter::~NetLogSpdySynParameter() {
46 }
47
ToValue() const48 Value* NetLogSpdySynParameter::ToValue() const {
49 DictionaryValue* dict = new DictionaryValue();
50 ListValue* headers_list = new ListValue();
51 for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin();
52 it != headers_->end(); ++it) {
53 headers_list->Append(new StringValue(base::StringPrintf(
54 "%s: %s", it->first.c_str(), it->second.c_str())));
55 }
56 dict->SetInteger("flags", flags_);
57 dict->Set("headers", headers_list);
58 dict->SetInteger("id", id_);
59 if (associated_stream_)
60 dict->SetInteger("associated_stream", associated_stream_);
61 return dict;
62 }
63
64 namespace {
65
66 const int kReadBufferSize = 8 * 1024;
67
68 class NetLogSpdySessionParameter : public NetLog::EventParameters {
69 public:
NetLogSpdySessionParameter(const HostPortProxyPair & host_pair)70 NetLogSpdySessionParameter(const HostPortProxyPair& host_pair)
71 : host_pair_(host_pair) {}
ToValue() const72 virtual Value* ToValue() const {
73 DictionaryValue* dict = new DictionaryValue();
74 dict->Set("host", new StringValue(host_pair_.first.ToString()));
75 dict->Set("proxy", new StringValue(host_pair_.second.ToPacString()));
76 return dict;
77 }
78 private:
79 const HostPortProxyPair host_pair_;
80 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter);
81 };
82
83 class NetLogSpdySettingsParameter : public NetLog::EventParameters {
84 public:
NetLogSpdySettingsParameter(const spdy::SpdySettings & settings)85 explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings)
86 : settings_(settings) {}
87
ToValue() const88 virtual Value* ToValue() const {
89 DictionaryValue* dict = new DictionaryValue();
90 ListValue* settings = new ListValue();
91 for (spdy::SpdySettings::const_iterator it = settings_.begin();
92 it != settings_.end(); ++it) {
93 settings->Append(new StringValue(
94 base::StringPrintf("[%u:%u]", it->first.id(), it->second)));
95 }
96 dict->Set("settings", settings);
97 return dict;
98 }
99
100 private:
~NetLogSpdySettingsParameter()101 ~NetLogSpdySettingsParameter() {}
102 const spdy::SpdySettings settings_;
103
104 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter);
105 };
106
107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters {
108 public:
NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,int delta,int window_size)109 NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id,
110 int delta,
111 int window_size)
112 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
113
ToValue() const114 virtual Value* ToValue() const {
115 DictionaryValue* dict = new DictionaryValue();
116 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
117 dict->SetInteger("delta", delta_);
118 dict->SetInteger("window_size", window_size_);
119 return dict;
120 }
121
122 private:
~NetLogSpdyWindowUpdateParameter()123 ~NetLogSpdyWindowUpdateParameter() {}
124 const spdy::SpdyStreamId stream_id_;
125 const int delta_;
126 const int window_size_;
127
128 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter);
129 };
130
131 class NetLogSpdyDataParameter : public NetLog::EventParameters {
132 public:
NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,int size,spdy::SpdyDataFlags flags)133 NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id,
134 int size,
135 spdy::SpdyDataFlags flags)
136 : stream_id_(stream_id), size_(size), flags_(flags) {}
137
ToValue() const138 virtual Value* ToValue() const {
139 DictionaryValue* dict = new DictionaryValue();
140 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
141 dict->SetInteger("size", size_);
142 dict->SetInteger("flags", static_cast<int>(flags_));
143 return dict;
144 }
145
146 private:
~NetLogSpdyDataParameter()147 ~NetLogSpdyDataParameter() {}
148 const spdy::SpdyStreamId stream_id_;
149 const int size_;
150 const spdy::SpdyDataFlags flags_;
151
152 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter);
153 };
154
155 class NetLogSpdyRstParameter : public NetLog::EventParameters {
156 public:
NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id,int status)157 NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status)
158 : stream_id_(stream_id), status_(status) {}
159
ToValue() const160 virtual Value* ToValue() const {
161 DictionaryValue* dict = new DictionaryValue();
162 dict->SetInteger("stream_id", static_cast<int>(stream_id_));
163 dict->SetInteger("status", status_);
164 return dict;
165 }
166
167 private:
~NetLogSpdyRstParameter()168 ~NetLogSpdyRstParameter() {}
169 const spdy::SpdyStreamId stream_id_;
170 const int status_;
171
172 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter);
173 };
174
175 class NetLogSpdyPingParameter : public NetLog::EventParameters {
176 public:
NetLogSpdyPingParameter(uint32 unique_id)177 explicit NetLogSpdyPingParameter(uint32 unique_id) : unique_id_(unique_id) {}
178
ToValue() const179 virtual Value* ToValue() const {
180 DictionaryValue* dict = new DictionaryValue();
181 dict->SetInteger("unique_id", unique_id_);
182 return dict;
183 }
184
185 private:
~NetLogSpdyPingParameter()186 ~NetLogSpdyPingParameter() {}
187 const uint32 unique_id_;
188
189 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyPingParameter);
190 };
191
192 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters {
193 public:
NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,int active_streams,int unclaimed_streams)194 NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id,
195 int active_streams,
196 int unclaimed_streams)
197 : last_stream_id_(last_stream_id),
198 active_streams_(active_streams),
199 unclaimed_streams_(unclaimed_streams) {}
200
ToValue() const201 virtual Value* ToValue() const {
202 DictionaryValue* dict = new DictionaryValue();
203 dict->SetInteger("last_accepted_stream_id",
204 static_cast<int>(last_stream_id_));
205 dict->SetInteger("active_streams", active_streams_);
206 dict->SetInteger("unclaimed_streams", unclaimed_streams_);
207 return dict;
208 }
209
210 private:
~NetLogSpdyGoAwayParameter()211 ~NetLogSpdyGoAwayParameter() {}
212 const spdy::SpdyStreamId last_stream_id_;
213 const int active_streams_;
214 const int unclaimed_streams_;
215
216 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter);
217 };
218
219 } // namespace
220
221 // static
222 bool SpdySession::use_ssl_ = true;
223
224 // static
225 bool SpdySession::use_flow_control_ = false;
226
227 // static
228 size_t SpdySession::max_concurrent_stream_limit_ = 256;
229
230 // static
231 bool SpdySession::enable_ping_based_connection_checking_ = true;
232
233 // static
234 int SpdySession::connection_at_risk_of_loss_ms_ = 0;
235
236 // static
237 int SpdySession::trailing_ping_delay_time_ms_ = 1000;
238
239 // static
240 int SpdySession::hung_interval_ms_ = 10000;
241
SpdySession(const HostPortProxyPair & host_port_proxy_pair,SpdySessionPool * spdy_session_pool,SpdySettingsStorage * spdy_settings,NetLog * net_log)242 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
243 SpdySessionPool* spdy_session_pool,
244 SpdySettingsStorage* spdy_settings,
245 NetLog* net_log)
246 : ALLOW_THIS_IN_INITIALIZER_LIST(
247 read_callback_(this, &SpdySession::OnReadComplete)),
248 ALLOW_THIS_IN_INITIALIZER_LIST(
249 write_callback_(this, &SpdySession::OnWriteComplete)),
250 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
251 host_port_proxy_pair_(host_port_proxy_pair),
252 spdy_session_pool_(spdy_session_pool),
253 spdy_settings_(spdy_settings),
254 connection_(new ClientSocketHandle),
255 read_buffer_(new IOBuffer(kReadBufferSize)),
256 read_pending_(false),
257 stream_hi_water_mark_(1), // Always start at 1 for the first stream id.
258 write_pending_(false),
259 delayed_write_pending_(false),
260 is_secure_(false),
261 certificate_error_code_(OK),
262 error_(OK),
263 state_(IDLE),
264 max_concurrent_streams_(kDefaultMaxConcurrentStreams),
265 streams_initiated_count_(0),
266 streams_pushed_count_(0),
267 streams_pushed_and_claimed_count_(0),
268 streams_abandoned_count_(0),
269 frames_received_(0),
270 bytes_received_(0),
271 sent_settings_(false),
272 received_settings_(false),
273 stalled_streams_(0),
274 pings_in_flight_(0),
275 next_ping_id_(1),
276 received_data_time_(base::TimeTicks::Now()),
277 trailing_ping_pending_(false),
278 check_ping_status_pending_(false),
279 need_to_send_ping_(false),
280 initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize),
281 initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
282 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) {
283 DCHECK(HttpStreamFactory::spdy_enabled());
284 net_log_.BeginEvent(
285 NetLog::TYPE_SPDY_SESSION,
286 make_scoped_refptr(
287 new NetLogSpdySessionParameter(host_port_proxy_pair_)));
288
289 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
290
291 spdy_framer_.set_visitor(this);
292
293 SendSettings();
294 }
295
~SpdySession()296 SpdySession::~SpdySession() {
297 if (state_ != CLOSED) {
298 state_ = CLOSED;
299
300 // Cleanup all the streams.
301 CloseAllStreams(net::ERR_ABORTED);
302 }
303
304 if (connection_->is_initialized()) {
305 // With Spdy we can't recycle sockets.
306 connection_->socket()->Disconnect();
307 }
308
309 // Streams should all be gone now.
310 DCHECK_EQ(0u, num_active_streams());
311 DCHECK_EQ(0u, num_unclaimed_pushed_streams());
312
313 DCHECK(pending_callback_map_.empty());
314
315 RecordHistograms();
316
317 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL);
318 }
319
InitializeWithSocket(ClientSocketHandle * connection,bool is_secure,int certificate_error_code)320 net::Error SpdySession::InitializeWithSocket(
321 ClientSocketHandle* connection,
322 bool is_secure,
323 int certificate_error_code) {
324 base::StatsCounter spdy_sessions("spdy.sessions");
325 spdy_sessions.Increment();
326
327 state_ = CONNECTED;
328 connection_.reset(connection);
329 is_secure_ = is_secure;
330 certificate_error_code_ = certificate_error_code;
331
332 // Write out any data that we might have to send, such as the settings frame.
333 WriteSocketLater();
334 net::Error error = ReadSocket();
335 if (error == ERR_IO_PENDING)
336 return OK;
337 return error;
338 }
339
VerifyDomainAuthentication(const std::string & domain)340 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
341 if (state_ != CONNECTED)
342 return false;
343
344 SSLInfo ssl_info;
345 bool was_npn_negotiated;
346 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated))
347 return true; // This is not a secure session, so all domains are okay.
348
349 return ssl_info.cert->VerifyNameMatch(domain);
350 }
351
GetPushStream(const GURL & url,scoped_refptr<SpdyStream> * stream,const BoundNetLog & stream_net_log)352 int SpdySession::GetPushStream(
353 const GURL& url,
354 scoped_refptr<SpdyStream>* stream,
355 const BoundNetLog& stream_net_log) {
356 CHECK_NE(state_, CLOSED);
357
358 *stream = NULL;
359
360 // Don't allow access to secure push streams over an unauthenticated, but
361 // encrypted SSL socket.
362 if (is_secure_ && certificate_error_code_ != OK &&
363 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
364 LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an "
365 << "unauthenticated session.";
366 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
367 return ERR_SPDY_PROTOCOL_ERROR;
368 }
369
370 *stream = GetActivePushStream(url.spec());
371 if (stream->get()) {
372 DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
373 streams_pushed_and_claimed_count_++;
374 return OK;
375 }
376 return 0;
377 }
378
CreateStream(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log,CompletionCallback * callback)379 int SpdySession::CreateStream(
380 const GURL& url,
381 RequestPriority priority,
382 scoped_refptr<SpdyStream>* spdy_stream,
383 const BoundNetLog& stream_net_log,
384 CompletionCallback* callback) {
385 if (!max_concurrent_streams_ ||
386 active_streams_.size() < max_concurrent_streams_) {
387 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
388 }
389
390 stalled_streams_++;
391 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
392 create_stream_queues_[priority].push(
393 PendingCreateStream(url, priority, spdy_stream,
394 stream_net_log, callback));
395 return ERR_IO_PENDING;
396 }
397
ProcessPendingCreateStreams()398 void SpdySession::ProcessPendingCreateStreams() {
399 while (!max_concurrent_streams_ ||
400 active_streams_.size() < max_concurrent_streams_) {
401 bool no_pending_create_streams = true;
402 for (int i = 0;i < NUM_PRIORITIES;++i) {
403 if (!create_stream_queues_[i].empty()) {
404 PendingCreateStream pending_create = create_stream_queues_[i].front();
405 create_stream_queues_[i].pop();
406 no_pending_create_streams = false;
407 int error = CreateStreamImpl(*pending_create.url,
408 pending_create.priority,
409 pending_create.spdy_stream,
410 *pending_create.stream_net_log);
411 scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream;
412 DCHECK(!ContainsKey(pending_callback_map_, stream));
413 pending_callback_map_[stream] =
414 CallbackResultPair(pending_create.callback, error);
415 MessageLoop::current()->PostTask(
416 FROM_HERE,
417 method_factory_.NewRunnableMethod(
418 &SpdySession::InvokeUserStreamCreationCallback, stream));
419 break;
420 }
421 }
422 if (no_pending_create_streams)
423 return; // there were no streams in any queue
424 }
425 }
426
CancelPendingCreateStreams(const scoped_refptr<SpdyStream> * spdy_stream)427 void SpdySession::CancelPendingCreateStreams(
428 const scoped_refptr<SpdyStream>* spdy_stream) {
429 PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream);
430 if (it != pending_callback_map_.end()) {
431 pending_callback_map_.erase(it);
432 return;
433 }
434
435 for (int i = 0;i < NUM_PRIORITIES;++i) {
436 PendingCreateStreamQueue tmp;
437 // Make a copy removing this trans
438 while (!create_stream_queues_[i].empty()) {
439 PendingCreateStream pending_create = create_stream_queues_[i].front();
440 create_stream_queues_[i].pop();
441 if (pending_create.spdy_stream != spdy_stream)
442 tmp.push(pending_create);
443 }
444 // Now copy it back
445 while (!tmp.empty()) {
446 create_stream_queues_[i].push(tmp.front());
447 tmp.pop();
448 }
449 }
450 }
451
CreateStreamImpl(const GURL & url,RequestPriority priority,scoped_refptr<SpdyStream> * spdy_stream,const BoundNetLog & stream_net_log)452 int SpdySession::CreateStreamImpl(
453 const GURL& url,
454 RequestPriority priority,
455 scoped_refptr<SpdyStream>* spdy_stream,
456 const BoundNetLog& stream_net_log) {
457 // Make sure that we don't try to send https/wss over an unauthenticated, but
458 // encrypted SSL socket.
459 if (is_secure_ && certificate_error_code_ != OK &&
460 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
461 LOG(ERROR) << "Tried to create spdy stream for secure content over an "
462 << "unauthenticated session.";
463 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true);
464 return ERR_SPDY_PROTOCOL_ERROR;
465 }
466
467 const std::string& path = url.PathForRequest();
468
469 const spdy::SpdyStreamId stream_id = GetNewStreamId();
470
471 *spdy_stream = new SpdyStream(this,
472 stream_id,
473 false,
474 stream_net_log);
475 const scoped_refptr<SpdyStream>& stream = *spdy_stream;
476
477 stream->set_priority(priority);
478 stream->set_path(path);
479 stream->set_send_window_size(initial_send_window_size_);
480 stream->set_recv_window_size(initial_recv_window_size_);
481 ActivateStream(stream);
482
483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
484 static_cast<int>(priority), 0, 10, 11);
485
486 // TODO(mbelshe): Optimize memory allocations
487 DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES);
488
489 DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
490 return OK;
491 }
492
WriteSynStream(spdy::SpdyStreamId stream_id,RequestPriority priority,spdy::SpdyControlFlags flags,const linked_ptr<spdy::SpdyHeaderBlock> & headers)493 int SpdySession::WriteSynStream(
494 spdy::SpdyStreamId stream_id,
495 RequestPriority priority,
496 spdy::SpdyControlFlags flags,
497 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
498 // Find our stream
499 if (!IsStreamActive(stream_id))
500 return ERR_INVALID_SPDY_STREAM;
501 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
502 CHECK_EQ(stream->stream_id(), stream_id);
503
504 SendPrefacePingIfNoneInFlight();
505
506 scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame(
507 spdy_framer_.CreateSynStream(
508 stream_id, 0,
509 ConvertRequestPriorityToSpdyPriority(priority),
510 flags, false, headers.get()));
511 QueueFrame(syn_frame.get(), priority, stream);
512
513 base::StatsCounter spdy_requests("spdy.requests");
514 spdy_requests.Increment();
515 streams_initiated_count_++;
516
517 if (net_log().IsLoggingAllEvents()) {
518 net_log().AddEvent(
519 NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
520 make_scoped_refptr(
521 new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
522 }
523
524 // Some servers don't like too many pings, so we limit our current sending to
525 // no more than one ping for any syn sent. To do this, we avoid ever setting
526 // this to true unless we send a syn (which we have just done). This approach
527 // may change over time as servers change their responses to pings.
528 need_to_send_ping_ = true;
529
530 return ERR_IO_PENDING;
531 }
532
WriteStreamData(spdy::SpdyStreamId stream_id,net::IOBuffer * data,int len,spdy::SpdyDataFlags flags)533 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id,
534 net::IOBuffer* data, int len,
535 spdy::SpdyDataFlags flags) {
536 // Find our stream
537 DCHECK(IsStreamActive(stream_id));
538 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
539 CHECK_EQ(stream->stream_id(), stream_id);
540 if (!stream)
541 return ERR_INVALID_SPDY_STREAM;
542
543 SendPrefacePingIfNoneInFlight();
544
545 if (len > kMaxSpdyFrameChunkSize) {
546 len = kMaxSpdyFrameChunkSize;
547 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
548 }
549
550 // Obey send window size of the stream if flow control is enabled.
551 if (use_flow_control_) {
552 if (stream->send_window_size() <= 0) {
553 // Because we queue frames onto the session, it is possible that
554 // a stream was not flow controlled at the time it attempted the
555 // write, but when we go to fulfill the write, it is now flow
556 // controlled. This is why we need the session to mark the stream
557 // as stalled - because only the session knows for sure when the
558 // stall occurs.
559 stream->set_stalled_by_flow_control(true);
560 net_log().AddEvent(
561 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
562 make_scoped_refptr(
563 new NetLogIntegerParameter("stream_id", stream_id)));
564 return ERR_IO_PENDING;
565 }
566 int new_len = std::min(len, stream->send_window_size());
567 if (new_len < len) {
568 len = new_len;
569 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN);
570 }
571 stream->DecreaseSendWindowSize(len);
572 }
573
574 if (net_log().IsLoggingAllEvents()) {
575 net_log().AddEvent(
576 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
577 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
578 }
579
580 // TODO(mbelshe): reduce memory copies here.
581 scoped_ptr<spdy::SpdyDataFrame> frame(
582 spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
583 QueueFrame(frame.get(), stream->priority(), stream);
584 return ERR_IO_PENDING;
585 }
586
CloseStream(spdy::SpdyStreamId stream_id,int status)587 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) {
588 // TODO(mbelshe): We should send a RST_STREAM control frame here
589 // so that the server can cancel a large send.
590
591 DeleteStream(stream_id, status);
592 }
593
ResetStream(spdy::SpdyStreamId stream_id,spdy::SpdyStatusCodes status)594 void SpdySession::ResetStream(
595 spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) {
596
597 net_log().AddEvent(
598 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
599 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status)));
600
601 scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame(
602 spdy_framer_.CreateRstStream(stream_id, status));
603
604 // Default to lowest priority unless we know otherwise.
605 int priority = 3;
606 if(IsStreamActive(stream_id)) {
607 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
608 priority = stream->priority();
609 }
610 QueueFrame(rst_frame.get(), priority, NULL);
611 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
612 }
613
IsStreamActive(spdy::SpdyStreamId stream_id) const614 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const {
615 return ContainsKey(active_streams_, stream_id);
616 }
617
GetLoadState() const618 LoadState SpdySession::GetLoadState() const {
619 // NOTE: The application only queries the LoadState via the
620 // SpdyNetworkTransaction, and details are only needed when
621 // we're in the process of connecting.
622
623 // If we're connecting, defer to the connection to give us the actual
624 // LoadState.
625 if (state_ == CONNECTING)
626 return connection_->GetLoadState();
627
628 // Just report that we're idle since the session could be doing
629 // many things concurrently.
630 return LOAD_STATE_IDLE;
631 }
632
OnReadComplete(int bytes_read)633 void SpdySession::OnReadComplete(int bytes_read) {
634 // Parse a frame. For now this code requires that the frame fit into our
635 // buffer (32KB).
636 // TODO(mbelshe): support arbitrarily large frames!
637
638 read_pending_ = false;
639
640 if (bytes_read <= 0) {
641 // Session is tearing down.
642 net::Error error = static_cast<net::Error>(bytes_read);
643 if (bytes_read == 0)
644 error = ERR_CONNECTION_CLOSED;
645 CloseSessionOnError(error, true);
646 return;
647 }
648
649 bytes_received_ += bytes_read;
650
651 received_data_time_ = base::TimeTicks::Now();
652
653 // The SpdyFramer will use callbacks onto |this| as it parses frames.
654 // When errors occur, those callbacks can lead to teardown of all references
655 // to |this|, so maintain a reference to self during this call for safe
656 // cleanup.
657 scoped_refptr<SpdySession> self(this);
658
659 char *data = read_buffer_->data();
660 while (bytes_read &&
661 spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) {
662 uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read);
663 bytes_read -= bytes_processed;
664 data += bytes_processed;
665 if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE)
666 spdy_framer_.Reset();
667 }
668
669 if (state_ != CLOSED)
670 ReadSocket();
671 }
672
OnWriteComplete(int result)673 void SpdySession::OnWriteComplete(int result) {
674 DCHECK(write_pending_);
675 DCHECK(in_flight_write_.size());
676
677 write_pending_ = false;
678
679 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
680
681 if (result >= 0) {
682 // It should not be possible to have written more bytes than our
683 // in_flight_write_.
684 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
685
686 in_flight_write_.buffer()->DidConsume(result);
687
688 // We only notify the stream when we've fully written the pending frame.
689 if (!in_flight_write_.buffer()->BytesRemaining()) {
690 if (stream) {
691 // Report the number of bytes written to the caller, but exclude the
692 // frame size overhead. NOTE: if this frame was compressed the
693 // reported bytes written is the compressed size, not the original
694 // size.
695 if (result > 0) {
696 result = in_flight_write_.buffer()->size();
697 DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size()));
698 result -= static_cast<int>(spdy::SpdyFrame::size());
699 }
700
701 // It is possible that the stream was cancelled while we were writing
702 // to the socket.
703 if (!stream->cancelled())
704 stream->OnWriteComplete(result);
705 }
706
707 // Cleanup the write which just completed.
708 in_flight_write_.release();
709 }
710
711 // Write more data. We're already in a continuation, so we can
712 // go ahead and write it immediately (without going back to the
713 // message loop).
714 WriteSocketLater();
715 } else {
716 in_flight_write_.release();
717
718 // The stream is now errored. Close it down.
719 CloseSessionOnError(static_cast<net::Error>(result), true);
720 }
721 }
722
ReadSocket()723 net::Error SpdySession::ReadSocket() {
724 if (read_pending_)
725 return OK;
726
727 if (state_ == CLOSED) {
728 NOTREACHED();
729 return ERR_UNEXPECTED;
730 }
731
732 CHECK(connection_.get());
733 CHECK(connection_->socket());
734 int bytes_read = connection_->socket()->Read(read_buffer_.get(),
735 kReadBufferSize,
736 &read_callback_);
737 switch (bytes_read) {
738 case 0:
739 // Socket is closed!
740 CloseSessionOnError(ERR_CONNECTION_CLOSED, true);
741 return ERR_CONNECTION_CLOSED;
742 case net::ERR_IO_PENDING:
743 // Waiting for data. Nothing to do now.
744 read_pending_ = true;
745 return ERR_IO_PENDING;
746 default:
747 // Data was read, process it.
748 // Schedule the work through the message loop to avoid recursive
749 // callbacks.
750 read_pending_ = true;
751 MessageLoop::current()->PostTask(
752 FROM_HERE,
753 method_factory_.NewRunnableMethod(
754 &SpdySession::OnReadComplete, bytes_read));
755 break;
756 }
757 return OK;
758 }
759
WriteSocketLater()760 void SpdySession::WriteSocketLater() {
761 if (delayed_write_pending_)
762 return;
763
764 if (state_ < CONNECTED)
765 return;
766
767 delayed_write_pending_ = true;
768 MessageLoop::current()->PostTask(
769 FROM_HERE,
770 method_factory_.NewRunnableMethod(&SpdySession::WriteSocket));
771 }
772
WriteSocket()773 void SpdySession::WriteSocket() {
774 // This function should only be called via WriteSocketLater.
775 DCHECK(delayed_write_pending_);
776 delayed_write_pending_ = false;
777
778 // If the socket isn't connected yet, just wait; we'll get called
779 // again when the socket connection completes. If the socket is
780 // closed, just return.
781 if (state_ < CONNECTED || state_ == CLOSED)
782 return;
783
784 if (write_pending_) // Another write is in progress still.
785 return;
786
787 // Loop sending frames until we've sent everything or until the write
788 // returns error (or ERR_IO_PENDING).
789 while (in_flight_write_.buffer() || !queue_.empty()) {
790 if (!in_flight_write_.buffer()) {
791 // Grab the next SpdyFrame to send.
792 SpdyIOBuffer next_buffer = queue_.top();
793 queue_.pop();
794
795 // We've deferred compression until just before we write it to the socket,
796 // which is now. At this time, we don't compress our data frames.
797 spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
798 size_t size;
799 if (spdy_framer_.IsCompressible(uncompressed_frame)) {
800 scoped_ptr<spdy::SpdyFrame> compressed_frame(
801 spdy_framer_.CompressFrame(uncompressed_frame));
802 if (!compressed_frame.get()) {
803 LOG(ERROR) << "SPDY Compression failure";
804 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
805 return;
806 }
807
808 size = compressed_frame->length() + spdy::SpdyFrame::size();
809
810 DCHECK_GT(size, 0u);
811
812 // TODO(mbelshe): We have too much copying of data here.
813 IOBufferWithSize* buffer = new IOBufferWithSize(size);
814 memcpy(buffer->data(), compressed_frame->data(), size);
815
816 // Attempt to send the frame.
817 in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream());
818 } else {
819 size = uncompressed_frame.length() + spdy::SpdyFrame::size();
820 in_flight_write_ = next_buffer;
821 }
822 } else {
823 DCHECK(in_flight_write_.buffer()->BytesRemaining());
824 }
825
826 write_pending_ = true;
827 int rv = connection_->socket()->Write(in_flight_write_.buffer(),
828 in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
829 if (rv == net::ERR_IO_PENDING)
830 break;
831
832 // We sent the frame successfully.
833 OnWriteComplete(rv);
834
835 // TODO(mbelshe): Test this error case. Maybe we should mark the socket
836 // as in an error state.
837 if (rv < 0)
838 break;
839 }
840 }
841
CloseAllStreams(net::Error status)842 void SpdySession::CloseAllStreams(net::Error status) {
843 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
844 base::StatsCounter abandoned_push_streams(
845 "spdy.abandoned_push_streams");
846
847 if (!active_streams_.empty())
848 abandoned_streams.Add(active_streams_.size());
849 if (!unclaimed_pushed_streams_.empty()) {
850 streams_abandoned_count_ += unclaimed_pushed_streams_.size();
851 abandoned_push_streams.Add(unclaimed_pushed_streams_.size());
852 unclaimed_pushed_streams_.clear();
853 }
854
855 for (int i = 0;i < NUM_PRIORITIES;++i) {
856 while (!create_stream_queues_[i].empty()) {
857 PendingCreateStream pending_create = create_stream_queues_[i].front();
858 create_stream_queues_[i].pop();
859 pending_create.callback->Run(ERR_ABORTED);
860 }
861 }
862
863 while (!active_streams_.empty()) {
864 ActiveStreamMap::iterator it = active_streams_.begin();
865 const scoped_refptr<SpdyStream>& stream = it->second;
866 DCHECK(stream);
867 LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id()
868 << "): " << stream->path();
869 DeleteStream(stream->stream_id(), status);
870 }
871
872 // We also need to drain the queue.
873 while (queue_.size())
874 queue_.pop();
875 }
876
GetNewStreamId()877 int SpdySession::GetNewStreamId() {
878 int id = stream_hi_water_mark_;
879 stream_hi_water_mark_ += 2;
880 if (stream_hi_water_mark_ > 0x7fff)
881 stream_hi_water_mark_ = 1;
882 return id;
883 }
884
QueueFrame(spdy::SpdyFrame * frame,spdy::SpdyPriority priority,SpdyStream * stream)885 void SpdySession::QueueFrame(spdy::SpdyFrame* frame,
886 spdy::SpdyPriority priority,
887 SpdyStream* stream) {
888 int length = spdy::SpdyFrame::size() + frame->length();
889 IOBuffer* buffer = new IOBuffer(length);
890 memcpy(buffer->data(), frame->data(), length);
891 queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
892
893 WriteSocketLater();
894 }
895
CloseSessionOnError(net::Error err,bool remove_from_pool)896 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) {
897 // Closing all streams can have a side-effect of dropping the last reference
898 // to |this|. Hold a reference through this function.
899 scoped_refptr<SpdySession> self(this);
900
901 DCHECK_LT(err, OK);
902 net_log_.AddEvent(
903 NetLog::TYPE_SPDY_SESSION_CLOSE,
904 make_scoped_refptr(new NetLogIntegerParameter("status", err)));
905
906 // Don't close twice. This can occur because we can have both
907 // a read and a write outstanding, and each can complete with
908 // an error.
909 if (state_ != CLOSED) {
910 state_ = CLOSED;
911 error_ = err;
912 if (remove_from_pool)
913 RemoveFromPool();
914 CloseAllStreams(err);
915 }
916 }
917
GetInfoAsValue() const918 Value* SpdySession::GetInfoAsValue() const {
919 DictionaryValue* dict = new DictionaryValue();
920
921 dict->SetInteger("source_id", net_log_.source().id);
922
923 dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString());
924 dict->SetString("proxy", host_port_proxy_pair_.second.ToURI());
925
926 dict->SetInteger("active_streams", active_streams_.size());
927
928 dict->SetInteger("unclaimed_pushed_streams",
929 unclaimed_pushed_streams_.size());
930
931 dict->SetBoolean("is_secure", is_secure_);
932
933 dict->SetInteger("error", error_);
934 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
935
936 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
937 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
938 dict->SetInteger("streams_pushed_and_claimed_count",
939 streams_pushed_and_claimed_count_);
940 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
941 dict->SetInteger("frames_received", frames_received_);
942
943 dict->SetBoolean("sent_settings", sent_settings_);
944 dict->SetBoolean("received_settings", received_settings_);
945 return dict;
946 }
947
GetPeerAddress(AddressList * address) const948 int SpdySession::GetPeerAddress(AddressList* address) const {
949 if (!connection_->socket())
950 return ERR_SOCKET_NOT_CONNECTED;
951
952 return connection_->socket()->GetPeerAddress(address);
953 }
954
GetLocalAddress(IPEndPoint * address) const955 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
956 if (!connection_->socket())
957 return ERR_SOCKET_NOT_CONNECTED;
958
959 return connection_->socket()->GetLocalAddress(address);
960 }
961
ActivateStream(SpdyStream * stream)962 void SpdySession::ActivateStream(SpdyStream* stream) {
963 const spdy::SpdyStreamId id = stream->stream_id();
964 DCHECK(!IsStreamActive(id));
965
966 active_streams_[id] = stream;
967 }
968
DeleteStream(spdy::SpdyStreamId id,int status)969 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) {
970 // For push streams, if they are being deleted normally, we leave
971 // the stream in the unclaimed_pushed_streams_ list. However, if
972 // the stream is errored out, clean it up entirely.
973 if (status != OK) {
974 PushedStreamMap::iterator it;
975 for (it = unclaimed_pushed_streams_.begin();
976 it != unclaimed_pushed_streams_.end(); ++it) {
977 scoped_refptr<SpdyStream> curr = it->second;
978 if (id == curr->stream_id()) {
979 unclaimed_pushed_streams_.erase(it);
980 break;
981 }
982 }
983 }
984
985 // The stream might have been deleted.
986 ActiveStreamMap::iterator it2 = active_streams_.find(id);
987 if (it2 == active_streams_.end())
988 return;
989
990 // If this is an active stream, call the callback.
991 const scoped_refptr<SpdyStream> stream(it2->second);
992 active_streams_.erase(it2);
993 if (stream)
994 stream->OnClose(status);
995 ProcessPendingCreateStreams();
996 }
997
RemoveFromPool()998 void SpdySession::RemoveFromPool() {
999 if (spdy_session_pool_) {
1000 spdy_session_pool_->Remove(make_scoped_refptr(this));
1001 spdy_session_pool_ = NULL;
1002 }
1003 }
1004
GetActivePushStream(const std::string & path)1005 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream(
1006 const std::string& path) {
1007 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1008
1009 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path);
1010 if (it != unclaimed_pushed_streams_.end()) {
1011 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL);
1012 scoped_refptr<SpdyStream> stream = it->second;
1013 unclaimed_pushed_streams_.erase(it);
1014 used_push_streams.Increment();
1015 return stream;
1016 }
1017 return NULL;
1018 }
1019
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated)1020 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
1021 if (is_secure_) {
1022 SSLClientSocket* ssl_socket =
1023 reinterpret_cast<SSLClientSocket*>(connection_->socket());
1024 ssl_socket->GetSSLInfo(ssl_info);
1025 *was_npn_negotiated = ssl_socket->was_npn_negotiated();
1026 return true;
1027 }
1028 return false;
1029 }
1030
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)1031 bool SpdySession::GetSSLCertRequestInfo(
1032 SSLCertRequestInfo* cert_request_info) {
1033 if (is_secure_) {
1034 SSLClientSocket* ssl_socket =
1035 reinterpret_cast<SSLClientSocket*>(connection_->socket());
1036 ssl_socket->GetSSLCertRequestInfo(cert_request_info);
1037 return true;
1038 }
1039 return false;
1040 }
1041
OnError(spdy::SpdyFramer * framer)1042 void SpdySession::OnError(spdy::SpdyFramer* framer) {
1043 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1044 }
1045
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1046 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1047 const char* data,
1048 size_t len) {
1049 if (net_log().IsLoggingAllEvents()) {
1050 net_log().AddEvent(
1051 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1052 make_scoped_refptr(new NetLogSpdyDataParameter(
1053 stream_id, len, spdy::SpdyDataFlags())));
1054 }
1055
1056 if (!IsStreamActive(stream_id)) {
1057 // NOTE: it may just be that the stream was cancelled.
1058 LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
1059 return;
1060 }
1061
1062 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1063 stream->OnDataReceived(data, len);
1064 }
1065
Respond(const spdy::SpdyHeaderBlock & headers,const scoped_refptr<SpdyStream> stream)1066 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers,
1067 const scoped_refptr<SpdyStream> stream) {
1068 int rv = OK;
1069 rv = stream->OnResponseReceived(headers);
1070 if (rv < 0) {
1071 DCHECK_NE(rv, ERR_IO_PENDING);
1072 const spdy::SpdyStreamId stream_id = stream->stream_id();
1073 DeleteStream(stream_id, rv);
1074 return false;
1075 }
1076 return true;
1077 }
1078
OnSyn(const spdy::SpdySynStreamControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1079 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame,
1080 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1081 spdy::SpdyStreamId stream_id = frame.stream_id();
1082 spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id();
1083
1084 if (net_log_.IsLoggingAllEvents()) {
1085 net_log_.AddEvent(
1086 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
1087 make_scoped_refptr(new NetLogSpdySynParameter(
1088 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1089 stream_id, associated_stream_id)));
1090 }
1091
1092 // Server-initiated streams should have even sequence numbers.
1093 if ((stream_id & 0x1) != 0) {
1094 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
1095 return;
1096 }
1097
1098 if (IsStreamActive(stream_id)) {
1099 LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
1100 return;
1101 }
1102
1103 if (associated_stream_id == 0) {
1104 LOG(WARNING) << "Received invalid OnSyn associated stream id "
1105 << associated_stream_id
1106 << " for stream " << stream_id;
1107 ResetStream(stream_id, spdy::INVALID_STREAM);
1108 return;
1109 }
1110
1111 streams_pushed_count_++;
1112
1113 // TODO(mbelshe): DCHECK that this is a GET method?
1114
1115 // Verify that the response had a URL for us.
1116 const std::string& url = ContainsKey(*headers, "url") ?
1117 headers->find("url")->second : "";
1118 if (url.empty()) {
1119 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1120 LOG(WARNING) << "Pushed stream did not contain a url.";
1121 return;
1122 }
1123
1124 GURL gurl(url);
1125 if (!gurl.is_valid()) {
1126 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1127 LOG(WARNING) << "Pushed stream url was invalid: " << url;
1128 return;
1129 }
1130
1131 // Verify we have a valid stream association.
1132 if (!IsStreamActive(associated_stream_id)) {
1133 LOG(WARNING) << "Received OnSyn with inactive associated stream "
1134 << associated_stream_id;
1135 ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM);
1136 return;
1137 }
1138
1139 scoped_refptr<SpdyStream> associated_stream =
1140 active_streams_[associated_stream_id];
1141 GURL associated_url(associated_stream->GetUrl());
1142 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
1143 LOG(WARNING) << "Rejected Cross Origin Push Stream "
1144 << associated_stream_id;
1145 ResetStream(stream_id, spdy::REFUSED_STREAM);
1146 return;
1147 }
1148
1149 // There should not be an existing pushed stream with the same path.
1150 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1151 if (it != unclaimed_pushed_streams_.end()) {
1152 LOG(WARNING) << "Received duplicate pushed stream with url: " << url;
1153 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1154 return;
1155 }
1156
1157 scoped_refptr<SpdyStream> stream(
1158 new SpdyStream(this, stream_id, true, net_log_));
1159
1160 stream->set_path(gurl.PathForRequest());
1161
1162 unclaimed_pushed_streams_[url] = stream;
1163
1164 ActivateStream(stream);
1165 stream->set_response_received();
1166
1167 // Parse the headers.
1168 if (!Respond(*headers, stream))
1169 return;
1170
1171 base::StatsCounter push_requests("spdy.pushed_streams");
1172 push_requests.Increment();
1173 }
1174
OnSynReply(const spdy::SpdySynReplyControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1175 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame,
1176 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1177 spdy::SpdyStreamId stream_id = frame.stream_id();
1178
1179 bool valid_stream = IsStreamActive(stream_id);
1180 if (!valid_stream) {
1181 // NOTE: it may just be that the stream was cancelled.
1182 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
1183 return;
1184 }
1185
1186 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1187 CHECK_EQ(stream->stream_id(), stream_id);
1188 CHECK(!stream->cancelled());
1189
1190 if (stream->response_received()) {
1191 LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id;
1192 CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR);
1193 return;
1194 }
1195 stream->set_response_received();
1196
1197 if (net_log().IsLoggingAllEvents()) {
1198 net_log().AddEvent(
1199 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1200 make_scoped_refptr(new NetLogSpdySynParameter(
1201 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1202 stream_id, 0)));
1203 }
1204
1205 Respond(*headers, stream);
1206 }
1207
OnHeaders(const spdy::SpdyHeadersControlFrame & frame,const linked_ptr<spdy::SpdyHeaderBlock> & headers)1208 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame,
1209 const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
1210 spdy::SpdyStreamId stream_id = frame.stream_id();
1211
1212 bool valid_stream = IsStreamActive(stream_id);
1213 if (!valid_stream) {
1214 // NOTE: it may just be that the stream was cancelled.
1215 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
1216 return;
1217 }
1218
1219 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1220 CHECK_EQ(stream->stream_id(), stream_id);
1221 CHECK(!stream->cancelled());
1222
1223 if (net_log().IsLoggingAllEvents()) {
1224 net_log().AddEvent(
1225 NetLog::TYPE_SPDY_SESSION_HEADERS,
1226 make_scoped_refptr(new NetLogSpdySynParameter(
1227 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()),
1228 stream_id, 0)));
1229 }
1230
1231 int rv = stream->OnHeaders(*headers);
1232 if (rv < 0) {
1233 DCHECK_NE(rv, ERR_IO_PENDING);
1234 const spdy::SpdyStreamId stream_id = stream->stream_id();
1235 DeleteStream(stream_id, rv);
1236 }
1237 }
1238
OnControl(const spdy::SpdyControlFrame * frame)1239 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) {
1240 const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
1241 uint32 type = frame->type();
1242 if (type == spdy::SYN_STREAM ||
1243 type == spdy::SYN_REPLY ||
1244 type == spdy::HEADERS) {
1245 if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) {
1246 LOG(WARNING) << "Could not parse Spdy Control Frame Header.";
1247 int stream_id = 0;
1248 if (type == spdy::SYN_STREAM) {
1249 stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*>
1250 (frame))->stream_id();
1251 } else if (type == spdy::SYN_REPLY) {
1252 stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*>
1253 (frame))->stream_id();
1254 } else if (type == spdy::HEADERS) {
1255 stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*>
1256 (frame))->stream_id();
1257 }
1258 if(IsStreamActive(stream_id))
1259 ResetStream(stream_id, spdy::PROTOCOL_ERROR);
1260 return;
1261 }
1262 }
1263
1264 frames_received_++;
1265
1266 switch (type) {
1267 case spdy::GOAWAY:
1268 OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame));
1269 break;
1270 case spdy::PING:
1271 OnPing(*reinterpret_cast<const spdy::SpdyPingControlFrame*>(frame));
1272 break;
1273 case spdy::SETTINGS:
1274 OnSettings(
1275 *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame));
1276 break;
1277 case spdy::RST_STREAM:
1278 OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame));
1279 break;
1280 case spdy::SYN_STREAM:
1281 OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame),
1282 headers);
1283 break;
1284 case spdy::HEADERS:
1285 OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame),
1286 headers);
1287 break;
1288 case spdy::SYN_REPLY:
1289 OnSynReply(
1290 *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame),
1291 headers);
1292 break;
1293 case spdy::WINDOW_UPDATE:
1294 OnWindowUpdate(
1295 *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame));
1296 break;
1297 default:
1298 DCHECK(false); // Error!
1299 }
1300 }
1301
OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,const char * header_data,size_t len)1302 bool SpdySession::OnControlFrameHeaderData(spdy::SpdyStreamId stream_id,
1303 const char* header_data,
1304 size_t len) {
1305 DCHECK(false);
1306 return false;
1307 }
1308
OnDataFrameHeader(const spdy::SpdyDataFrame * frame)1309 void SpdySession::OnDataFrameHeader(const spdy::SpdyDataFrame* frame) {
1310 DCHECK(false);
1311 }
1312
OnRst(const spdy::SpdyRstStreamControlFrame & frame)1313 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) {
1314 spdy::SpdyStreamId stream_id = frame.stream_id();
1315
1316 net_log().AddEvent(
1317 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
1318 make_scoped_refptr(
1319 new NetLogSpdyRstParameter(stream_id, frame.status())));
1320
1321 bool valid_stream = IsStreamActive(stream_id);
1322 if (!valid_stream) {
1323 // NOTE: it may just be that the stream was cancelled.
1324 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1325 return;
1326 }
1327 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1328 CHECK_EQ(stream->stream_id(), stream_id);
1329 CHECK(!stream->cancelled());
1330
1331 if (frame.status() == 0) {
1332 stream->OnDataReceived(NULL, 0);
1333 } else {
1334 LOG(ERROR) << "Spdy stream closed: " << frame.status();
1335 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1336 // For now, it doesn't matter much - it is a protocol error.
1337 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
1338 }
1339 }
1340
OnGoAway(const spdy::SpdyGoAwayControlFrame & frame)1341 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) {
1342 net_log_.AddEvent(
1343 NetLog::TYPE_SPDY_SESSION_GOAWAY,
1344 make_scoped_refptr(
1345 new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(),
1346 active_streams_.size(),
1347 unclaimed_pushed_streams_.size())));
1348 RemoveFromPool();
1349 CloseAllStreams(net::ERR_ABORTED);
1350
1351 // TODO(willchan): Cancel any streams that are past the GoAway frame's
1352 // |last_accepted_stream_id|.
1353
1354 // Don't bother killing any streams that are still reading. They'll either
1355 // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is
1356 // closed.
1357 }
1358
OnPing(const spdy::SpdyPingControlFrame & frame)1359 void SpdySession::OnPing(const spdy::SpdyPingControlFrame& frame) {
1360 net_log_.AddEvent(
1361 NetLog::TYPE_SPDY_SESSION_PING,
1362 make_scoped_refptr(new NetLogSpdyPingParameter(frame.unique_id())));
1363
1364 // Send response to a PING from server.
1365 if (frame.unique_id() % 2 == 0) {
1366 WritePingFrame(frame.unique_id());
1367 return;
1368 }
1369
1370 --pings_in_flight_;
1371 if (pings_in_flight_ < 0) {
1372 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true);
1373 return;
1374 }
1375
1376 if (pings_in_flight_ > 0)
1377 return;
1378
1379 if (!need_to_send_ping_)
1380 return;
1381
1382 PlanToSendTrailingPing();
1383 }
1384
OnSettings(const spdy::SpdySettingsControlFrame & frame)1385 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) {
1386 spdy::SpdySettings settings;
1387 if (spdy_framer_.ParseSettings(&frame, &settings)) {
1388 HandleSettings(settings);
1389 spdy_settings_->Set(host_port_pair(), settings);
1390 }
1391
1392 received_settings_ = true;
1393
1394 net_log_.AddEvent(
1395 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1396 make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1397 }
1398
OnWindowUpdate(const spdy::SpdyWindowUpdateControlFrame & frame)1399 void SpdySession::OnWindowUpdate(
1400 const spdy::SpdyWindowUpdateControlFrame& frame) {
1401 spdy::SpdyStreamId stream_id = frame.stream_id();
1402 if (!IsStreamActive(stream_id)) {
1403 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
1404 return;
1405 }
1406
1407 int delta_window_size = static_cast<int>(frame.delta_window_size());
1408 if (delta_window_size < 1) {
1409 LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size "
1410 << delta_window_size;
1411 ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR);
1412 return;
1413 }
1414
1415 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1416 CHECK_EQ(stream->stream_id(), stream_id);
1417 CHECK(!stream->cancelled());
1418
1419 if (use_flow_control_)
1420 stream->IncreaseSendWindowSize(delta_window_size);
1421
1422 net_log_.AddEvent(
1423 NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE,
1424 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1425 stream_id, delta_window_size, stream->send_window_size())));
1426 }
1427
SendWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1428 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id,
1429 int delta_window_size) {
1430 DCHECK(IsStreamActive(stream_id));
1431 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1432 CHECK_EQ(stream->stream_id(), stream_id);
1433
1434 net_log_.AddEvent(
1435 NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE,
1436 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1437 stream_id, delta_window_size, stream->recv_window_size())));
1438
1439 scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame(
1440 spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size));
1441 QueueFrame(window_update_frame.get(), stream->priority(), stream);
1442 }
1443
1444 // Given a cwnd that we would have sent to the server, modify it based on the
1445 // field trial policy.
ApplyCwndFieldTrialPolicy(int cwnd)1446 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1447 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1448 if (!trial) {
1449 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1450 return cwnd;
1451 }
1452 if (trial->group_name() == "cwnd10")
1453 return 10;
1454 else if (trial->group_name() == "cwnd16")
1455 return 16;
1456 else if (trial->group_name() == "cwndMin16")
1457 return std::max(cwnd, 16);
1458 else if (trial->group_name() == "cwndMin10")
1459 return std::max(cwnd, 10);
1460 else if (trial->group_name() == "cwndDynamic")
1461 return cwnd;
1462 NOTREACHED();
1463 return cwnd;
1464 }
1465
SendSettings()1466 void SpdySession::SendSettings() {
1467 // Note: we're copying the settings here, so that we can potentially modify
1468 // the settings for the field trial. When removing the field trial, make
1469 // this a reference to the const SpdySettings again.
1470 spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair());
1471 if (settings.empty())
1472 return;
1473
1474 // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable.
1475 for (spdy::SpdySettings::iterator i = settings.begin(),
1476 end = settings.end(); i != end; ++i) {
1477 const uint32 id = i->first.id();
1478 const uint32 val = i->second;
1479 switch (id) {
1480 case spdy::SETTINGS_CURRENT_CWND:
1481 uint32 cwnd = 0;
1482 cwnd = ApplyCwndFieldTrialPolicy(val);
1483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent",
1484 cwnd,
1485 1, 200, 100);
1486 if (cwnd != val) {
1487 i->second = cwnd;
1488 i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST);
1489 spdy_settings_->Set(host_port_pair(), settings);
1490 }
1491 break;
1492 }
1493 }
1494
1495 HandleSettings(settings);
1496
1497 net_log_.AddEvent(
1498 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1499 make_scoped_refptr(new NetLogSpdySettingsParameter(settings)));
1500
1501 // Create the SETTINGS frame and send it.
1502 scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame(
1503 spdy_framer_.CreateSettings(settings));
1504 sent_settings_ = true;
1505 QueueFrame(settings_frame.get(), 0, NULL);
1506 }
1507
HandleSettings(const spdy::SpdySettings & settings)1508 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) {
1509 for (spdy::SpdySettings::const_iterator i = settings.begin(),
1510 end = settings.end(); i != end; ++i) {
1511 const uint32 id = i->first.id();
1512 const uint32 val = i->second;
1513 switch (id) {
1514 case spdy::SETTINGS_MAX_CONCURRENT_STREAMS:
1515 max_concurrent_streams_ = std::min(static_cast<size_t>(val),
1516 max_concurrent_stream_limit_);
1517 ProcessPendingCreateStreams();
1518 break;
1519 }
1520 }
1521 }
1522
SendPrefacePingIfNoneInFlight()1523 void SpdySession::SendPrefacePingIfNoneInFlight() {
1524 if (pings_in_flight_ || trailing_ping_pending_ ||
1525 !enable_ping_based_connection_checking_)
1526 return;
1527
1528 const base::TimeDelta kConnectionAtRiskOfLoss =
1529 base::TimeDelta::FromMilliseconds(connection_at_risk_of_loss_ms_);
1530
1531 base::TimeTicks now = base::TimeTicks::Now();
1532 // If we haven't heard from server, then send a preface-PING.
1533 if ((now - received_data_time_) > kConnectionAtRiskOfLoss)
1534 SendPrefacePing();
1535
1536 PlanToSendTrailingPing();
1537 }
1538
SendPrefacePing()1539 void SpdySession::SendPrefacePing() {
1540 // TODO(rtenneti): Send preface pings when more servers support additional
1541 // pings.
1542 // WritePingFrame(next_ping_id_);
1543 }
1544
PlanToSendTrailingPing()1545 void SpdySession::PlanToSendTrailingPing() {
1546 if (trailing_ping_pending_)
1547 return;
1548
1549 trailing_ping_pending_ = true;
1550 MessageLoop::current()->PostDelayedTask(
1551 FROM_HERE,
1552 method_factory_.NewRunnableMethod(&SpdySession::SendTrailingPing),
1553 trailing_ping_delay_time_ms_);
1554 }
1555
SendTrailingPing()1556 void SpdySession::SendTrailingPing() {
1557 DCHECK(trailing_ping_pending_);
1558 trailing_ping_pending_ = false;
1559 WritePingFrame(next_ping_id_);
1560 }
1561
WritePingFrame(uint32 unique_id)1562 void SpdySession::WritePingFrame(uint32 unique_id) {
1563 scoped_ptr<spdy::SpdyPingControlFrame> ping_frame(
1564 spdy_framer_.CreatePingFrame(next_ping_id_));
1565 QueueFrame(ping_frame.get(), SPDY_PRIORITY_HIGHEST, NULL);
1566
1567 if (net_log().IsLoggingAllEvents()) {
1568 net_log().AddEvent(
1569 NetLog::TYPE_SPDY_SESSION_PING,
1570 make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_)));
1571 }
1572 if (unique_id % 2 != 0) {
1573 next_ping_id_ += 2;
1574 ++pings_in_flight_;
1575 need_to_send_ping_ = false;
1576 PlanToCheckPingStatus();
1577 }
1578 }
1579
PlanToCheckPingStatus()1580 void SpdySession::PlanToCheckPingStatus() {
1581 if (check_ping_status_pending_)
1582 return;
1583
1584 check_ping_status_pending_ = true;
1585 MessageLoop::current()->PostDelayedTask(
1586 FROM_HERE,
1587 method_factory_.NewRunnableMethod(
1588 &SpdySession::CheckPingStatus, base::TimeTicks::Now()),
1589 hung_interval_ms_);
1590 }
1591
CheckPingStatus(base::TimeTicks last_check_time)1592 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
1593 // Check if we got a response back for all PINGs we had sent.
1594 if (pings_in_flight_ == 0) {
1595 check_ping_status_pending_ = false;
1596 return;
1597 }
1598
1599 DCHECK(check_ping_status_pending_);
1600
1601 const base::TimeDelta kHungInterval =
1602 base::TimeDelta::FromMilliseconds(hung_interval_ms_);
1603
1604 base::TimeTicks now = base::TimeTicks::Now();
1605 base::TimeDelta delay = kHungInterval - (now - received_data_time_);
1606
1607 if (delay.InMilliseconds() < 0 || received_data_time_ < last_check_time) {
1608 DCHECK(now - received_data_time_ > kHungInterval);
1609 CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true);
1610 return;
1611 }
1612
1613 // Check the status of connection after a delay.
1614 MessageLoop::current()->PostDelayedTask(
1615 FROM_HERE,
1616 method_factory_.NewRunnableMethod(&SpdySession::CheckPingStatus, now),
1617 delay.InMilliseconds());
1618 }
1619
RecordHistograms()1620 void SpdySession::RecordHistograms() {
1621 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
1622 streams_initiated_count_,
1623 0, 300, 50);
1624 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
1625 streams_pushed_count_,
1626 0, 300, 50);
1627 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
1628 streams_pushed_and_claimed_count_,
1629 0, 300, 50);
1630 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
1631 streams_abandoned_count_,
1632 0, 300, 50);
1633 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
1634 sent_settings_ ? 1 : 0, 2);
1635 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
1636 received_settings_ ? 1 : 0, 2);
1637 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
1638 stalled_streams_,
1639 0, 300, 50);
1640 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
1641 stalled_streams_ > 0 ? 1 : 0, 2);
1642
1643 if (received_settings_) {
1644 // Enumerate the saved settings, and set histograms for it.
1645 const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair());
1646
1647 spdy::SpdySettings::const_iterator it;
1648 for (it = settings.begin(); it != settings.end(); ++it) {
1649 const spdy::SpdySetting setting = *it;
1650 switch (setting.first.id()) {
1651 case spdy::SETTINGS_CURRENT_CWND:
1652 // Record several different histograms to see if cwnd converges
1653 // for larger volumes of data being sent.
1654 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
1655 setting.second,
1656 1, 200, 100);
1657 if (bytes_received_ > 10 * 1024) {
1658 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
1659 setting.second,
1660 1, 200, 100);
1661 if (bytes_received_ > 25 * 1024) {
1662 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
1663 setting.second,
1664 1, 200, 100);
1665 if (bytes_received_ > 50 * 1024) {
1666 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
1667 setting.second,
1668 1, 200, 100);
1669 if (bytes_received_ > 100 * 1024) {
1670 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
1671 setting.second,
1672 1, 200, 100);
1673 }
1674 }
1675 }
1676 }
1677 break;
1678 case spdy::SETTINGS_ROUND_TRIP_TIME:
1679 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
1680 setting.second,
1681 1, 1200, 100);
1682 break;
1683 case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE:
1684 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
1685 setting.second,
1686 1, 100, 50);
1687 break;
1688 }
1689 }
1690 }
1691 }
1692
InvokeUserStreamCreationCallback(scoped_refptr<SpdyStream> * stream)1693 void SpdySession::InvokeUserStreamCreationCallback(
1694 scoped_refptr<SpdyStream>* stream) {
1695 PendingCallbackMap::iterator it = pending_callback_map_.find(stream);
1696
1697 // Exit if the request has already been cancelled.
1698 if (it == pending_callback_map_.end())
1699 return;
1700
1701 CompletionCallback* callback = it->second.callback;
1702 int result = it->second.result;
1703 pending_callback_map_.erase(it);
1704 callback->Run(result);
1705 }
1706
1707 } // namespace net
1708