1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/cert/cert_verify_result.h"
33 #include "net/http/http_log_util.h"
34 #include "net/http/http_network_session.h"
35 #include "net/http/http_server_properties.h"
36 #include "net/http/http_util.h"
37 #include "net/http/transport_security_state.h"
38 #include "net/socket/ssl_client_socket.h"
39 #include "net/spdy/spdy_buffer_producer.h"
40 #include "net/spdy/spdy_frame_builder.h"
41 #include "net/spdy/spdy_http_utils.h"
42 #include "net/spdy/spdy_protocol.h"
43 #include "net/spdy/spdy_session_pool.h"
44 #include "net/spdy/spdy_stream.h"
45 #include "net/ssl/channel_id_service.h"
46 #include "net/ssl/ssl_cipher_suite_names.h"
47 #include "net/ssl/ssl_connection_status_flags.h"
48
49 namespace net {
50
51 namespace {
52
53 const int kReadBufferSize = 8 * 1024;
54 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
55 const int kHungIntervalSeconds = 10;
56
57 // Minimum seconds that unclaimed pushed streams will be kept in memory.
58 const int kMinPushedStreamLifetimeSeconds = 300;
59
SpdyHeaderBlockToListValue(const SpdyHeaderBlock & headers,net::NetLog::LogLevel log_level)60 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
61 const SpdyHeaderBlock& headers,
62 net::NetLog::LogLevel log_level) {
63 scoped_ptr<base::ListValue> headers_list(new base::ListValue());
64 for (SpdyHeaderBlock::const_iterator it = headers.begin();
65 it != headers.end(); ++it) {
66 headers_list->AppendString(
67 it->first + ": " +
68 ElideHeaderValueForNetLog(log_level, it->first, it->second));
69 }
70 return headers_list.Pass();
71 }
72
NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock * headers,bool fin,bool unidirectional,SpdyPriority spdy_priority,SpdyStreamId stream_id,NetLog::LogLevel log_level)73 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
74 bool fin,
75 bool unidirectional,
76 SpdyPriority spdy_priority,
77 SpdyStreamId stream_id,
78 NetLog::LogLevel log_level) {
79 base::DictionaryValue* dict = new base::DictionaryValue();
80 dict->Set("headers",
81 SpdyHeaderBlockToListValue(*headers, log_level).release());
82 dict->SetBoolean("fin", fin);
83 dict->SetBoolean("unidirectional", unidirectional);
84 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
85 dict->SetInteger("stream_id", stream_id);
86 return dict;
87 }
88
NetLogSpdySynStreamReceivedCallback(const SpdyHeaderBlock * headers,bool fin,bool unidirectional,SpdyPriority spdy_priority,SpdyStreamId stream_id,SpdyStreamId associated_stream,NetLog::LogLevel log_level)89 base::Value* NetLogSpdySynStreamReceivedCallback(
90 const SpdyHeaderBlock* headers,
91 bool fin,
92 bool unidirectional,
93 SpdyPriority spdy_priority,
94 SpdyStreamId stream_id,
95 SpdyStreamId associated_stream,
96 NetLog::LogLevel log_level) {
97 base::DictionaryValue* dict = new base::DictionaryValue();
98 dict->Set("headers",
99 SpdyHeaderBlockToListValue(*headers, log_level).release());
100 dict->SetBoolean("fin", fin);
101 dict->SetBoolean("unidirectional", unidirectional);
102 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
103 dict->SetInteger("stream_id", stream_id);
104 dict->SetInteger("associated_stream", associated_stream);
105 return dict;
106 }
107
NetLogSpdySynReplyOrHeadersReceivedCallback(const SpdyHeaderBlock * headers,bool fin,SpdyStreamId stream_id,NetLog::LogLevel log_level)108 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
109 const SpdyHeaderBlock* headers,
110 bool fin,
111 SpdyStreamId stream_id,
112 NetLog::LogLevel log_level) {
113 base::DictionaryValue* dict = new base::DictionaryValue();
114 dict->Set("headers",
115 SpdyHeaderBlockToListValue(*headers, log_level).release());
116 dict->SetBoolean("fin", fin);
117 dict->SetInteger("stream_id", stream_id);
118 return dict;
119 }
120
NetLogSpdySessionCloseCallback(int net_error,const std::string * description,NetLog::LogLevel)121 base::Value* NetLogSpdySessionCloseCallback(int net_error,
122 const std::string* description,
123 NetLog::LogLevel /* log_level */) {
124 base::DictionaryValue* dict = new base::DictionaryValue();
125 dict->SetInteger("net_error", net_error);
126 dict->SetString("description", *description);
127 return dict;
128 }
129
NetLogSpdySessionCallback(const HostPortProxyPair * host_pair,NetLog::LogLevel)130 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
131 NetLog::LogLevel /* log_level */) {
132 base::DictionaryValue* dict = new base::DictionaryValue();
133 dict->SetString("host", host_pair->first.ToString());
134 dict->SetString("proxy", host_pair->second.ToPacString());
135 return dict;
136 }
137
NetLogSpdyInitializedCallback(NetLog::Source source,const NextProto protocol_version,NetLog::LogLevel)138 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
139 const NextProto protocol_version,
140 NetLog::LogLevel /* log_level */) {
141 base::DictionaryValue* dict = new base::DictionaryValue();
142 if (source.IsValid()) {
143 source.AddToEventParameters(dict);
144 }
145 dict->SetString("protocol",
146 SSLClientSocket::NextProtoToString(protocol_version));
147 return dict;
148 }
149
NetLogSpdySettingsCallback(const HostPortPair & host_port_pair,bool clear_persisted,NetLog::LogLevel)150 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
151 bool clear_persisted,
152 NetLog::LogLevel /* log_level */) {
153 base::DictionaryValue* dict = new base::DictionaryValue();
154 dict->SetString("host", host_port_pair.ToString());
155 dict->SetBoolean("clear_persisted", clear_persisted);
156 return dict;
157 }
158
NetLogSpdySettingCallback(SpdySettingsIds id,const SpdyMajorVersion protocol_version,SpdySettingsFlags flags,uint32 value,NetLog::LogLevel)159 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
160 const SpdyMajorVersion protocol_version,
161 SpdySettingsFlags flags,
162 uint32 value,
163 NetLog::LogLevel /* log_level */) {
164 base::DictionaryValue* dict = new base::DictionaryValue();
165 dict->SetInteger("id",
166 SpdyConstants::SerializeSettingId(protocol_version, id));
167 dict->SetInteger("flags", flags);
168 dict->SetInteger("value", value);
169 return dict;
170 }
171
NetLogSpdySendSettingsCallback(const SettingsMap * settings,const SpdyMajorVersion protocol_version,NetLog::LogLevel)172 base::Value* NetLogSpdySendSettingsCallback(
173 const SettingsMap* settings,
174 const SpdyMajorVersion protocol_version,
175 NetLog::LogLevel /* log_level */) {
176 base::DictionaryValue* dict = new base::DictionaryValue();
177 base::ListValue* settings_list = new base::ListValue();
178 for (SettingsMap::const_iterator it = settings->begin();
179 it != settings->end(); ++it) {
180 const SpdySettingsIds id = it->first;
181 const SpdySettingsFlags flags = it->second.first;
182 const uint32 value = it->second.second;
183 settings_list->Append(new base::StringValue(base::StringPrintf(
184 "[id:%u flags:%u value:%u]",
185 SpdyConstants::SerializeSettingId(protocol_version, id),
186 flags,
187 value)));
188 }
189 dict->Set("settings", settings_list);
190 return dict;
191 }
192
NetLogSpdyWindowUpdateFrameCallback(SpdyStreamId stream_id,uint32 delta,NetLog::LogLevel)193 base::Value* NetLogSpdyWindowUpdateFrameCallback(
194 SpdyStreamId stream_id,
195 uint32 delta,
196 NetLog::LogLevel /* log_level */) {
197 base::DictionaryValue* dict = new base::DictionaryValue();
198 dict->SetInteger("stream_id", static_cast<int>(stream_id));
199 dict->SetInteger("delta", delta);
200 return dict;
201 }
202
NetLogSpdySessionWindowUpdateCallback(int32 delta,int32 window_size,NetLog::LogLevel)203 base::Value* NetLogSpdySessionWindowUpdateCallback(
204 int32 delta,
205 int32 window_size,
206 NetLog::LogLevel /* log_level */) {
207 base::DictionaryValue* dict = new base::DictionaryValue();
208 dict->SetInteger("delta", delta);
209 dict->SetInteger("window_size", window_size);
210 return dict;
211 }
212
NetLogSpdyDataCallback(SpdyStreamId stream_id,int size,bool fin,NetLog::LogLevel)213 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
214 int size,
215 bool fin,
216 NetLog::LogLevel /* log_level */) {
217 base::DictionaryValue* dict = new base::DictionaryValue();
218 dict->SetInteger("stream_id", static_cast<int>(stream_id));
219 dict->SetInteger("size", size);
220 dict->SetBoolean("fin", fin);
221 return dict;
222 }
223
NetLogSpdyRstCallback(SpdyStreamId stream_id,int status,const std::string * description,NetLog::LogLevel)224 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
225 int status,
226 const std::string* description,
227 NetLog::LogLevel /* log_level */) {
228 base::DictionaryValue* dict = new base::DictionaryValue();
229 dict->SetInteger("stream_id", static_cast<int>(stream_id));
230 dict->SetInteger("status", status);
231 dict->SetString("description", *description);
232 return dict;
233 }
234
NetLogSpdyPingCallback(SpdyPingId unique_id,bool is_ack,const char * type,NetLog::LogLevel)235 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
236 bool is_ack,
237 const char* type,
238 NetLog::LogLevel /* log_level */) {
239 base::DictionaryValue* dict = new base::DictionaryValue();
240 dict->SetInteger("unique_id", unique_id);
241 dict->SetString("type", type);
242 dict->SetBoolean("is_ack", is_ack);
243 return dict;
244 }
245
NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,int active_streams,int unclaimed_streams,SpdyGoAwayStatus status,NetLog::LogLevel)246 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
247 int active_streams,
248 int unclaimed_streams,
249 SpdyGoAwayStatus status,
250 NetLog::LogLevel /* log_level */) {
251 base::DictionaryValue* dict = new base::DictionaryValue();
252 dict->SetInteger("last_accepted_stream_id",
253 static_cast<int>(last_stream_id));
254 dict->SetInteger("active_streams", active_streams);
255 dict->SetInteger("unclaimed_streams", unclaimed_streams);
256 dict->SetInteger("status", static_cast<int>(status));
257 return dict;
258 }
259
NetLogSpdyPushPromiseReceivedCallback(const SpdyHeaderBlock * headers,SpdyStreamId stream_id,SpdyStreamId promised_stream_id,NetLog::LogLevel log_level)260 base::Value* NetLogSpdyPushPromiseReceivedCallback(
261 const SpdyHeaderBlock* headers,
262 SpdyStreamId stream_id,
263 SpdyStreamId promised_stream_id,
264 NetLog::LogLevel log_level) {
265 base::DictionaryValue* dict = new base::DictionaryValue();
266 dict->Set("headers",
267 SpdyHeaderBlockToListValue(*headers, log_level).release());
268 dict->SetInteger("id", stream_id);
269 dict->SetInteger("promised_stream_id", promised_stream_id);
270 return dict;
271 }
272
273 // Helper function to return the total size of an array of objects
274 // with .size() member functions.
GetTotalSize(const T (& arr)[N])275 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
276 size_t total_size = 0;
277 for (size_t i = 0; i < N; ++i) {
278 total_size += arr[i].size();
279 }
280 return total_size;
281 }
282
283 // Helper class for std:find_if on STL container containing
284 // SpdyStreamRequest weak pointers.
285 class RequestEquals {
286 public:
RequestEquals(const base::WeakPtr<SpdyStreamRequest> & request)287 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
288 : request_(request) {}
289
operator ()(const base::WeakPtr<SpdyStreamRequest> & request) const290 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
291 return request_.get() == request.get();
292 }
293
294 private:
295 const base::WeakPtr<SpdyStreamRequest> request_;
296 };
297
298 // The maximum number of concurrent streams we will ever create. Even if
299 // the server permits more, we will never exceed this limit.
300 const size_t kMaxConcurrentStreamLimit = 256;
301
302 } // namespace
303
MapFramerErrorToProtocolError(SpdyFramer::SpdyError err)304 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
305 SpdyFramer::SpdyError err) {
306 switch(err) {
307 case SpdyFramer::SPDY_NO_ERROR:
308 return SPDY_ERROR_NO_ERROR;
309 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
310 return SPDY_ERROR_INVALID_CONTROL_FRAME;
311 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
312 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
313 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
314 return SPDY_ERROR_ZLIB_INIT_FAILURE;
315 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
316 return SPDY_ERROR_UNSUPPORTED_VERSION;
317 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
318 return SPDY_ERROR_DECOMPRESS_FAILURE;
319 case SpdyFramer::SPDY_COMPRESS_FAILURE:
320 return SPDY_ERROR_COMPRESS_FAILURE;
321 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
322 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
323 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
324 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
325 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
326 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
327 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
328 return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
329 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
330 return SPDY_ERROR_UNEXPECTED_FRAME;
331 default:
332 NOTREACHED();
333 return static_cast<SpdyProtocolErrorDetails>(-1);
334 }
335 }
336
MapFramerErrorToNetError(SpdyFramer::SpdyError err)337 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
338 switch (err) {
339 case SpdyFramer::SPDY_NO_ERROR:
340 return OK;
341 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
342 return ERR_SPDY_PROTOCOL_ERROR;
343 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
344 return ERR_SPDY_FRAME_SIZE_ERROR;
345 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
346 return ERR_SPDY_COMPRESSION_ERROR;
347 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
348 return ERR_SPDY_PROTOCOL_ERROR;
349 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
350 return ERR_SPDY_COMPRESSION_ERROR;
351 case SpdyFramer::SPDY_COMPRESS_FAILURE:
352 return ERR_SPDY_COMPRESSION_ERROR;
353 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
354 return ERR_SPDY_PROTOCOL_ERROR;
355 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
356 return ERR_SPDY_PROTOCOL_ERROR;
357 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
358 return ERR_SPDY_PROTOCOL_ERROR;
359 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
360 return ERR_SPDY_PROTOCOL_ERROR;
361 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
362 return ERR_SPDY_PROTOCOL_ERROR;
363 default:
364 NOTREACHED();
365 return ERR_SPDY_PROTOCOL_ERROR;
366 }
367 }
368
MapRstStreamStatusToProtocolError(SpdyRstStreamStatus status)369 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
370 SpdyRstStreamStatus status) {
371 switch(status) {
372 case RST_STREAM_PROTOCOL_ERROR:
373 return STATUS_CODE_PROTOCOL_ERROR;
374 case RST_STREAM_INVALID_STREAM:
375 return STATUS_CODE_INVALID_STREAM;
376 case RST_STREAM_REFUSED_STREAM:
377 return STATUS_CODE_REFUSED_STREAM;
378 case RST_STREAM_UNSUPPORTED_VERSION:
379 return STATUS_CODE_UNSUPPORTED_VERSION;
380 case RST_STREAM_CANCEL:
381 return STATUS_CODE_CANCEL;
382 case RST_STREAM_INTERNAL_ERROR:
383 return STATUS_CODE_INTERNAL_ERROR;
384 case RST_STREAM_FLOW_CONTROL_ERROR:
385 return STATUS_CODE_FLOW_CONTROL_ERROR;
386 case RST_STREAM_STREAM_IN_USE:
387 return STATUS_CODE_STREAM_IN_USE;
388 case RST_STREAM_STREAM_ALREADY_CLOSED:
389 return STATUS_CODE_STREAM_ALREADY_CLOSED;
390 case RST_STREAM_INVALID_CREDENTIALS:
391 return STATUS_CODE_INVALID_CREDENTIALS;
392 case RST_STREAM_FRAME_SIZE_ERROR:
393 return STATUS_CODE_FRAME_SIZE_ERROR;
394 case RST_STREAM_SETTINGS_TIMEOUT:
395 return STATUS_CODE_SETTINGS_TIMEOUT;
396 case RST_STREAM_CONNECT_ERROR:
397 return STATUS_CODE_CONNECT_ERROR;
398 case RST_STREAM_ENHANCE_YOUR_CALM:
399 return STATUS_CODE_ENHANCE_YOUR_CALM;
400 default:
401 NOTREACHED();
402 return static_cast<SpdyProtocolErrorDetails>(-1);
403 }
404 }
405
MapNetErrorToGoAwayStatus(Error err)406 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
407 switch (err) {
408 case OK:
409 return GOAWAY_NO_ERROR;
410 case ERR_SPDY_PROTOCOL_ERROR:
411 return GOAWAY_PROTOCOL_ERROR;
412 case ERR_SPDY_FLOW_CONTROL_ERROR:
413 return GOAWAY_FLOW_CONTROL_ERROR;
414 case ERR_SPDY_FRAME_SIZE_ERROR:
415 return GOAWAY_FRAME_SIZE_ERROR;
416 case ERR_SPDY_COMPRESSION_ERROR:
417 return GOAWAY_COMPRESSION_ERROR;
418 case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
419 return GOAWAY_INADEQUATE_SECURITY;
420 default:
421 return GOAWAY_PROTOCOL_ERROR;
422 }
423 }
424
SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock & headers,SpdyMajorVersion protocol_version,SpdyHeaderBlock * request_headers,SpdyHeaderBlock * response_headers)425 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
426 SpdyMajorVersion protocol_version,
427 SpdyHeaderBlock* request_headers,
428 SpdyHeaderBlock* response_headers) {
429 DCHECK(response_headers);
430 DCHECK(request_headers);
431 for (SpdyHeaderBlock::const_iterator it = headers.begin();
432 it != headers.end();
433 ++it) {
434 SpdyHeaderBlock* to_insert = response_headers;
435 if (protocol_version == SPDY2) {
436 if (it->first == "url")
437 to_insert = request_headers;
438 } else {
439 const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
440 static const char* scheme = ":scheme";
441 static const char* path = ":path";
442 if (it->first == host || it->first == scheme || it->first == path)
443 to_insert = request_headers;
444 }
445 to_insert->insert(*it);
446 }
447 }
448
SpdyStreamRequest()449 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
450 Reset();
451 }
452
~SpdyStreamRequest()453 SpdyStreamRequest::~SpdyStreamRequest() {
454 CancelRequest();
455 }
456
StartRequest(SpdyStreamType type,const base::WeakPtr<SpdySession> & session,const GURL & url,RequestPriority priority,const BoundNetLog & net_log,const CompletionCallback & callback)457 int SpdyStreamRequest::StartRequest(
458 SpdyStreamType type,
459 const base::WeakPtr<SpdySession>& session,
460 const GURL& url,
461 RequestPriority priority,
462 const BoundNetLog& net_log,
463 const CompletionCallback& callback) {
464 DCHECK(session);
465 DCHECK(!session_);
466 DCHECK(!stream_);
467 DCHECK(callback_.is_null());
468
469 type_ = type;
470 session_ = session;
471 url_ = url;
472 priority_ = priority;
473 net_log_ = net_log;
474 callback_ = callback;
475
476 base::WeakPtr<SpdyStream> stream;
477 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
478 if (rv == OK) {
479 Reset();
480 stream_ = stream;
481 }
482 return rv;
483 }
484
CancelRequest()485 void SpdyStreamRequest::CancelRequest() {
486 if (session_)
487 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
488 Reset();
489 // Do this to cancel any pending CompleteStreamRequest() tasks.
490 weak_ptr_factory_.InvalidateWeakPtrs();
491 }
492
ReleaseStream()493 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
494 DCHECK(!session_);
495 base::WeakPtr<SpdyStream> stream = stream_;
496 DCHECK(stream);
497 Reset();
498 return stream;
499 }
500
OnRequestCompleteSuccess(const base::WeakPtr<SpdyStream> & stream)501 void SpdyStreamRequest::OnRequestCompleteSuccess(
502 const base::WeakPtr<SpdyStream>& stream) {
503 DCHECK(session_);
504 DCHECK(!stream_);
505 DCHECK(!callback_.is_null());
506 CompletionCallback callback = callback_;
507 Reset();
508 DCHECK(stream);
509 stream_ = stream;
510 callback.Run(OK);
511 }
512
OnRequestCompleteFailure(int rv)513 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
514 DCHECK(session_);
515 DCHECK(!stream_);
516 DCHECK(!callback_.is_null());
517 CompletionCallback callback = callback_;
518 Reset();
519 DCHECK_NE(rv, OK);
520 callback.Run(rv);
521 }
522
Reset()523 void SpdyStreamRequest::Reset() {
524 type_ = SPDY_BIDIRECTIONAL_STREAM;
525 session_.reset();
526 stream_.reset();
527 url_ = GURL();
528 priority_ = MINIMUM_PRIORITY;
529 net_log_ = BoundNetLog();
530 callback_.Reset();
531 }
532
ActiveStreamInfo()533 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
534 : stream(NULL),
535 waiting_for_syn_reply(false) {}
536
ActiveStreamInfo(SpdyStream * stream)537 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
538 : stream(stream),
539 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
540 }
541
~ActiveStreamInfo()542 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
543
PushedStreamInfo()544 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
545
PushedStreamInfo(SpdyStreamId stream_id,base::TimeTicks creation_time)546 SpdySession::PushedStreamInfo::PushedStreamInfo(
547 SpdyStreamId stream_id,
548 base::TimeTicks creation_time)
549 : stream_id(stream_id),
550 creation_time(creation_time) {}
551
~PushedStreamInfo()552 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
553
554 // static
CanPool(TransportSecurityState * transport_security_state,const SSLInfo & ssl_info,const std::string & old_hostname,const std::string & new_hostname)555 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
556 const SSLInfo& ssl_info,
557 const std::string& old_hostname,
558 const std::string& new_hostname) {
559 // Pooling is prohibited if the server cert is not valid for the new domain,
560 // and for connections on which client certs were sent. It is also prohibited
561 // when channel ID was sent if the hosts are from different eTLDs+1.
562 if (IsCertStatusError(ssl_info.cert_status))
563 return false;
564
565 if (ssl_info.client_cert_sent)
566 return false;
567
568 if (ssl_info.channel_id_sent &&
569 ChannelIDService::GetDomainForHost(new_hostname) !=
570 ChannelIDService::GetDomainForHost(old_hostname)) {
571 return false;
572 }
573
574 bool unused = false;
575 if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
576 return false;
577
578 std::string pinning_failure_log;
579 if (!transport_security_state->CheckPublicKeyPins(
580 new_hostname,
581 ssl_info.is_issued_by_known_root,
582 ssl_info.public_key_hashes,
583 &pinning_failure_log)) {
584 return false;
585 }
586
587 return true;
588 }
589
SpdySession(const SpdySessionKey & spdy_session_key,const base::WeakPtr<HttpServerProperties> & http_server_properties,TransportSecurityState * transport_security_state,bool verify_domain_authentication,bool enable_sending_initial_data,bool enable_compression,bool enable_ping_based_connection_checking,NextProto default_protocol,size_t stream_initial_recv_window_size,size_t initial_max_concurrent_streams,size_t max_concurrent_streams_limit,TimeFunc time_func,const HostPortPair & trusted_spdy_proxy,NetLog * net_log)590 SpdySession::SpdySession(
591 const SpdySessionKey& spdy_session_key,
592 const base::WeakPtr<HttpServerProperties>& http_server_properties,
593 TransportSecurityState* transport_security_state,
594 bool verify_domain_authentication,
595 bool enable_sending_initial_data,
596 bool enable_compression,
597 bool enable_ping_based_connection_checking,
598 NextProto default_protocol,
599 size_t stream_initial_recv_window_size,
600 size_t initial_max_concurrent_streams,
601 size_t max_concurrent_streams_limit,
602 TimeFunc time_func,
603 const HostPortPair& trusted_spdy_proxy,
604 NetLog* net_log)
605 : in_io_loop_(false),
606 spdy_session_key_(spdy_session_key),
607 pool_(NULL),
608 http_server_properties_(http_server_properties),
609 transport_security_state_(transport_security_state),
610 read_buffer_(new IOBuffer(kReadBufferSize)),
611 stream_hi_water_mark_(kFirstStreamId),
612 last_accepted_push_stream_id_(0),
613 num_pushed_streams_(0u),
614 num_active_pushed_streams_(0u),
615 in_flight_write_frame_type_(DATA),
616 in_flight_write_frame_size_(0),
617 is_secure_(false),
618 certificate_error_code_(OK),
619 availability_state_(STATE_AVAILABLE),
620 read_state_(READ_STATE_DO_READ),
621 write_state_(WRITE_STATE_IDLE),
622 error_on_close_(OK),
623 max_concurrent_streams_(initial_max_concurrent_streams == 0
624 ? kInitialMaxConcurrentStreams
625 : initial_max_concurrent_streams),
626 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
627 ? kMaxConcurrentStreamLimit
628 : max_concurrent_streams_limit),
629 max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
630 streams_initiated_count_(0),
631 streams_pushed_count_(0),
632 streams_pushed_and_claimed_count_(0),
633 streams_abandoned_count_(0),
634 total_bytes_received_(0),
635 sent_settings_(false),
636 received_settings_(false),
637 stalled_streams_(0),
638 pings_in_flight_(0),
639 next_ping_id_(1),
640 last_activity_time_(time_func()),
641 last_compressed_frame_len_(0),
642 check_ping_status_pending_(false),
643 send_connection_header_prefix_(false),
644 flow_control_state_(FLOW_CONTROL_NONE),
645 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
646 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
647 ? kDefaultInitialRecvWindowSize
648 : stream_initial_recv_window_size),
649 session_send_window_size_(0),
650 session_recv_window_size_(0),
651 session_unacked_recv_window_bytes_(0),
652 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
653 verify_domain_authentication_(verify_domain_authentication),
654 enable_sending_initial_data_(enable_sending_initial_data),
655 enable_compression_(enable_compression),
656 enable_ping_based_connection_checking_(
657 enable_ping_based_connection_checking),
658 protocol_(default_protocol),
659 connection_at_risk_of_loss_time_(
660 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
661 hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
662 trusted_spdy_proxy_(trusted_spdy_proxy),
663 time_func_(time_func),
664 weak_factory_(this) {
665 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
666 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
667 DCHECK(HttpStreamFactory::spdy_enabled());
668 net_log_.BeginEvent(
669 NetLog::TYPE_SPDY_SESSION,
670 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
671 next_unclaimed_push_stream_sweep_time_ = time_func_() +
672 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
673 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
674 }
675
~SpdySession()676 SpdySession::~SpdySession() {
677 CHECK(!in_io_loop_);
678 DcheckDraining();
679
680 // TODO(akalin): Check connection->is_initialized() instead. This
681 // requires re-working CreateFakeSpdySession(), though.
682 DCHECK(connection_->socket());
683 // With SPDY we can't recycle sockets.
684 connection_->socket()->Disconnect();
685
686 RecordHistograms();
687
688 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
689 }
690
InitializeWithSocket(scoped_ptr<ClientSocketHandle> connection,SpdySessionPool * pool,bool is_secure,int certificate_error_code)691 void SpdySession::InitializeWithSocket(
692 scoped_ptr<ClientSocketHandle> connection,
693 SpdySessionPool* pool,
694 bool is_secure,
695 int certificate_error_code) {
696 CHECK(!in_io_loop_);
697 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
698 DCHECK_EQ(read_state_, READ_STATE_DO_READ);
699 DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
700 DCHECK(!connection_);
701
702 DCHECK(certificate_error_code == OK ||
703 certificate_error_code < ERR_IO_PENDING);
704 // TODO(akalin): Check connection->is_initialized() instead. This
705 // requires re-working CreateFakeSpdySession(), though.
706 DCHECK(connection->socket());
707
708 base::StatsCounter spdy_sessions("spdy.sessions");
709 spdy_sessions.Increment();
710
711 connection_ = connection.Pass();
712 is_secure_ = is_secure;
713 certificate_error_code_ = certificate_error_code;
714
715 NextProto protocol_negotiated =
716 connection_->socket()->GetNegotiatedProtocol();
717 if (protocol_negotiated != kProtoUnknown) {
718 protocol_ = protocol_negotiated;
719 }
720 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
721 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
722
723 if (protocol_ == kProtoSPDY4)
724 send_connection_header_prefix_ = true;
725
726 if (protocol_ >= kProtoSPDY31) {
727 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
728 session_send_window_size_ = kSpdySessionInitialWindowSize;
729 session_recv_window_size_ = kSpdySessionInitialWindowSize;
730 } else if (protocol_ >= kProtoSPDY3) {
731 flow_control_state_ = FLOW_CONTROL_STREAM;
732 } else {
733 flow_control_state_ = FLOW_CONTROL_NONE;
734 }
735
736 buffered_spdy_framer_.reset(
737 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
738 enable_compression_));
739 buffered_spdy_framer_->set_visitor(this);
740 buffered_spdy_framer_->set_debug_visitor(this);
741 UMA_HISTOGRAM_ENUMERATION(
742 "Net.SpdyVersion2",
743 protocol_ - kProtoSPDYMinimumVersion,
744 kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
745
746 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
747 base::Bind(&NetLogSpdyInitializedCallback,
748 connection_->socket()->NetLog().source(),
749 protocol_));
750
751 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
752 connection_->AddHigherLayeredPool(this);
753 if (enable_sending_initial_data_)
754 SendInitialData();
755 pool_ = pool;
756
757 // Bootstrap the read loop.
758 base::MessageLoop::current()->PostTask(
759 FROM_HERE,
760 base::Bind(&SpdySession::PumpReadLoop,
761 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
762 }
763
VerifyDomainAuthentication(const std::string & domain)764 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
765 if (!verify_domain_authentication_)
766 return true;
767
768 if (availability_state_ == STATE_DRAINING)
769 return false;
770
771 SSLInfo ssl_info;
772 bool was_npn_negotiated;
773 NextProto protocol_negotiated = kProtoUnknown;
774 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
775 return true; // This is not a secure session, so all domains are okay.
776
777 return CanPool(transport_security_state_, ssl_info,
778 host_port_pair().host(), domain);
779 }
780
GetPushStream(const GURL & url,base::WeakPtr<SpdyStream> * stream,const BoundNetLog & stream_net_log)781 int SpdySession::GetPushStream(
782 const GURL& url,
783 base::WeakPtr<SpdyStream>* stream,
784 const BoundNetLog& stream_net_log) {
785 CHECK(!in_io_loop_);
786
787 stream->reset();
788
789 if (availability_state_ == STATE_DRAINING)
790 return ERR_CONNECTION_CLOSED;
791
792 Error err = TryAccessStream(url);
793 if (err != OK)
794 return err;
795
796 *stream = GetActivePushStream(url);
797 if (*stream) {
798 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
799 streams_pushed_and_claimed_count_++;
800 }
801 return OK;
802 }
803
804 // {,Try}CreateStream() and TryAccessStream() can be called with
805 // |in_io_loop_| set if a stream is being created in response to
806 // another being closed due to received data.
807
TryAccessStream(const GURL & url)808 Error SpdySession::TryAccessStream(const GURL& url) {
809 if (is_secure_ && certificate_error_code_ != OK &&
810 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
811 RecordProtocolErrorHistogram(
812 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
813 DoDrainSession(
814 static_cast<Error>(certificate_error_code_),
815 "Tried to get SPDY stream for secure content over an unauthenticated "
816 "session.");
817 return ERR_SPDY_PROTOCOL_ERROR;
818 }
819 return OK;
820 }
821
TryCreateStream(const base::WeakPtr<SpdyStreamRequest> & request,base::WeakPtr<SpdyStream> * stream)822 int SpdySession::TryCreateStream(
823 const base::WeakPtr<SpdyStreamRequest>& request,
824 base::WeakPtr<SpdyStream>* stream) {
825 DCHECK(request);
826
827 if (availability_state_ == STATE_GOING_AWAY)
828 return ERR_FAILED;
829
830 if (availability_state_ == STATE_DRAINING)
831 return ERR_CONNECTION_CLOSED;
832
833 Error err = TryAccessStream(request->url());
834 if (err != OK)
835 return err;
836
837 if (!max_concurrent_streams_ ||
838 (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
839 max_concurrent_streams_)) {
840 return CreateStream(*request, stream);
841 }
842
843 stalled_streams_++;
844 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
845 RequestPriority priority = request->priority();
846 CHECK_GE(priority, MINIMUM_PRIORITY);
847 CHECK_LE(priority, MAXIMUM_PRIORITY);
848 pending_create_stream_queues_[priority].push_back(request);
849 return ERR_IO_PENDING;
850 }
851
CreateStream(const SpdyStreamRequest & request,base::WeakPtr<SpdyStream> * stream)852 int SpdySession::CreateStream(const SpdyStreamRequest& request,
853 base::WeakPtr<SpdyStream>* stream) {
854 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
855 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
856
857 if (availability_state_ == STATE_GOING_AWAY)
858 return ERR_FAILED;
859
860 if (availability_state_ == STATE_DRAINING)
861 return ERR_CONNECTION_CLOSED;
862
863 Error err = TryAccessStream(request.url());
864 if (err != OK) {
865 // This should have been caught in TryCreateStream().
866 NOTREACHED();
867 return err;
868 }
869
870 DCHECK(connection_->socket());
871 DCHECK(connection_->socket()->IsConnected());
872 if (connection_->socket()) {
873 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
874 connection_->socket()->IsConnected());
875 if (!connection_->socket()->IsConnected()) {
876 DoDrainSession(
877 ERR_CONNECTION_CLOSED,
878 "Tried to create SPDY stream for a closed socket connection.");
879 return ERR_CONNECTION_CLOSED;
880 }
881 }
882
883 scoped_ptr<SpdyStream> new_stream(
884 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
885 request.priority(),
886 stream_initial_send_window_size_,
887 stream_initial_recv_window_size_,
888 request.net_log()));
889 *stream = new_stream->GetWeakPtr();
890 InsertCreatedStream(new_stream.Pass());
891
892 UMA_HISTOGRAM_CUSTOM_COUNTS(
893 "Net.SpdyPriorityCount",
894 static_cast<int>(request.priority()), 0, 10, 11);
895
896 return OK;
897 }
898
CancelStreamRequest(const base::WeakPtr<SpdyStreamRequest> & request)899 void SpdySession::CancelStreamRequest(
900 const base::WeakPtr<SpdyStreamRequest>& request) {
901 DCHECK(request);
902 RequestPriority priority = request->priority();
903 CHECK_GE(priority, MINIMUM_PRIORITY);
904 CHECK_LE(priority, MAXIMUM_PRIORITY);
905
906 #if DCHECK_IS_ON
907 // |request| should not be in a queue not matching its priority.
908 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
909 if (priority == i)
910 continue;
911 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
912 DCHECK(std::find_if(queue->begin(),
913 queue->end(),
914 RequestEquals(request)) == queue->end());
915 }
916 #endif
917
918 PendingStreamRequestQueue* queue =
919 &pending_create_stream_queues_[priority];
920 // Remove |request| from |queue| while preserving the order of the
921 // other elements.
922 PendingStreamRequestQueue::iterator it =
923 std::find_if(queue->begin(), queue->end(), RequestEquals(request));
924 // The request may already be removed if there's a
925 // CompleteStreamRequest() in flight.
926 if (it != queue->end()) {
927 it = queue->erase(it);
928 // |request| should be in the queue at most once, and if it is
929 // present, should not be pending completion.
930 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
931 queue->end());
932 }
933 }
934
GetNextPendingStreamRequest()935 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
936 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
937 if (pending_create_stream_queues_[j].empty())
938 continue;
939
940 base::WeakPtr<SpdyStreamRequest> pending_request =
941 pending_create_stream_queues_[j].front();
942 DCHECK(pending_request);
943 pending_create_stream_queues_[j].pop_front();
944 return pending_request;
945 }
946 return base::WeakPtr<SpdyStreamRequest>();
947 }
948
ProcessPendingStreamRequests()949 void SpdySession::ProcessPendingStreamRequests() {
950 // Like |max_concurrent_streams_|, 0 means infinite for
951 // |max_requests_to_process|.
952 size_t max_requests_to_process = 0;
953 if (max_concurrent_streams_ != 0) {
954 max_requests_to_process =
955 max_concurrent_streams_ -
956 (active_streams_.size() + created_streams_.size());
957 }
958 for (size_t i = 0;
959 max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
960 base::WeakPtr<SpdyStreamRequest> pending_request =
961 GetNextPendingStreamRequest();
962 if (!pending_request)
963 break;
964
965 // Note that this post can race with other stream creations, and it's
966 // possible that the un-stalled stream will be stalled again if it loses.
967 // TODO(jgraettinger): Provide stronger ordering guarantees.
968 base::MessageLoop::current()->PostTask(
969 FROM_HERE,
970 base::Bind(&SpdySession::CompleteStreamRequest,
971 weak_factory_.GetWeakPtr(),
972 pending_request));
973 }
974 }
975
AddPooledAlias(const SpdySessionKey & alias_key)976 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
977 pooled_aliases_.insert(alias_key);
978 }
979
GetProtocolVersion() const980 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
981 DCHECK(buffered_spdy_framer_.get());
982 return buffered_spdy_framer_->protocol_version();
983 }
984
HasAcceptableTransportSecurity() const985 bool SpdySession::HasAcceptableTransportSecurity() const {
986 // If we're not even using TLS, we have no standards to meet.
987 if (!is_secure_) {
988 return true;
989 }
990
991 // We don't enforce transport security standards for older SPDY versions.
992 if (GetProtocolVersion() < SPDY4) {
993 return true;
994 }
995
996 SSLInfo ssl_info;
997 CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
998
999 // HTTP/2 requires TLS 1.2+
1000 if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
1001 SSL_CONNECTION_VERSION_TLS1_2) {
1002 return false;
1003 }
1004
1005 if (!IsSecureTLSCipherSuite(
1006 SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
1007 return false;
1008 }
1009
1010 return true;
1011 }
1012
GetWeakPtr()1013 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
1014 return weak_factory_.GetWeakPtr();
1015 }
1016
CloseOneIdleConnection()1017 bool SpdySession::CloseOneIdleConnection() {
1018 CHECK(!in_io_loop_);
1019 DCHECK(pool_);
1020 if (active_streams_.empty()) {
1021 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1022 }
1023 // Return false as the socket wasn't immediately closed.
1024 return false;
1025 }
1026
EnqueueStreamWrite(const base::WeakPtr<SpdyStream> & stream,SpdyFrameType frame_type,scoped_ptr<SpdyBufferProducer> producer)1027 void SpdySession::EnqueueStreamWrite(
1028 const base::WeakPtr<SpdyStream>& stream,
1029 SpdyFrameType frame_type,
1030 scoped_ptr<SpdyBufferProducer> producer) {
1031 DCHECK(frame_type == HEADERS ||
1032 frame_type == DATA ||
1033 frame_type == CREDENTIAL ||
1034 frame_type == SYN_STREAM);
1035 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1036 }
1037
CreateSynStream(SpdyStreamId stream_id,RequestPriority priority,SpdyControlFlags flags,const SpdyHeaderBlock & block)1038 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1039 SpdyStreamId stream_id,
1040 RequestPriority priority,
1041 SpdyControlFlags flags,
1042 const SpdyHeaderBlock& block) {
1043 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1044 CHECK(it != active_streams_.end());
1045 CHECK_EQ(it->second.stream->stream_id(), stream_id);
1046
1047 SendPrefacePingIfNoneInFlight();
1048
1049 DCHECK(buffered_spdy_framer_.get());
1050 SpdyPriority spdy_priority =
1051 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1052
1053 scoped_ptr<SpdyFrame> syn_frame;
1054 // TODO(hkhalil): Avoid copy of |block|.
1055 if (GetProtocolVersion() <= SPDY3) {
1056 SpdySynStreamIR syn_stream(stream_id);
1057 syn_stream.set_associated_to_stream_id(0);
1058 syn_stream.set_priority(spdy_priority);
1059 syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1060 syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1061 syn_stream.set_name_value_block(block);
1062 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1063 } else {
1064 SpdyHeadersIR headers(stream_id);
1065 headers.set_priority(spdy_priority);
1066 headers.set_has_priority(true);
1067 headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1068 headers.set_name_value_block(block);
1069 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1070 }
1071
1072 base::StatsCounter spdy_requests("spdy.requests");
1073 spdy_requests.Increment();
1074 streams_initiated_count_++;
1075
1076 if (net_log().IsLogging()) {
1077 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1078 base::Bind(&NetLogSpdySynStreamSentCallback,
1079 &block,
1080 (flags & CONTROL_FLAG_FIN) != 0,
1081 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1082 spdy_priority,
1083 stream_id));
1084 }
1085
1086 return syn_frame.Pass();
1087 }
1088
CreateDataBuffer(SpdyStreamId stream_id,IOBuffer * data,int len,SpdyDataFlags flags)1089 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1090 IOBuffer* data,
1091 int len,
1092 SpdyDataFlags flags) {
1093 if (availability_state_ == STATE_DRAINING) {
1094 return scoped_ptr<SpdyBuffer>();
1095 }
1096
1097 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1098 CHECK(it != active_streams_.end());
1099 SpdyStream* stream = it->second.stream;
1100 CHECK_EQ(stream->stream_id(), stream_id);
1101
1102 if (len < 0) {
1103 NOTREACHED();
1104 return scoped_ptr<SpdyBuffer>();
1105 }
1106
1107 int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1108
1109 bool send_stalled_by_stream =
1110 (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1111 (stream->send_window_size() <= 0);
1112 bool send_stalled_by_session = IsSendStalled();
1113
1114 // NOTE: There's an enum of the same name in histograms.xml.
1115 enum SpdyFrameFlowControlState {
1116 SEND_NOT_STALLED,
1117 SEND_STALLED_BY_STREAM,
1118 SEND_STALLED_BY_SESSION,
1119 SEND_STALLED_BY_STREAM_AND_SESSION,
1120 };
1121
1122 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1123 if (send_stalled_by_stream) {
1124 if (send_stalled_by_session) {
1125 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1126 } else {
1127 frame_flow_control_state = SEND_STALLED_BY_STREAM;
1128 }
1129 } else if (send_stalled_by_session) {
1130 frame_flow_control_state = SEND_STALLED_BY_SESSION;
1131 }
1132
1133 if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1134 UMA_HISTOGRAM_ENUMERATION(
1135 "Net.SpdyFrameStreamFlowControlState",
1136 frame_flow_control_state,
1137 SEND_STALLED_BY_STREAM + 1);
1138 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1139 UMA_HISTOGRAM_ENUMERATION(
1140 "Net.SpdyFrameStreamAndSessionFlowControlState",
1141 frame_flow_control_state,
1142 SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1143 }
1144
1145 // Obey send window size of the stream if stream flow control is
1146 // enabled.
1147 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1148 if (send_stalled_by_stream) {
1149 stream->set_send_stalled_by_flow_control(true);
1150 // Even though we're currently stalled only by the stream, we
1151 // might end up being stalled by the session also.
1152 QueueSendStalledStream(*stream);
1153 net_log().AddEvent(
1154 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1155 NetLog::IntegerCallback("stream_id", stream_id));
1156 return scoped_ptr<SpdyBuffer>();
1157 }
1158
1159 effective_len = std::min(effective_len, stream->send_window_size());
1160 }
1161
1162 // Obey send window size of the session if session flow control is
1163 // enabled.
1164 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1165 if (send_stalled_by_session) {
1166 stream->set_send_stalled_by_flow_control(true);
1167 QueueSendStalledStream(*stream);
1168 net_log().AddEvent(
1169 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1170 NetLog::IntegerCallback("stream_id", stream_id));
1171 return scoped_ptr<SpdyBuffer>();
1172 }
1173
1174 effective_len = std::min(effective_len, session_send_window_size_);
1175 }
1176
1177 DCHECK_GE(effective_len, 0);
1178
1179 // Clear FIN flag if only some of the data will be in the data
1180 // frame.
1181 if (effective_len < len)
1182 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1183
1184 if (net_log().IsLogging()) {
1185 net_log().AddEvent(
1186 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1187 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1188 (flags & DATA_FLAG_FIN) != 0));
1189 }
1190
1191 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1192 if (effective_len > 0)
1193 SendPrefacePingIfNoneInFlight();
1194
1195 // TODO(mbelshe): reduce memory copies here.
1196 DCHECK(buffered_spdy_framer_.get());
1197 scoped_ptr<SpdyFrame> frame(
1198 buffered_spdy_framer_->CreateDataFrame(
1199 stream_id, data->data(),
1200 static_cast<uint32>(effective_len), flags));
1201
1202 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1203
1204 // Send window size is based on payload size, so nothing to do if this is
1205 // just a FIN with no payload.
1206 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
1207 effective_len != 0) {
1208 DecreaseSendWindowSize(static_cast<int32>(effective_len));
1209 data_buffer->AddConsumeCallback(
1210 base::Bind(&SpdySession::OnWriteBufferConsumed,
1211 weak_factory_.GetWeakPtr(),
1212 static_cast<size_t>(effective_len)));
1213 }
1214
1215 return data_buffer.Pass();
1216 }
1217
CloseActiveStream(SpdyStreamId stream_id,int status)1218 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1219 DCHECK_NE(stream_id, 0u);
1220
1221 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1222 if (it == active_streams_.end()) {
1223 NOTREACHED();
1224 return;
1225 }
1226
1227 CloseActiveStreamIterator(it, status);
1228 }
1229
CloseCreatedStream(const base::WeakPtr<SpdyStream> & stream,int status)1230 void SpdySession::CloseCreatedStream(
1231 const base::WeakPtr<SpdyStream>& stream, int status) {
1232 DCHECK_EQ(stream->stream_id(), 0u);
1233
1234 CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1235 if (it == created_streams_.end()) {
1236 NOTREACHED();
1237 return;
1238 }
1239
1240 CloseCreatedStreamIterator(it, status);
1241 }
1242
ResetStream(SpdyStreamId stream_id,SpdyRstStreamStatus status,const std::string & description)1243 void SpdySession::ResetStream(SpdyStreamId stream_id,
1244 SpdyRstStreamStatus status,
1245 const std::string& description) {
1246 DCHECK_NE(stream_id, 0u);
1247
1248 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1249 if (it == active_streams_.end()) {
1250 NOTREACHED();
1251 return;
1252 }
1253
1254 ResetStreamIterator(it, status, description);
1255 }
1256
IsStreamActive(SpdyStreamId stream_id) const1257 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1258 return ContainsKey(active_streams_, stream_id);
1259 }
1260
GetLoadState() const1261 LoadState SpdySession::GetLoadState() const {
1262 // Just report that we're idle since the session could be doing
1263 // many things concurrently.
1264 return LOAD_STATE_IDLE;
1265 }
1266
CloseActiveStreamIterator(ActiveStreamMap::iterator it,int status)1267 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1268 int status) {
1269 // TODO(mbelshe): We should send a RST_STREAM control frame here
1270 // so that the server can cancel a large send.
1271
1272 scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1273 active_streams_.erase(it);
1274
1275 // TODO(akalin): When SpdyStream was ref-counted (and
1276 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1277 // was only done when status was not OK. This meant that pushed
1278 // streams can still be claimed after they're closed. This is
1279 // probably something that we still want to support, although server
1280 // push is hardly used. Write tests for this and fix this. (See
1281 // http://crbug.com/261712 .)
1282 if (owned_stream->type() == SPDY_PUSH_STREAM) {
1283 unclaimed_pushed_streams_.erase(owned_stream->url());
1284 num_pushed_streams_--;
1285 if (!owned_stream->IsReservedRemote())
1286 num_active_pushed_streams_--;
1287 }
1288
1289 DeleteStream(owned_stream.Pass(), status);
1290 MaybeFinishGoingAway();
1291
1292 // If there are no active streams and the socket pool is stalled, close the
1293 // session to free up a socket slot.
1294 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1295 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1296 }
1297 }
1298
CloseCreatedStreamIterator(CreatedStreamSet::iterator it,int status)1299 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1300 int status) {
1301 scoped_ptr<SpdyStream> owned_stream(*it);
1302 created_streams_.erase(it);
1303 DeleteStream(owned_stream.Pass(), status);
1304 }
1305
ResetStreamIterator(ActiveStreamMap::iterator it,SpdyRstStreamStatus status,const std::string & description)1306 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1307 SpdyRstStreamStatus status,
1308 const std::string& description) {
1309 // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1310 // may close us.
1311 SpdyStreamId stream_id = it->first;
1312 RequestPriority priority = it->second.stream->priority();
1313 EnqueueResetStreamFrame(stream_id, priority, status, description);
1314
1315 // Removes any pending writes for the stream except for possibly an
1316 // in-flight one.
1317 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1318 }
1319
EnqueueResetStreamFrame(SpdyStreamId stream_id,RequestPriority priority,SpdyRstStreamStatus status,const std::string & description)1320 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1321 RequestPriority priority,
1322 SpdyRstStreamStatus status,
1323 const std::string& description) {
1324 DCHECK_NE(stream_id, 0u);
1325
1326 net_log().AddEvent(
1327 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1328 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1329
1330 DCHECK(buffered_spdy_framer_.get());
1331 scoped_ptr<SpdyFrame> rst_frame(
1332 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1333
1334 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1335 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1336 }
1337
PumpReadLoop(ReadState expected_read_state,int result)1338 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1339 CHECK(!in_io_loop_);
1340 if (availability_state_ == STATE_DRAINING) {
1341 return;
1342 }
1343 ignore_result(DoReadLoop(expected_read_state, result));
1344 }
1345
DoReadLoop(ReadState expected_read_state,int result)1346 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1347 CHECK(!in_io_loop_);
1348 CHECK_EQ(read_state_, expected_read_state);
1349
1350 in_io_loop_ = true;
1351
1352 int bytes_read_without_yielding = 0;
1353
1354 // Loop until the session is draining, the read becomes blocked, or
1355 // the read limit is exceeded.
1356 while (true) {
1357 switch (read_state_) {
1358 case READ_STATE_DO_READ:
1359 CHECK_EQ(result, OK);
1360 result = DoRead();
1361 break;
1362 case READ_STATE_DO_READ_COMPLETE:
1363 if (result > 0)
1364 bytes_read_without_yielding += result;
1365 result = DoReadComplete(result);
1366 break;
1367 default:
1368 NOTREACHED() << "read_state_: " << read_state_;
1369 break;
1370 }
1371
1372 if (availability_state_ == STATE_DRAINING)
1373 break;
1374
1375 if (result == ERR_IO_PENDING)
1376 break;
1377
1378 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1379 read_state_ = READ_STATE_DO_READ;
1380 base::MessageLoop::current()->PostTask(
1381 FROM_HERE,
1382 base::Bind(&SpdySession::PumpReadLoop,
1383 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1384 result = ERR_IO_PENDING;
1385 break;
1386 }
1387 }
1388
1389 CHECK(in_io_loop_);
1390 in_io_loop_ = false;
1391
1392 return result;
1393 }
1394
DoRead()1395 int SpdySession::DoRead() {
1396 CHECK(in_io_loop_);
1397
1398 CHECK(connection_);
1399 CHECK(connection_->socket());
1400 read_state_ = READ_STATE_DO_READ_COMPLETE;
1401 return connection_->socket()->Read(
1402 read_buffer_.get(),
1403 kReadBufferSize,
1404 base::Bind(&SpdySession::PumpReadLoop,
1405 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1406 }
1407
DoReadComplete(int result)1408 int SpdySession::DoReadComplete(int result) {
1409 CHECK(in_io_loop_);
1410
1411 // Parse a frame. For now this code requires that the frame fit into our
1412 // buffer (kReadBufferSize).
1413 // TODO(mbelshe): support arbitrarily large frames!
1414
1415 if (result == 0) {
1416 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1417 total_bytes_received_, 1, 100000000, 50);
1418 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1419
1420 return ERR_CONNECTION_CLOSED;
1421 }
1422
1423 if (result < 0) {
1424 DoDrainSession(static_cast<Error>(result), "result is < 0.");
1425 return result;
1426 }
1427 CHECK_LE(result, kReadBufferSize);
1428 total_bytes_received_ += result;
1429
1430 last_activity_time_ = time_func_();
1431
1432 DCHECK(buffered_spdy_framer_.get());
1433 char* data = read_buffer_->data();
1434 while (result > 0) {
1435 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1436 result -= bytes_processed;
1437 data += bytes_processed;
1438
1439 if (availability_state_ == STATE_DRAINING) {
1440 return ERR_CONNECTION_CLOSED;
1441 }
1442
1443 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1444 }
1445
1446 read_state_ = READ_STATE_DO_READ;
1447 return OK;
1448 }
1449
PumpWriteLoop(WriteState expected_write_state,int result)1450 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1451 CHECK(!in_io_loop_);
1452 DCHECK_EQ(write_state_, expected_write_state);
1453
1454 DoWriteLoop(expected_write_state, result);
1455
1456 if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1457 write_queue_.IsEmpty()) {
1458 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|.
1459 return;
1460 }
1461 }
1462
DoWriteLoop(WriteState expected_write_state,int result)1463 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1464 CHECK(!in_io_loop_);
1465 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1466 DCHECK_EQ(write_state_, expected_write_state);
1467
1468 in_io_loop_ = true;
1469
1470 // Loop until the session is closed or the write becomes blocked.
1471 while (true) {
1472 switch (write_state_) {
1473 case WRITE_STATE_DO_WRITE:
1474 DCHECK_EQ(result, OK);
1475 result = DoWrite();
1476 break;
1477 case WRITE_STATE_DO_WRITE_COMPLETE:
1478 result = DoWriteComplete(result);
1479 break;
1480 case WRITE_STATE_IDLE:
1481 default:
1482 NOTREACHED() << "write_state_: " << write_state_;
1483 break;
1484 }
1485
1486 if (write_state_ == WRITE_STATE_IDLE) {
1487 DCHECK_EQ(result, ERR_IO_PENDING);
1488 break;
1489 }
1490
1491 if (result == ERR_IO_PENDING)
1492 break;
1493 }
1494
1495 CHECK(in_io_loop_);
1496 in_io_loop_ = false;
1497
1498 return result;
1499 }
1500
DoWrite()1501 int SpdySession::DoWrite() {
1502 CHECK(in_io_loop_);
1503
1504 DCHECK(buffered_spdy_framer_);
1505 if (in_flight_write_) {
1506 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1507 } else {
1508 // Grab the next frame to send.
1509 SpdyFrameType frame_type = DATA;
1510 scoped_ptr<SpdyBufferProducer> producer;
1511 base::WeakPtr<SpdyStream> stream;
1512 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1513 write_state_ = WRITE_STATE_IDLE;
1514 return ERR_IO_PENDING;
1515 }
1516
1517 if (stream.get())
1518 CHECK(!stream->IsClosed());
1519
1520 // Activate the stream only when sending the SYN_STREAM frame to
1521 // guarantee monotonically-increasing stream IDs.
1522 if (frame_type == SYN_STREAM) {
1523 CHECK(stream.get());
1524 CHECK_EQ(stream->stream_id(), 0u);
1525 scoped_ptr<SpdyStream> owned_stream =
1526 ActivateCreatedStream(stream.get());
1527 InsertActivatedStream(owned_stream.Pass());
1528
1529 if (stream_hi_water_mark_ > kLastStreamId) {
1530 CHECK_EQ(stream->stream_id(), kLastStreamId);
1531 // We've exhausted the stream ID space, and no new streams may be
1532 // created after this one.
1533 MakeUnavailable();
1534 StartGoingAway(kLastStreamId, ERR_ABORTED);
1535 }
1536 }
1537
1538 in_flight_write_ = producer->ProduceBuffer();
1539 if (!in_flight_write_) {
1540 NOTREACHED();
1541 return ERR_UNEXPECTED;
1542 }
1543 in_flight_write_frame_type_ = frame_type;
1544 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1545 DCHECK_GE(in_flight_write_frame_size_,
1546 buffered_spdy_framer_->GetFrameMinimumSize());
1547 in_flight_write_stream_ = stream;
1548 }
1549
1550 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1551
1552 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1553 // with Socket implementations that don't store their IOBuffer
1554 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1555 scoped_refptr<IOBuffer> write_io_buffer =
1556 in_flight_write_->GetIOBufferForRemainingData();
1557 return connection_->socket()->Write(
1558 write_io_buffer.get(),
1559 in_flight_write_->GetRemainingSize(),
1560 base::Bind(&SpdySession::PumpWriteLoop,
1561 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1562 }
1563
DoWriteComplete(int result)1564 int SpdySession::DoWriteComplete(int result) {
1565 CHECK(in_io_loop_);
1566 DCHECK_NE(result, ERR_IO_PENDING);
1567 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1568
1569 last_activity_time_ = time_func_();
1570
1571 if (result < 0) {
1572 DCHECK_NE(result, ERR_IO_PENDING);
1573 in_flight_write_.reset();
1574 in_flight_write_frame_type_ = DATA;
1575 in_flight_write_frame_size_ = 0;
1576 in_flight_write_stream_.reset();
1577 write_state_ = WRITE_STATE_DO_WRITE;
1578 DoDrainSession(static_cast<Error>(result), "Write error");
1579 return OK;
1580 }
1581
1582 // It should not be possible to have written more bytes than our
1583 // in_flight_write_.
1584 DCHECK_LE(static_cast<size_t>(result),
1585 in_flight_write_->GetRemainingSize());
1586
1587 if (result > 0) {
1588 in_flight_write_->Consume(static_cast<size_t>(result));
1589
1590 // We only notify the stream when we've fully written the pending frame.
1591 if (in_flight_write_->GetRemainingSize() == 0) {
1592 // It is possible that the stream was cancelled while we were
1593 // writing to the socket.
1594 if (in_flight_write_stream_.get()) {
1595 DCHECK_GT(in_flight_write_frame_size_, 0u);
1596 in_flight_write_stream_->OnFrameWriteComplete(
1597 in_flight_write_frame_type_,
1598 in_flight_write_frame_size_);
1599 }
1600
1601 // Cleanup the write which just completed.
1602 in_flight_write_.reset();
1603 in_flight_write_frame_type_ = DATA;
1604 in_flight_write_frame_size_ = 0;
1605 in_flight_write_stream_.reset();
1606 }
1607 }
1608
1609 write_state_ = WRITE_STATE_DO_WRITE;
1610 return OK;
1611 }
1612
DcheckGoingAway() const1613 void SpdySession::DcheckGoingAway() const {
1614 #if DCHECK_IS_ON
1615 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1616 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1617 DCHECK(pending_create_stream_queues_[i].empty());
1618 }
1619 DCHECK(created_streams_.empty());
1620 #endif
1621 }
1622
DcheckDraining() const1623 void SpdySession::DcheckDraining() const {
1624 DcheckGoingAway();
1625 DCHECK_EQ(availability_state_, STATE_DRAINING);
1626 DCHECK(active_streams_.empty());
1627 DCHECK(unclaimed_pushed_streams_.empty());
1628 }
1629
StartGoingAway(SpdyStreamId last_good_stream_id,Error status)1630 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1631 Error status) {
1632 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1633
1634 // The loops below are carefully written to avoid reentrancy problems.
1635
1636 while (true) {
1637 size_t old_size = GetTotalSize(pending_create_stream_queues_);
1638 base::WeakPtr<SpdyStreamRequest> pending_request =
1639 GetNextPendingStreamRequest();
1640 if (!pending_request)
1641 break;
1642 // No new stream requests should be added while the session is
1643 // going away.
1644 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1645 pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1646 }
1647
1648 while (true) {
1649 size_t old_size = active_streams_.size();
1650 ActiveStreamMap::iterator it =
1651 active_streams_.lower_bound(last_good_stream_id + 1);
1652 if (it == active_streams_.end())
1653 break;
1654 LogAbandonedActiveStream(it, status);
1655 CloseActiveStreamIterator(it, status);
1656 // No new streams should be activated while the session is going
1657 // away.
1658 DCHECK_GT(old_size, active_streams_.size());
1659 }
1660
1661 while (!created_streams_.empty()) {
1662 size_t old_size = created_streams_.size();
1663 CreatedStreamSet::iterator it = created_streams_.begin();
1664 LogAbandonedStream(*it, status);
1665 CloseCreatedStreamIterator(it, status);
1666 // No new streams should be created while the session is going
1667 // away.
1668 DCHECK_GT(old_size, created_streams_.size());
1669 }
1670
1671 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1672
1673 DcheckGoingAway();
1674 }
1675
MaybeFinishGoingAway()1676 void SpdySession::MaybeFinishGoingAway() {
1677 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1678 DoDrainSession(OK, "Finished going away");
1679 }
1680 }
1681
DoDrainSession(Error err,const std::string & description)1682 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1683 if (availability_state_ == STATE_DRAINING) {
1684 return;
1685 }
1686 MakeUnavailable();
1687
1688 // If |err| indicates an error occurred, inform the peer that we're closing
1689 // and why. Don't GOAWAY on a graceful or idle close, as that may
1690 // unnecessarily wake the radio. We could technically GOAWAY on network errors
1691 // (we'll probably fail to actually write it, but that's okay), however many
1692 // unit-tests would need to be updated.
1693 if (err != OK &&
1694 err != ERR_ABORTED && // Used by SpdySessionPool to close idle sessions.
1695 err != ERR_NETWORK_CHANGED && // Used to deprecate sessions on IP change.
1696 err != ERR_SOCKET_NOT_CONNECTED &&
1697 err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1698 // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1699 SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
1700 MapNetErrorToGoAwayStatus(err),
1701 description);
1702 EnqueueSessionWrite(HIGHEST,
1703 GOAWAY,
1704 scoped_ptr<SpdyFrame>(
1705 buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1706 }
1707
1708 availability_state_ = STATE_DRAINING;
1709 error_on_close_ = err;
1710
1711 net_log_.AddEvent(
1712 NetLog::TYPE_SPDY_SESSION_CLOSE,
1713 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1714
1715 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1716 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1717 total_bytes_received_, 1, 100000000, 50);
1718
1719 if (err == OK) {
1720 // We ought to be going away already, as this is a graceful close.
1721 DcheckGoingAway();
1722 } else {
1723 StartGoingAway(0, err);
1724 }
1725 DcheckDraining();
1726 MaybePostWriteLoop();
1727 }
1728
LogAbandonedStream(SpdyStream * stream,Error status)1729 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1730 DCHECK(stream);
1731 std::string description = base::StringPrintf(
1732 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1733 stream->url().spec();
1734 stream->LogStreamError(status, description);
1735 // We don't increment the streams abandoned counter here. If the
1736 // stream isn't active (i.e., it hasn't written anything to the wire
1737 // yet) then it's as if it never existed. If it is active, then
1738 // LogAbandonedActiveStream() will increment the counters.
1739 }
1740
LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,Error status)1741 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1742 Error status) {
1743 DCHECK_GT(it->first, 0u);
1744 LogAbandonedStream(it->second.stream, status);
1745 ++streams_abandoned_count_;
1746 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1747 abandoned_streams.Increment();
1748 if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1749 unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1750 unclaimed_pushed_streams_.end()) {
1751 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1752 abandoned_push_streams.Increment();
1753 }
1754 }
1755
GetNewStreamId()1756 SpdyStreamId SpdySession::GetNewStreamId() {
1757 CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1758 SpdyStreamId id = stream_hi_water_mark_;
1759 stream_hi_water_mark_ += 2;
1760 return id;
1761 }
1762
CloseSessionOnError(Error err,const std::string & description)1763 void SpdySession::CloseSessionOnError(Error err,
1764 const std::string& description) {
1765 DCHECK_LT(err, ERR_IO_PENDING);
1766 DoDrainSession(err, description);
1767 }
1768
MakeUnavailable()1769 void SpdySession::MakeUnavailable() {
1770 if (availability_state_ == STATE_AVAILABLE) {
1771 availability_state_ = STATE_GOING_AWAY;
1772 pool_->MakeSessionUnavailable(GetWeakPtr());
1773 }
1774 }
1775
GetInfoAsValue() const1776 base::Value* SpdySession::GetInfoAsValue() const {
1777 base::DictionaryValue* dict = new base::DictionaryValue();
1778
1779 dict->SetInteger("source_id", net_log_.source().id);
1780
1781 dict->SetString("host_port_pair", host_port_pair().ToString());
1782 if (!pooled_aliases_.empty()) {
1783 base::ListValue* alias_list = new base::ListValue();
1784 for (std::set<SpdySessionKey>::const_iterator it =
1785 pooled_aliases_.begin();
1786 it != pooled_aliases_.end(); it++) {
1787 alias_list->Append(new base::StringValue(
1788 it->host_port_pair().ToString()));
1789 }
1790 dict->Set("aliases", alias_list);
1791 }
1792 dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1793
1794 dict->SetInteger("active_streams", active_streams_.size());
1795
1796 dict->SetInteger("unclaimed_pushed_streams",
1797 unclaimed_pushed_streams_.size());
1798
1799 dict->SetBoolean("is_secure", is_secure_);
1800
1801 dict->SetString("protocol_negotiated",
1802 SSLClientSocket::NextProtoToString(
1803 connection_->socket()->GetNegotiatedProtocol()));
1804
1805 dict->SetInteger("error", error_on_close_);
1806 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1807
1808 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1809 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1810 dict->SetInteger("streams_pushed_and_claimed_count",
1811 streams_pushed_and_claimed_count_);
1812 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1813 DCHECK(buffered_spdy_framer_.get());
1814 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1815
1816 dict->SetBoolean("sent_settings", sent_settings_);
1817 dict->SetBoolean("received_settings", received_settings_);
1818
1819 dict->SetInteger("send_window_size", session_send_window_size_);
1820 dict->SetInteger("recv_window_size", session_recv_window_size_);
1821 dict->SetInteger("unacked_recv_window_bytes",
1822 session_unacked_recv_window_bytes_);
1823 return dict;
1824 }
1825
IsReused() const1826 bool SpdySession::IsReused() const {
1827 return buffered_spdy_framer_->frames_received() > 0 ||
1828 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1829 }
1830
GetLoadTimingInfo(SpdyStreamId stream_id,LoadTimingInfo * load_timing_info) const1831 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1832 LoadTimingInfo* load_timing_info) const {
1833 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1834 load_timing_info);
1835 }
1836
GetPeerAddress(IPEndPoint * address) const1837 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1838 int rv = ERR_SOCKET_NOT_CONNECTED;
1839 if (connection_->socket()) {
1840 rv = connection_->socket()->GetPeerAddress(address);
1841 }
1842
1843 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1844 rv == ERR_SOCKET_NOT_CONNECTED);
1845
1846 return rv;
1847 }
1848
GetLocalAddress(IPEndPoint * address) const1849 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1850 int rv = ERR_SOCKET_NOT_CONNECTED;
1851 if (connection_->socket()) {
1852 rv = connection_->socket()->GetLocalAddress(address);
1853 }
1854
1855 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1856 rv == ERR_SOCKET_NOT_CONNECTED);
1857
1858 return rv;
1859 }
1860
EnqueueSessionWrite(RequestPriority priority,SpdyFrameType frame_type,scoped_ptr<SpdyFrame> frame)1861 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1862 SpdyFrameType frame_type,
1863 scoped_ptr<SpdyFrame> frame) {
1864 DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1865 frame_type == WINDOW_UPDATE || frame_type == PING ||
1866 frame_type == GOAWAY);
1867 EnqueueWrite(
1868 priority, frame_type,
1869 scoped_ptr<SpdyBufferProducer>(
1870 new SimpleBufferProducer(
1871 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1872 base::WeakPtr<SpdyStream>());
1873 }
1874
EnqueueWrite(RequestPriority priority,SpdyFrameType frame_type,scoped_ptr<SpdyBufferProducer> producer,const base::WeakPtr<SpdyStream> & stream)1875 void SpdySession::EnqueueWrite(RequestPriority priority,
1876 SpdyFrameType frame_type,
1877 scoped_ptr<SpdyBufferProducer> producer,
1878 const base::WeakPtr<SpdyStream>& stream) {
1879 if (availability_state_ == STATE_DRAINING)
1880 return;
1881
1882 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1883 MaybePostWriteLoop();
1884 }
1885
MaybePostWriteLoop()1886 void SpdySession::MaybePostWriteLoop() {
1887 if (write_state_ == WRITE_STATE_IDLE) {
1888 CHECK(!in_flight_write_);
1889 write_state_ = WRITE_STATE_DO_WRITE;
1890 base::MessageLoop::current()->PostTask(
1891 FROM_HERE,
1892 base::Bind(&SpdySession::PumpWriteLoop,
1893 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1894 }
1895 }
1896
InsertCreatedStream(scoped_ptr<SpdyStream> stream)1897 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1898 CHECK_EQ(stream->stream_id(), 0u);
1899 CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1900 created_streams_.insert(stream.release());
1901 }
1902
ActivateCreatedStream(SpdyStream * stream)1903 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1904 CHECK_EQ(stream->stream_id(), 0u);
1905 CHECK(created_streams_.find(stream) != created_streams_.end());
1906 stream->set_stream_id(GetNewStreamId());
1907 scoped_ptr<SpdyStream> owned_stream(stream);
1908 created_streams_.erase(stream);
1909 return owned_stream.Pass();
1910 }
1911
InsertActivatedStream(scoped_ptr<SpdyStream> stream)1912 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1913 SpdyStreamId stream_id = stream->stream_id();
1914 CHECK_NE(stream_id, 0u);
1915 std::pair<ActiveStreamMap::iterator, bool> result =
1916 active_streams_.insert(
1917 std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1918 CHECK(result.second);
1919 ignore_result(stream.release());
1920 }
1921
DeleteStream(scoped_ptr<SpdyStream> stream,int status)1922 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1923 if (in_flight_write_stream_.get() == stream.get()) {
1924 // If we're deleting the stream for the in-flight write, we still
1925 // need to let the write complete, so we clear
1926 // |in_flight_write_stream_| and let the write finish on its own
1927 // without notifying |in_flight_write_stream_|.
1928 in_flight_write_stream_.reset();
1929 }
1930
1931 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1932 stream->OnClose(status);
1933
1934 if (availability_state_ == STATE_AVAILABLE) {
1935 ProcessPendingStreamRequests();
1936 }
1937 }
1938
GetActivePushStream(const GURL & url)1939 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1940 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1941
1942 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1943 if (unclaimed_it == unclaimed_pushed_streams_.end())
1944 return base::WeakPtr<SpdyStream>();
1945
1946 SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1947 unclaimed_pushed_streams_.erase(unclaimed_it);
1948
1949 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1950 if (active_it == active_streams_.end()) {
1951 NOTREACHED();
1952 return base::WeakPtr<SpdyStream>();
1953 }
1954
1955 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1956 used_push_streams.Increment();
1957 return active_it->second.stream->GetWeakPtr();
1958 }
1959
GetSSLInfo(SSLInfo * ssl_info,bool * was_npn_negotiated,NextProto * protocol_negotiated)1960 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1961 bool* was_npn_negotiated,
1962 NextProto* protocol_negotiated) {
1963 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1964 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1965 return connection_->socket()->GetSSLInfo(ssl_info);
1966 }
1967
GetSSLCertRequestInfo(SSLCertRequestInfo * cert_request_info)1968 bool SpdySession::GetSSLCertRequestInfo(
1969 SSLCertRequestInfo* cert_request_info) {
1970 if (!is_secure_)
1971 return false;
1972 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1973 return true;
1974 }
1975
OnError(SpdyFramer::SpdyError error_code)1976 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1977 CHECK(in_io_loop_);
1978
1979 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1980 std::string description =
1981 base::StringPrintf("Framer error: %d (%s).",
1982 error_code,
1983 SpdyFramer::ErrorCodeToString(error_code));
1984 DoDrainSession(MapFramerErrorToNetError(error_code), description);
1985 }
1986
OnStreamError(SpdyStreamId stream_id,const std::string & description)1987 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1988 const std::string& description) {
1989 CHECK(in_io_loop_);
1990
1991 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1992 if (it == active_streams_.end()) {
1993 // We still want to send a frame to reset the stream even if we
1994 // don't know anything about it.
1995 EnqueueResetStreamFrame(
1996 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1997 return;
1998 }
1999
2000 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
2001 }
2002
OnDataFrameHeader(SpdyStreamId stream_id,size_t length,bool fin)2003 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
2004 size_t length,
2005 bool fin) {
2006 CHECK(in_io_loop_);
2007
2008 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2009
2010 // By the time data comes in, the stream may already be inactive.
2011 if (it == active_streams_.end())
2012 return;
2013
2014 SpdyStream* stream = it->second.stream;
2015 CHECK_EQ(stream->stream_id(), stream_id);
2016
2017 DCHECK(buffered_spdy_framer_);
2018 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
2019 stream->IncrementRawReceivedBytes(header_len);
2020 }
2021
OnStreamFrameData(SpdyStreamId stream_id,const char * data,size_t len,bool fin)2022 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2023 const char* data,
2024 size_t len,
2025 bool fin) {
2026 CHECK(in_io_loop_);
2027
2028 if (data == NULL && len != 0) {
2029 // This is notification of consumed data padding.
2030 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2031 // See crbug.com/353012.
2032 return;
2033 }
2034
2035 DCHECK_LT(len, 1u << 24);
2036 if (net_log().IsLogging()) {
2037 net_log().AddEvent(
2038 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2039 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2040 }
2041
2042 // Build the buffer as early as possible so that we go through the
2043 // session flow control checks and update
2044 // |unacked_recv_window_bytes_| properly even when the stream is
2045 // inactive (since the other side has still reduced its session send
2046 // window).
2047 scoped_ptr<SpdyBuffer> buffer;
2048 if (data) {
2049 DCHECK_GT(len, 0u);
2050 CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2051 buffer.reset(new SpdyBuffer(data, len));
2052
2053 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2054 DecreaseRecvWindowSize(static_cast<int32>(len));
2055 buffer->AddConsumeCallback(
2056 base::Bind(&SpdySession::OnReadBufferConsumed,
2057 weak_factory_.GetWeakPtr()));
2058 }
2059 } else {
2060 DCHECK_EQ(len, 0u);
2061 }
2062
2063 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2064
2065 // By the time data comes in, the stream may already be inactive.
2066 if (it == active_streams_.end())
2067 return;
2068
2069 SpdyStream* stream = it->second.stream;
2070 CHECK_EQ(stream->stream_id(), stream_id);
2071
2072 stream->IncrementRawReceivedBytes(len);
2073
2074 if (it->second.waiting_for_syn_reply) {
2075 const std::string& error = "Data received before SYN_REPLY.";
2076 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2077 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2078 return;
2079 }
2080
2081 stream->OnDataReceived(buffer.Pass());
2082 }
2083
OnSettings(bool clear_persisted)2084 void SpdySession::OnSettings(bool clear_persisted) {
2085 CHECK(in_io_loop_);
2086
2087 if (clear_persisted)
2088 http_server_properties_->ClearSpdySettings(host_port_pair());
2089
2090 if (net_log_.IsLogging()) {
2091 net_log_.AddEvent(
2092 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2093 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2094 clear_persisted));
2095 }
2096
2097 if (GetProtocolVersion() >= SPDY4) {
2098 // Send an acknowledgment of the setting.
2099 SpdySettingsIR settings_ir;
2100 settings_ir.set_is_ack(true);
2101 EnqueueSessionWrite(
2102 HIGHEST,
2103 SETTINGS,
2104 scoped_ptr<SpdyFrame>(
2105 buffered_spdy_framer_->SerializeFrame(settings_ir)));
2106 }
2107 }
2108
OnSetting(SpdySettingsIds id,uint8 flags,uint32 value)2109 void SpdySession::OnSetting(SpdySettingsIds id,
2110 uint8 flags,
2111 uint32 value) {
2112 CHECK(in_io_loop_);
2113
2114 HandleSetting(id, value);
2115 http_server_properties_->SetSpdySetting(
2116 host_port_pair(),
2117 id,
2118 static_cast<SpdySettingsFlags>(flags),
2119 value);
2120 received_settings_ = true;
2121
2122 // Log the setting.
2123 const SpdyMajorVersion protocol_version = GetProtocolVersion();
2124 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2125 base::Bind(&NetLogSpdySettingCallback,
2126 id,
2127 protocol_version,
2128 static_cast<SpdySettingsFlags>(flags),
2129 value));
2130 }
2131
OnSendCompressedFrame(SpdyStreamId stream_id,SpdyFrameType type,size_t payload_len,size_t frame_len)2132 void SpdySession::OnSendCompressedFrame(
2133 SpdyStreamId stream_id,
2134 SpdyFrameType type,
2135 size_t payload_len,
2136 size_t frame_len) {
2137 if (type != SYN_STREAM && type != HEADERS)
2138 return;
2139
2140 DCHECK(buffered_spdy_framer_.get());
2141 size_t compressed_len =
2142 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2143
2144 if (payload_len) {
2145 // Make sure we avoid early decimal truncation.
2146 int compression_pct = 100 - (100 * compressed_len) / payload_len;
2147 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2148 compression_pct);
2149 }
2150 }
2151
OnReceiveCompressedFrame(SpdyStreamId stream_id,SpdyFrameType type,size_t frame_len)2152 void SpdySession::OnReceiveCompressedFrame(
2153 SpdyStreamId stream_id,
2154 SpdyFrameType type,
2155 size_t frame_len) {
2156 last_compressed_frame_len_ = frame_len;
2157 }
2158
OnInitialResponseHeadersReceived(const SpdyHeaderBlock & response_headers,base::Time response_time,base::TimeTicks recv_first_byte_time,SpdyStream * stream)2159 int SpdySession::OnInitialResponseHeadersReceived(
2160 const SpdyHeaderBlock& response_headers,
2161 base::Time response_time,
2162 base::TimeTicks recv_first_byte_time,
2163 SpdyStream* stream) {
2164 CHECK(in_io_loop_);
2165 SpdyStreamId stream_id = stream->stream_id();
2166
2167 if (stream->type() == SPDY_PUSH_STREAM) {
2168 DCHECK(stream->IsReservedRemote());
2169 if (max_concurrent_pushed_streams_ &&
2170 num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2171 ResetStream(stream_id,
2172 RST_STREAM_REFUSED_STREAM,
2173 "Stream concurrency limit reached.");
2174 return STATUS_CODE_REFUSED_STREAM;
2175 }
2176 }
2177
2178 if (stream->type() == SPDY_PUSH_STREAM) {
2179 // Will be balanced in DeleteStream.
2180 num_active_pushed_streams_++;
2181 }
2182
2183 // May invalidate |stream|.
2184 int rv = stream->OnInitialResponseHeadersReceived(
2185 response_headers, response_time, recv_first_byte_time);
2186 if (rv < 0) {
2187 DCHECK_NE(rv, ERR_IO_PENDING);
2188 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2189 }
2190
2191 return rv;
2192 }
2193
OnSynStream(SpdyStreamId stream_id,SpdyStreamId associated_stream_id,SpdyPriority priority,bool fin,bool unidirectional,const SpdyHeaderBlock & headers)2194 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2195 SpdyStreamId associated_stream_id,
2196 SpdyPriority priority,
2197 bool fin,
2198 bool unidirectional,
2199 const SpdyHeaderBlock& headers) {
2200 CHECK(in_io_loop_);
2201
2202 if (GetProtocolVersion() >= SPDY4) {
2203 DCHECK_EQ(0u, associated_stream_id);
2204 OnHeaders(stream_id, fin, headers);
2205 return;
2206 }
2207
2208 base::Time response_time = base::Time::Now();
2209 base::TimeTicks recv_first_byte_time = time_func_();
2210
2211 if (net_log_.IsLogging()) {
2212 net_log_.AddEvent(
2213 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2214 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2215 &headers, fin, unidirectional, priority,
2216 stream_id, associated_stream_id));
2217 }
2218
2219 // Split headers to simulate push promise and response.
2220 SpdyHeaderBlock request_headers;
2221 SpdyHeaderBlock response_headers;
2222 SplitPushedHeadersToRequestAndResponse(
2223 headers, GetProtocolVersion(), &request_headers, &response_headers);
2224
2225 if (!TryCreatePushStream(
2226 stream_id, associated_stream_id, priority, request_headers))
2227 return;
2228
2229 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2230 if (active_it == active_streams_.end()) {
2231 NOTREACHED();
2232 return;
2233 }
2234
2235 if (OnInitialResponseHeadersReceived(response_headers,
2236 response_time,
2237 recv_first_byte_time,
2238 active_it->second.stream) != OK)
2239 return;
2240
2241 base::StatsCounter push_requests("spdy.pushed_streams");
2242 push_requests.Increment();
2243 }
2244
DeleteExpiredPushedStreams()2245 void SpdySession::DeleteExpiredPushedStreams() {
2246 if (unclaimed_pushed_streams_.empty())
2247 return;
2248
2249 // Check that adequate time has elapsed since the last sweep.
2250 if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2251 return;
2252
2253 // Gather old streams to delete.
2254 base::TimeTicks minimum_freshness = time_func_() -
2255 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2256 std::vector<SpdyStreamId> streams_to_close;
2257 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2258 it != unclaimed_pushed_streams_.end(); ++it) {
2259 if (minimum_freshness > it->second.creation_time)
2260 streams_to_close.push_back(it->second.stream_id);
2261 }
2262
2263 for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2264 streams_to_close.begin();
2265 to_close_it != streams_to_close.end(); ++to_close_it) {
2266 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2267 if (active_it == active_streams_.end())
2268 continue;
2269
2270 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2271 // CloseActiveStreamIterator() will remove the stream from
2272 // |unclaimed_pushed_streams_|.
2273 ResetStreamIterator(
2274 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2275 }
2276
2277 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2278 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2279 }
2280
OnSynReply(SpdyStreamId stream_id,bool fin,const SpdyHeaderBlock & headers)2281 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2282 bool fin,
2283 const SpdyHeaderBlock& headers) {
2284 CHECK(in_io_loop_);
2285
2286 base::Time response_time = base::Time::Now();
2287 base::TimeTicks recv_first_byte_time = time_func_();
2288
2289 if (net_log().IsLogging()) {
2290 net_log().AddEvent(
2291 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2292 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2293 &headers, fin, stream_id));
2294 }
2295
2296 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2297 if (it == active_streams_.end()) {
2298 // NOTE: it may just be that the stream was cancelled.
2299 return;
2300 }
2301
2302 SpdyStream* stream = it->second.stream;
2303 CHECK_EQ(stream->stream_id(), stream_id);
2304
2305 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2306 last_compressed_frame_len_ = 0;
2307
2308 if (GetProtocolVersion() >= SPDY4) {
2309 const std::string& error =
2310 "SPDY4 wasn't expecting SYN_REPLY.";
2311 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2312 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2313 return;
2314 }
2315 if (!it->second.waiting_for_syn_reply) {
2316 const std::string& error =
2317 "Received duplicate SYN_REPLY for stream.";
2318 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2319 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2320 return;
2321 }
2322 it->second.waiting_for_syn_reply = false;
2323
2324 ignore_result(OnInitialResponseHeadersReceived(
2325 headers, response_time, recv_first_byte_time, stream));
2326 }
2327
OnHeaders(SpdyStreamId stream_id,bool fin,const SpdyHeaderBlock & headers)2328 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2329 bool fin,
2330 const SpdyHeaderBlock& headers) {
2331 CHECK(in_io_loop_);
2332
2333 if (net_log().IsLogging()) {
2334 net_log().AddEvent(
2335 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2336 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2337 &headers, fin, stream_id));
2338 }
2339
2340 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2341 if (it == active_streams_.end()) {
2342 // NOTE: it may just be that the stream was cancelled.
2343 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2344 return;
2345 }
2346
2347 SpdyStream* stream = it->second.stream;
2348 CHECK_EQ(stream->stream_id(), stream_id);
2349
2350 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2351 last_compressed_frame_len_ = 0;
2352
2353 base::Time response_time = base::Time::Now();
2354 base::TimeTicks recv_first_byte_time = time_func_();
2355
2356 if (it->second.waiting_for_syn_reply) {
2357 if (GetProtocolVersion() < SPDY4) {
2358 const std::string& error =
2359 "Was expecting SYN_REPLY, not HEADERS.";
2360 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2361 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2362 return;
2363 }
2364
2365 it->second.waiting_for_syn_reply = false;
2366 ignore_result(OnInitialResponseHeadersReceived(
2367 headers, response_time, recv_first_byte_time, stream));
2368 } else if (it->second.stream->IsReservedRemote()) {
2369 ignore_result(OnInitialResponseHeadersReceived(
2370 headers, response_time, recv_first_byte_time, stream));
2371 } else {
2372 int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2373 if (rv < 0) {
2374 DCHECK_NE(rv, ERR_IO_PENDING);
2375 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2376 }
2377 }
2378 }
2379
OnUnknownFrame(SpdyStreamId stream_id,int frame_type)2380 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
2381 // Validate stream id.
2382 // Was the frame sent on a stream id that has not been used in this session?
2383 if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
2384 return false;
2385
2386 if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
2387 return false;
2388
2389 return true;
2390 }
2391
OnRstStream(SpdyStreamId stream_id,SpdyRstStreamStatus status)2392 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2393 SpdyRstStreamStatus status) {
2394 CHECK(in_io_loop_);
2395
2396 std::string description;
2397 net_log().AddEvent(
2398 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2399 base::Bind(&NetLogSpdyRstCallback,
2400 stream_id, status, &description));
2401
2402 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2403 if (it == active_streams_.end()) {
2404 // NOTE: it may just be that the stream was cancelled.
2405 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2406 return;
2407 }
2408
2409 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2410
2411 if (status == 0) {
2412 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2413 } else if (status == RST_STREAM_REFUSED_STREAM) {
2414 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2415 } else {
2416 RecordProtocolErrorHistogram(
2417 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2418 it->second.stream->LogStreamError(
2419 ERR_SPDY_PROTOCOL_ERROR,
2420 base::StringPrintf("SPDY stream closed with status: %d", status));
2421 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2422 // For now, it doesn't matter much - it is a protocol error.
2423 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2424 }
2425 }
2426
OnGoAway(SpdyStreamId last_accepted_stream_id,SpdyGoAwayStatus status)2427 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2428 SpdyGoAwayStatus status) {
2429 CHECK(in_io_loop_);
2430
2431 // TODO(jgraettinger): UMA histogram on |status|.
2432
2433 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2434 base::Bind(&NetLogSpdyGoAwayCallback,
2435 last_accepted_stream_id,
2436 active_streams_.size(),
2437 unclaimed_pushed_streams_.size(),
2438 status));
2439 MakeUnavailable();
2440 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2441 // This is to handle the case when we already don't have any active
2442 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2443 // active streams and so the last one being closed will finish the
2444 // going away process (see DeleteStream()).
2445 MaybeFinishGoingAway();
2446 }
2447
OnPing(SpdyPingId unique_id,bool is_ack)2448 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2449 CHECK(in_io_loop_);
2450
2451 net_log_.AddEvent(
2452 NetLog::TYPE_SPDY_SESSION_PING,
2453 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2454
2455 // Send response to a PING from server.
2456 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2457 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2458 WritePingFrame(unique_id, true);
2459 return;
2460 }
2461
2462 --pings_in_flight_;
2463 if (pings_in_flight_ < 0) {
2464 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2465 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2466 pings_in_flight_ = 0;
2467 return;
2468 }
2469
2470 if (pings_in_flight_ > 0)
2471 return;
2472
2473 // We will record RTT in histogram when there are no more client sent
2474 // pings_in_flight_.
2475 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2476 }
2477
OnWindowUpdate(SpdyStreamId stream_id,uint32 delta_window_size)2478 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2479 uint32 delta_window_size) {
2480 CHECK(in_io_loop_);
2481
2482 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2483 net_log_.AddEvent(
2484 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2485 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2486 stream_id, delta_window_size));
2487
2488 if (stream_id == kSessionFlowControlStreamId) {
2489 // WINDOW_UPDATE for the session.
2490 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2491 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2492 << "session flow control is not turned on";
2493 // TODO(akalin): Record an error and close the session.
2494 return;
2495 }
2496
2497 if (delta_window_size < 1u) {
2498 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2499 DoDrainSession(
2500 ERR_SPDY_PROTOCOL_ERROR,
2501 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2502 base::UintToString(delta_window_size));
2503 return;
2504 }
2505
2506 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2507 } else {
2508 // WINDOW_UPDATE for a stream.
2509 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2510 // TODO(akalin): Record an error and close the session.
2511 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2512 << " when flow control is not turned on";
2513 return;
2514 }
2515
2516 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2517
2518 if (it == active_streams_.end()) {
2519 // NOTE: it may just be that the stream was cancelled.
2520 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2521 return;
2522 }
2523
2524 SpdyStream* stream = it->second.stream;
2525 CHECK_EQ(stream->stream_id(), stream_id);
2526
2527 if (delta_window_size < 1u) {
2528 ResetStreamIterator(it,
2529 RST_STREAM_FLOW_CONTROL_ERROR,
2530 base::StringPrintf(
2531 "Received WINDOW_UPDATE with an invalid "
2532 "delta_window_size %ud", delta_window_size));
2533 return;
2534 }
2535
2536 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2537 it->second.stream->IncreaseSendWindowSize(
2538 static_cast<int32>(delta_window_size));
2539 }
2540 }
2541
TryCreatePushStream(SpdyStreamId stream_id,SpdyStreamId associated_stream_id,SpdyPriority priority,const SpdyHeaderBlock & headers)2542 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2543 SpdyStreamId associated_stream_id,
2544 SpdyPriority priority,
2545 const SpdyHeaderBlock& headers) {
2546 // Server-initiated streams should have even sequence numbers.
2547 if ((stream_id & 0x1) != 0) {
2548 LOG(WARNING) << "Received invalid push stream id " << stream_id;
2549 if (GetProtocolVersion() > SPDY2)
2550 CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
2551 return false;
2552 }
2553
2554 if (GetProtocolVersion() > SPDY2) {
2555 if (stream_id <= last_accepted_push_stream_id_) {
2556 LOG(WARNING) << "Received push stream id lesser or equal to the last "
2557 << "accepted before " << stream_id;
2558 CloseSessionOnError(
2559 ERR_SPDY_PROTOCOL_ERROR,
2560 "New push stream id must be greater than the last accepted.");
2561 return false;
2562 }
2563 }
2564
2565 if (IsStreamActive(stream_id)) {
2566 // For SPDY3 and higher we should not get here, we'll start going away
2567 // earlier on |last_seen_push_stream_id_| check.
2568 CHECK_GT(SPDY3, GetProtocolVersion());
2569 LOG(WARNING) << "Received push for active stream " << stream_id;
2570 return false;
2571 }
2572
2573 last_accepted_push_stream_id_ = stream_id;
2574
2575 RequestPriority request_priority =
2576 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2577
2578 if (availability_state_ == STATE_GOING_AWAY) {
2579 // TODO(akalin): This behavior isn't in the SPDY spec, although it
2580 // probably should be.
2581 EnqueueResetStreamFrame(stream_id,
2582 request_priority,
2583 RST_STREAM_REFUSED_STREAM,
2584 "push stream request received when going away");
2585 return false;
2586 }
2587
2588 if (associated_stream_id == 0) {
2589 // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2590 // session going away. We should never get here.
2591 CHECK_GT(SPDY4, GetProtocolVersion());
2592 std::string description = base::StringPrintf(
2593 "Received invalid associated stream id %d for pushed stream %d",
2594 associated_stream_id,
2595 stream_id);
2596 EnqueueResetStreamFrame(
2597 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2598 return false;
2599 }
2600
2601 streams_pushed_count_++;
2602
2603 // TODO(mbelshe): DCHECK that this is a GET method?
2604
2605 // Verify that the response had a URL for us.
2606 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2607 if (!gurl.is_valid()) {
2608 EnqueueResetStreamFrame(stream_id,
2609 request_priority,
2610 RST_STREAM_PROTOCOL_ERROR,
2611 "Pushed stream url was invalid: " + gurl.spec());
2612 return false;
2613 }
2614
2615 // Verify we have a valid stream association.
2616 ActiveStreamMap::iterator associated_it =
2617 active_streams_.find(associated_stream_id);
2618 if (associated_it == active_streams_.end()) {
2619 EnqueueResetStreamFrame(
2620 stream_id,
2621 request_priority,
2622 RST_STREAM_INVALID_STREAM,
2623 base::StringPrintf("Received push for inactive associated stream %d",
2624 associated_stream_id));
2625 return false;
2626 }
2627
2628 // Check that the pushed stream advertises the same origin as its associated
2629 // stream. Bypass this check if and only if this session is with a SPDY proxy
2630 // that is trusted explicitly via the --trusted-spdy-proxy switch.
2631 if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2632 // Disallow pushing of HTTPS content.
2633 if (gurl.SchemeIs("https")) {
2634 EnqueueResetStreamFrame(
2635 stream_id,
2636 request_priority,
2637 RST_STREAM_REFUSED_STREAM,
2638 base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2639 associated_stream_id));
2640 }
2641 } else {
2642 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2643 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2644 EnqueueResetStreamFrame(
2645 stream_id,
2646 request_priority,
2647 RST_STREAM_REFUSED_STREAM,
2648 base::StringPrintf("Rejected Cross Origin Push Stream %d",
2649 associated_stream_id));
2650 return false;
2651 }
2652 }
2653
2654 // There should not be an existing pushed stream with the same path.
2655 PushedStreamMap::iterator pushed_it =
2656 unclaimed_pushed_streams_.lower_bound(gurl);
2657 if (pushed_it != unclaimed_pushed_streams_.end() &&
2658 pushed_it->first == gurl) {
2659 EnqueueResetStreamFrame(
2660 stream_id,
2661 request_priority,
2662 RST_STREAM_PROTOCOL_ERROR,
2663 "Received duplicate pushed stream with url: " + gurl.spec());
2664 return false;
2665 }
2666
2667 scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2668 GetWeakPtr(),
2669 gurl,
2670 request_priority,
2671 stream_initial_send_window_size_,
2672 stream_initial_recv_window_size_,
2673 net_log_));
2674 stream->set_stream_id(stream_id);
2675
2676 // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2677 if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2678 associated_it->second.stream->IncrementRawReceivedBytes(
2679 last_compressed_frame_len_);
2680 } else {
2681 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2682 }
2683
2684 last_compressed_frame_len_ = 0;
2685
2686 DeleteExpiredPushedStreams();
2687 PushedStreamMap::iterator inserted_pushed_it =
2688 unclaimed_pushed_streams_.insert(
2689 pushed_it,
2690 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2691 DCHECK(inserted_pushed_it != pushed_it);
2692
2693 InsertActivatedStream(stream.Pass());
2694
2695 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2696 if (active_it == active_streams_.end()) {
2697 NOTREACHED();
2698 return false;
2699 }
2700
2701 active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2702 DCHECK(active_it->second.stream->IsReservedRemote());
2703 num_pushed_streams_++;
2704 return true;
2705 }
2706
OnPushPromise(SpdyStreamId stream_id,SpdyStreamId promised_stream_id,const SpdyHeaderBlock & headers)2707 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2708 SpdyStreamId promised_stream_id,
2709 const SpdyHeaderBlock& headers) {
2710 CHECK(in_io_loop_);
2711
2712 if (net_log_.IsLogging()) {
2713 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2714 base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2715 &headers,
2716 stream_id,
2717 promised_stream_id));
2718 }
2719
2720 // Any priority will do.
2721 // TODO(baranovich): pass parent stream id priority?
2722 if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2723 return;
2724
2725 base::StatsCounter push_requests("spdy.pushed_streams");
2726 push_requests.Increment();
2727 }
2728
SendStreamWindowUpdate(SpdyStreamId stream_id,uint32 delta_window_size)2729 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2730 uint32 delta_window_size) {
2731 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2732 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2733 CHECK(it != active_streams_.end());
2734 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2735 SendWindowUpdateFrame(
2736 stream_id, delta_window_size, it->second.stream->priority());
2737 }
2738
SendInitialData()2739 void SpdySession::SendInitialData() {
2740 DCHECK(enable_sending_initial_data_);
2741
2742 if (send_connection_header_prefix_) {
2743 DCHECK_EQ(protocol_, kProtoSPDY4);
2744 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2745 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2746 kHttp2ConnectionHeaderPrefixSize,
2747 false /* take_ownership */));
2748 // Count the prefix as part of the subsequent SETTINGS frame.
2749 EnqueueSessionWrite(HIGHEST, SETTINGS,
2750 connection_header_prefix_frame.Pass());
2751 }
2752
2753 // First, notify the server about the settings they should use when
2754 // communicating with us.
2755 SettingsMap settings_map;
2756 // Create a new settings frame notifying the server of our
2757 // max concurrent streams and initial window size.
2758 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2759 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2760 if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2761 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2762 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2763 SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2764 stream_initial_recv_window_size_);
2765 }
2766 SendSettings(settings_map);
2767
2768 // Next, notify the server about our initial recv window size.
2769 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2770 // Bump up the receive window size to the real initial value. This
2771 // has to go here since the WINDOW_UPDATE frame sent by
2772 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2773 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2774 // This condition implies that |kDefaultInitialRecvWindowSize| -
2775 // |session_recv_window_size_| doesn't overflow.
2776 DCHECK_GT(session_recv_window_size_, 0);
2777 IncreaseRecvWindowSize(
2778 kDefaultInitialRecvWindowSize - session_recv_window_size_);
2779 }
2780
2781 if (protocol_ <= kProtoSPDY31) {
2782 // Finally, notify the server about the settings they have
2783 // previously told us to use when communicating with them (after
2784 // applying them).
2785 const SettingsMap& server_settings_map =
2786 http_server_properties_->GetSpdySettings(host_port_pair());
2787 if (server_settings_map.empty())
2788 return;
2789
2790 SettingsMap::const_iterator it =
2791 server_settings_map.find(SETTINGS_CURRENT_CWND);
2792 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2793 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2794
2795 for (SettingsMap::const_iterator it = server_settings_map.begin();
2796 it != server_settings_map.end(); ++it) {
2797 const SpdySettingsIds new_id = it->first;
2798 const uint32 new_val = it->second.second;
2799 HandleSetting(new_id, new_val);
2800 }
2801
2802 SendSettings(server_settings_map);
2803 }
2804 }
2805
2806
SendSettings(const SettingsMap & settings)2807 void SpdySession::SendSettings(const SettingsMap& settings) {
2808 const SpdyMajorVersion protocol_version = GetProtocolVersion();
2809 net_log_.AddEvent(
2810 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2811 base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
2812 // Create the SETTINGS frame and send it.
2813 DCHECK(buffered_spdy_framer_.get());
2814 scoped_ptr<SpdyFrame> settings_frame(
2815 buffered_spdy_framer_->CreateSettings(settings));
2816 sent_settings_ = true;
2817 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2818 }
2819
HandleSetting(uint32 id,uint32 value)2820 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2821 switch (id) {
2822 case SETTINGS_MAX_CONCURRENT_STREAMS:
2823 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2824 kMaxConcurrentStreamLimit);
2825 ProcessPendingStreamRequests();
2826 break;
2827 case SETTINGS_INITIAL_WINDOW_SIZE: {
2828 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2829 net_log().AddEvent(
2830 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2831 return;
2832 }
2833
2834 if (value > static_cast<uint32>(kint32max)) {
2835 net_log().AddEvent(
2836 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2837 NetLog::IntegerCallback("initial_window_size", value));
2838 return;
2839 }
2840
2841 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2842 int32 delta_window_size =
2843 static_cast<int32>(value) - stream_initial_send_window_size_;
2844 stream_initial_send_window_size_ = static_cast<int32>(value);
2845 UpdateStreamsSendWindowSize(delta_window_size);
2846 net_log().AddEvent(
2847 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2848 NetLog::IntegerCallback("delta_window_size", delta_window_size));
2849 break;
2850 }
2851 }
2852 }
2853
UpdateStreamsSendWindowSize(int32 delta_window_size)2854 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2855 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2856 for (ActiveStreamMap::iterator it = active_streams_.begin();
2857 it != active_streams_.end(); ++it) {
2858 it->second.stream->AdjustSendWindowSize(delta_window_size);
2859 }
2860
2861 for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2862 it != created_streams_.end(); it++) {
2863 (*it)->AdjustSendWindowSize(delta_window_size);
2864 }
2865 }
2866
SendPrefacePingIfNoneInFlight()2867 void SpdySession::SendPrefacePingIfNoneInFlight() {
2868 if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2869 return;
2870
2871 base::TimeTicks now = time_func_();
2872 // If there is no activity in the session, then send a preface-PING.
2873 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2874 SendPrefacePing();
2875 }
2876
SendPrefacePing()2877 void SpdySession::SendPrefacePing() {
2878 WritePingFrame(next_ping_id_, false);
2879 }
2880
SendWindowUpdateFrame(SpdyStreamId stream_id,uint32 delta_window_size,RequestPriority priority)2881 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2882 uint32 delta_window_size,
2883 RequestPriority priority) {
2884 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2885 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2886 if (it != active_streams_.end()) {
2887 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2888 } else {
2889 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2890 CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2891 }
2892
2893 net_log_.AddEvent(
2894 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2895 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2896 stream_id, delta_window_size));
2897
2898 DCHECK(buffered_spdy_framer_.get());
2899 scoped_ptr<SpdyFrame> window_update_frame(
2900 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2901 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2902 }
2903
WritePingFrame(uint32 unique_id,bool is_ack)2904 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2905 DCHECK(buffered_spdy_framer_.get());
2906 scoped_ptr<SpdyFrame> ping_frame(
2907 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2908 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2909
2910 if (net_log().IsLogging()) {
2911 net_log().AddEvent(
2912 NetLog::TYPE_SPDY_SESSION_PING,
2913 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2914 }
2915 if (!is_ack) {
2916 next_ping_id_ += 2;
2917 ++pings_in_flight_;
2918 PlanToCheckPingStatus();
2919 last_ping_sent_time_ = time_func_();
2920 }
2921 }
2922
PlanToCheckPingStatus()2923 void SpdySession::PlanToCheckPingStatus() {
2924 if (check_ping_status_pending_)
2925 return;
2926
2927 check_ping_status_pending_ = true;
2928 base::MessageLoop::current()->PostDelayedTask(
2929 FROM_HERE,
2930 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2931 time_func_()), hung_interval_);
2932 }
2933
CheckPingStatus(base::TimeTicks last_check_time)2934 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2935 CHECK(!in_io_loop_);
2936
2937 // Check if we got a response back for all PINGs we had sent.
2938 if (pings_in_flight_ == 0) {
2939 check_ping_status_pending_ = false;
2940 return;
2941 }
2942
2943 DCHECK(check_ping_status_pending_);
2944
2945 base::TimeTicks now = time_func_();
2946 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2947
2948 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2949 // Track all failed PING messages in a separate bucket.
2950 RecordPingRTTHistogram(base::TimeDelta::Max());
2951 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2952 return;
2953 }
2954
2955 // Check the status of connection after a delay.
2956 base::MessageLoop::current()->PostDelayedTask(
2957 FROM_HERE,
2958 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2959 now),
2960 delay);
2961 }
2962
RecordPingRTTHistogram(base::TimeDelta duration)2963 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2964 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2965 }
2966
RecordProtocolErrorHistogram(SpdyProtocolErrorDetails details)2967 void SpdySession::RecordProtocolErrorHistogram(
2968 SpdyProtocolErrorDetails details) {
2969 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2970 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2971 if (EndsWith(host_port_pair().host(), "google.com", false)) {
2972 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2973 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2974 }
2975 }
2976
RecordHistograms()2977 void SpdySession::RecordHistograms() {
2978 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2979 streams_initiated_count_,
2980 0, 300, 50);
2981 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2982 streams_pushed_count_,
2983 0, 300, 50);
2984 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2985 streams_pushed_and_claimed_count_,
2986 0, 300, 50);
2987 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2988 streams_abandoned_count_,
2989 0, 300, 50);
2990 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2991 sent_settings_ ? 1 : 0, 2);
2992 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2993 received_settings_ ? 1 : 0, 2);
2994 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2995 stalled_streams_,
2996 0, 300, 50);
2997 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2998 stalled_streams_ > 0 ? 1 : 0, 2);
2999
3000 if (received_settings_) {
3001 // Enumerate the saved settings, and set histograms for it.
3002 const SettingsMap& settings_map =
3003 http_server_properties_->GetSpdySettings(host_port_pair());
3004
3005 SettingsMap::const_iterator it;
3006 for (it = settings_map.begin(); it != settings_map.end(); ++it) {
3007 const SpdySettingsIds id = it->first;
3008 const uint32 val = it->second.second;
3009 switch (id) {
3010 case SETTINGS_CURRENT_CWND:
3011 // Record several different histograms to see if cwnd converges
3012 // for larger volumes of data being sent.
3013 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
3014 val, 1, 200, 100);
3015 if (total_bytes_received_ > 10 * 1024) {
3016 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
3017 val, 1, 200, 100);
3018 if (total_bytes_received_ > 25 * 1024) {
3019 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
3020 val, 1, 200, 100);
3021 if (total_bytes_received_ > 50 * 1024) {
3022 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
3023 val, 1, 200, 100);
3024 if (total_bytes_received_ > 100 * 1024) {
3025 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
3026 val, 1, 200, 100);
3027 }
3028 }
3029 }
3030 }
3031 break;
3032 case SETTINGS_ROUND_TRIP_TIME:
3033 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
3034 val, 1, 1200, 100);
3035 break;
3036 case SETTINGS_DOWNLOAD_RETRANS_RATE:
3037 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
3038 val, 1, 100, 50);
3039 break;
3040 default:
3041 break;
3042 }
3043 }
3044 }
3045 }
3046
CompleteStreamRequest(const base::WeakPtr<SpdyStreamRequest> & pending_request)3047 void SpdySession::CompleteStreamRequest(
3048 const base::WeakPtr<SpdyStreamRequest>& pending_request) {
3049 // Abort if the request has already been cancelled.
3050 if (!pending_request)
3051 return;
3052
3053 base::WeakPtr<SpdyStream> stream;
3054 int rv = TryCreateStream(pending_request, &stream);
3055
3056 if (rv == OK) {
3057 DCHECK(stream);
3058 pending_request->OnRequestCompleteSuccess(stream);
3059 return;
3060 }
3061 DCHECK(!stream);
3062
3063 if (rv != ERR_IO_PENDING) {
3064 pending_request->OnRequestCompleteFailure(rv);
3065 }
3066 }
3067
GetSSLClientSocket() const3068 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3069 if (!is_secure_)
3070 return NULL;
3071 SSLClientSocket* ssl_socket =
3072 reinterpret_cast<SSLClientSocket*>(connection_->socket());
3073 DCHECK(ssl_socket);
3074 return ssl_socket;
3075 }
3076
OnWriteBufferConsumed(size_t frame_payload_size,size_t consume_size,SpdyBuffer::ConsumeSource consume_source)3077 void SpdySession::OnWriteBufferConsumed(
3078 size_t frame_payload_size,
3079 size_t consume_size,
3080 SpdyBuffer::ConsumeSource consume_source) {
3081 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3082 // deleted (e.g., a stream is closed due to incoming data).
3083
3084 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3085
3086 if (consume_source == SpdyBuffer::DISCARD) {
3087 // If we're discarding a frame or part of it, increase the send
3088 // window by the number of discarded bytes. (Although if we're
3089 // discarding part of a frame, it's probably because of a write
3090 // error and we'll be tearing down the session soon.)
3091 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3092 DCHECK_GT(remaining_payload_bytes, 0u);
3093 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3094 }
3095 // For consumed bytes, the send window is increased when we receive
3096 // a WINDOW_UPDATE frame.
3097 }
3098
IncreaseSendWindowSize(int32 delta_window_size)3099 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3100 // We can be called with |in_io_loop_| set if a SpdyBuffer is
3101 // deleted (e.g., a stream is closed due to incoming data).
3102
3103 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3104 DCHECK_GE(delta_window_size, 1);
3105
3106 // Check for overflow.
3107 int32 max_delta_window_size = kint32max - session_send_window_size_;
3108 if (delta_window_size > max_delta_window_size) {
3109 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3110 DoDrainSession(
3111 ERR_SPDY_PROTOCOL_ERROR,
3112 "Received WINDOW_UPDATE [delta: " +
3113 base::IntToString(delta_window_size) +
3114 "] for session overflows session_send_window_size_ [current: " +
3115 base::IntToString(session_send_window_size_) + "]");
3116 return;
3117 }
3118
3119 session_send_window_size_ += delta_window_size;
3120
3121 net_log_.AddEvent(
3122 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3123 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3124 delta_window_size, session_send_window_size_));
3125
3126 DCHECK(!IsSendStalled());
3127 ResumeSendStalledStreams();
3128 }
3129
DecreaseSendWindowSize(int32 delta_window_size)3130 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3131 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3132
3133 // We only call this method when sending a frame. Therefore,
3134 // |delta_window_size| should be within the valid frame size range.
3135 DCHECK_GE(delta_window_size, 1);
3136 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3137
3138 // |send_window_size_| should have been at least |delta_window_size| for
3139 // this call to happen.
3140 DCHECK_GE(session_send_window_size_, delta_window_size);
3141
3142 session_send_window_size_ -= delta_window_size;
3143
3144 net_log_.AddEvent(
3145 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3146 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3147 -delta_window_size, session_send_window_size_));
3148 }
3149
OnReadBufferConsumed(size_t consume_size,SpdyBuffer::ConsumeSource consume_source)3150 void SpdySession::OnReadBufferConsumed(
3151 size_t consume_size,
3152 SpdyBuffer::ConsumeSource consume_source) {
3153 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3154 // deleted (e.g., discarded by a SpdyReadQueue).
3155
3156 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3157 DCHECK_GE(consume_size, 1u);
3158 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3159
3160 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3161 }
3162
IncreaseRecvWindowSize(int32 delta_window_size)3163 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3164 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3165 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3166 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3167 DCHECK_GE(delta_window_size, 1);
3168 // Check for overflow.
3169 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3170
3171 session_recv_window_size_ += delta_window_size;
3172 net_log_.AddEvent(
3173 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3174 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3175 delta_window_size, session_recv_window_size_));
3176
3177 session_unacked_recv_window_bytes_ += delta_window_size;
3178 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3179 SendWindowUpdateFrame(kSessionFlowControlStreamId,
3180 session_unacked_recv_window_bytes_,
3181 HIGHEST);
3182 session_unacked_recv_window_bytes_ = 0;
3183 }
3184 }
3185
DecreaseRecvWindowSize(int32 delta_window_size)3186 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3187 CHECK(in_io_loop_);
3188 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3189 DCHECK_GE(delta_window_size, 1);
3190
3191 // Since we never decrease the initial receive window size,
3192 // |delta_window_size| should never cause |recv_window_size_| to go
3193 // negative. If we do, the receive window isn't being respected.
3194 if (delta_window_size > session_recv_window_size_) {
3195 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3196 DoDrainSession(
3197 ERR_SPDY_FLOW_CONTROL_ERROR,
3198 "delta_window_size is " + base::IntToString(delta_window_size) +
3199 " in DecreaseRecvWindowSize, which is larger than the receive " +
3200 "window size of " + base::IntToString(session_recv_window_size_));
3201 return;
3202 }
3203
3204 session_recv_window_size_ -= delta_window_size;
3205 net_log_.AddEvent(
3206 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3207 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3208 -delta_window_size, session_recv_window_size_));
3209 }
3210
QueueSendStalledStream(const SpdyStream & stream)3211 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3212 DCHECK(stream.send_stalled_by_flow_control());
3213 RequestPriority priority = stream.priority();
3214 CHECK_GE(priority, MINIMUM_PRIORITY);
3215 CHECK_LE(priority, MAXIMUM_PRIORITY);
3216 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3217 }
3218
ResumeSendStalledStreams()3219 void SpdySession::ResumeSendStalledStreams() {
3220 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3221
3222 // We don't have to worry about new streams being queued, since
3223 // doing so would cause IsSendStalled() to return true. But we do
3224 // have to worry about streams being closed, as well as ourselves
3225 // being closed.
3226
3227 while (!IsSendStalled()) {
3228 size_t old_size = 0;
3229 #if DCHECK_IS_ON
3230 old_size = GetTotalSize(stream_send_unstall_queue_);
3231 #endif
3232
3233 SpdyStreamId stream_id = PopStreamToPossiblyResume();
3234 if (stream_id == 0)
3235 break;
3236 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3237 // The stream may actually still be send-stalled after this (due
3238 // to its own send window) but that's okay -- it'll then be
3239 // resumed once its send window increases.
3240 if (it != active_streams_.end())
3241 it->second.stream->PossiblyResumeIfSendStalled();
3242
3243 // The size should decrease unless we got send-stalled again.
3244 if (!IsSendStalled())
3245 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3246 }
3247 }
3248
PopStreamToPossiblyResume()3249 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3250 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3251 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3252 if (!queue->empty()) {
3253 SpdyStreamId stream_id = queue->front();
3254 queue->pop_front();
3255 return stream_id;
3256 }
3257 }
3258 return 0;
3259 }
3260
3261 } // namespace net
3262