// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "net/quic/quic_client_session.h"
6
7 #include "base/callback_helpers.h"
8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h"
10 #include "base/metrics/sparse_histogram.h"
11 #include "base/stl_util.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/values.h"
14 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h"
16 #include "net/quic/quic_connection_helper.h"
17 #include "net/quic/quic_crypto_client_stream_factory.h"
18 #include "net/quic/quic_default_packet_writer.h"
19 #include "net/quic/quic_stream_factory.h"
20 #include "net/ssl/ssl_info.h"
21 #include "net/udp/datagram_client_socket.h"
22
23 namespace net {
24
25 namespace {
26
27 // Note: these values must be kept in sync with the corresponding values in:
28 // tools/metrics/histograms/histograms.xml
29 enum HandshakeState {
30 STATE_STARTED = 0,
31 STATE_ENCRYPTION_ESTABLISHED = 1,
32 STATE_HANDSHAKE_CONFIRMED = 2,
33 STATE_FAILED = 3,
34 NUM_HANDSHAKE_STATES = 4
35 };
36
RecordHandshakeState(HandshakeState state)37 void RecordHandshakeState(HandshakeState state) {
38 UMA_HISTOGRAM_ENUMERATION("Net.QuicHandshakeState", state,
39 NUM_HANDSHAKE_STATES);
40 }
41
42 } // namespace
43
StreamRequest()44 QuicClientSession::StreamRequest::StreamRequest() : stream_(NULL) {}
45
~StreamRequest()46 QuicClientSession::StreamRequest::~StreamRequest() {
47 CancelRequest();
48 }
49
StartRequest(const base::WeakPtr<QuicClientSession> & session,QuicReliableClientStream ** stream,const CompletionCallback & callback)50 int QuicClientSession::StreamRequest::StartRequest(
51 const base::WeakPtr<QuicClientSession>& session,
52 QuicReliableClientStream** stream,
53 const CompletionCallback& callback) {
54 session_ = session;
55 stream_ = stream;
56 int rv = session_->TryCreateStream(this, stream_);
57 if (rv == ERR_IO_PENDING) {
58 callback_ = callback;
59 }
60
61 return rv;
62 }
63
CancelRequest()64 void QuicClientSession::StreamRequest::CancelRequest() {
65 if (session_)
66 session_->CancelRequest(this);
67 session_.reset();
68 callback_.Reset();
69 }
70
OnRequestCompleteSuccess(QuicReliableClientStream * stream)71 void QuicClientSession::StreamRequest::OnRequestCompleteSuccess(
72 QuicReliableClientStream* stream) {
73 session_.reset();
74 *stream_ = stream;
75 ResetAndReturn(&callback_).Run(OK);
76 }
77
OnRequestCompleteFailure(int rv)78 void QuicClientSession::StreamRequest::OnRequestCompleteFailure(int rv) {
79 session_.reset();
80 ResetAndReturn(&callback_).Run(rv);
81 }
82
QuicClientSession(QuicConnection * connection,scoped_ptr<DatagramClientSocket> socket,scoped_ptr<QuicDefaultPacketWriter> writer,QuicStreamFactory * stream_factory,QuicCryptoClientStreamFactory * crypto_client_stream_factory,const string & server_hostname,const QuicConfig & config,QuicCryptoClientConfig * crypto_config,NetLog * net_log)83 QuicClientSession::QuicClientSession(
84 QuicConnection* connection,
85 scoped_ptr<DatagramClientSocket> socket,
86 scoped_ptr<QuicDefaultPacketWriter> writer,
87 QuicStreamFactory* stream_factory,
88 QuicCryptoClientStreamFactory* crypto_client_stream_factory,
89 const string& server_hostname,
90 const QuicConfig& config,
91 QuicCryptoClientConfig* crypto_config,
92 NetLog* net_log)
93 : QuicSession(connection, config),
94 require_confirmation_(false),
95 stream_factory_(stream_factory),
96 socket_(socket.Pass()),
97 writer_(writer.Pass()),
98 read_buffer_(new IOBufferWithSize(kMaxPacketSize)),
99 read_pending_(false),
100 num_total_streams_(0),
101 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)),
102 logger_(net_log_),
103 num_packets_read_(0),
104 weak_factory_(this) {
105 crypto_stream_.reset(
106 crypto_client_stream_factory ?
107 crypto_client_stream_factory->CreateQuicCryptoClientStream(
108 server_hostname, this, crypto_config) :
109 new QuicCryptoClientStream(server_hostname, this, crypto_config));
110
111 connection->set_debug_visitor(&logger_);
112 // TODO(rch): pass in full host port proxy pair
113 net_log_.BeginEvent(
114 NetLog::TYPE_QUIC_SESSION,
115 NetLog::StringCallback("host", &server_hostname));
116 }
117
~QuicClientSession()118 QuicClientSession::~QuicClientSession() {
119 // The session must be closed before it is destroyed.
120 DCHECK(streams()->empty());
121 CloseAllStreams(ERR_UNEXPECTED);
122 DCHECK(observers_.empty());
123 CloseAllObservers(ERR_UNEXPECTED);
124
125 connection()->set_debug_visitor(NULL);
126 net_log_.EndEvent(NetLog::TYPE_QUIC_SESSION);
127
128 while (!stream_requests_.empty()) {
129 StreamRequest* request = stream_requests_.front();
130 stream_requests_.pop_front();
131 request->OnRequestCompleteFailure(ERR_ABORTED);
132 }
133
134 if (IsEncryptionEstablished())
135 RecordHandshakeState(STATE_ENCRYPTION_ESTABLISHED);
136 if (IsCryptoHandshakeConfirmed())
137 RecordHandshakeState(STATE_HANDSHAKE_CONFIRMED);
138 else
139 RecordHandshakeState(STATE_FAILED);
140
141 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellos",
142 crypto_stream_->num_sent_client_hellos());
143 if (IsCryptoHandshakeConfirmed()) {
144 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellosCryptoHandshakeConfirmed",
145 crypto_stream_->num_sent_client_hellos());
146 }
147
148 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumTotalStreams", num_total_streams_);
149 }
150
OnStreamFrames(const std::vector<QuicStreamFrame> & frames)151 bool QuicClientSession::OnStreamFrames(
152 const std::vector<QuicStreamFrame>& frames) {
153 // Record total number of stream frames.
154 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesInPacket", frames.size());
155
156 // Record number of frames per stream in packet.
157 typedef std::map<QuicStreamId, size_t> FrameCounter;
158 FrameCounter frames_per_stream;
159 for (size_t i = 0; i < frames.size(); ++i) {
160 frames_per_stream[frames[i].stream_id]++;
161 }
162 for (FrameCounter::const_iterator it = frames_per_stream.begin();
163 it != frames_per_stream.end(); ++it) {
164 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesPerStreamInPacket",
165 it->second);
166 }
167
168 return QuicSession::OnStreamFrames(frames);
169 }
170
AddObserver(Observer * observer)171 void QuicClientSession::AddObserver(Observer* observer) {
172 DCHECK(!ContainsKey(observers_, observer));
173 observers_.insert(observer);
174 }
175
RemoveObserver(Observer * observer)176 void QuicClientSession::RemoveObserver(Observer* observer) {
177 DCHECK(ContainsKey(observers_, observer));
178 observers_.erase(observer);
179 }
180
TryCreateStream(StreamRequest * request,QuicReliableClientStream ** stream)181 int QuicClientSession::TryCreateStream(StreamRequest* request,
182 QuicReliableClientStream** stream) {
183 if (!crypto_stream_->encryption_established()) {
184 DLOG(DFATAL) << "Encryption not established.";
185 return ERR_CONNECTION_CLOSED;
186 }
187
188 if (goaway_received()) {
189 DVLOG(1) << "Going away.";
190 return ERR_CONNECTION_CLOSED;
191 }
192
193 if (!connection()->connected()) {
194 DVLOG(1) << "Already closed.";
195 return ERR_CONNECTION_CLOSED;
196 }
197
198 if (GetNumOpenStreams() < get_max_open_streams()) {
199 *stream = CreateOutgoingReliableStreamImpl();
200 return OK;
201 }
202
203 stream_requests_.push_back(request);
204 return ERR_IO_PENDING;
205 }
206
CancelRequest(StreamRequest * request)207 void QuicClientSession::CancelRequest(StreamRequest* request) {
208 // Remove |request| from the queue while preserving the order of the
209 // other elements.
210 StreamRequestQueue::iterator it =
211 std::find(stream_requests_.begin(), stream_requests_.end(), request);
212 if (it != stream_requests_.end()) {
213 it = stream_requests_.erase(it);
214 }
215 }
216
CreateOutgoingDataStream()217 QuicReliableClientStream* QuicClientSession::CreateOutgoingDataStream() {
218 if (!crypto_stream_->encryption_established()) {
219 DVLOG(1) << "Encryption not active so no outgoing stream created.";
220 return NULL;
221 }
222 if (GetNumOpenStreams() >= get_max_open_streams()) {
223 DVLOG(1) << "Failed to create a new outgoing stream. "
224 << "Already " << GetNumOpenStreams() << " open.";
225 return NULL;
226 }
227 if (goaway_received()) {
228 DVLOG(1) << "Failed to create a new outgoing stream. "
229 << "Already received goaway.";
230 return NULL;
231 }
232
233 return CreateOutgoingReliableStreamImpl();
234 }
235
236 QuicReliableClientStream*
CreateOutgoingReliableStreamImpl()237 QuicClientSession::CreateOutgoingReliableStreamImpl() {
238 DCHECK(connection()->connected());
239 QuicReliableClientStream* stream =
240 new QuicReliableClientStream(GetNextStreamId(), this, net_log_);
241 ActivateStream(stream);
242 ++num_total_streams_;
243 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumOpenStreams", GetNumOpenStreams());
244 return stream;
245 }
246
GetCryptoStream()247 QuicCryptoClientStream* QuicClientSession::GetCryptoStream() {
248 return crypto_stream_.get();
249 };
250
GetSSLInfo(SSLInfo * ssl_info)251 bool QuicClientSession::GetSSLInfo(SSLInfo* ssl_info) {
252 DCHECK(crypto_stream_.get());
253 return crypto_stream_->GetSSLInfo(ssl_info);
254 }
255
CryptoConnect(bool require_confirmation,const CompletionCallback & callback)256 int QuicClientSession::CryptoConnect(bool require_confirmation,
257 const CompletionCallback& callback) {
258 require_confirmation_ = require_confirmation;
259 RecordHandshakeState(STATE_STARTED);
260 if (!crypto_stream_->CryptoConnect()) {
261 // TODO(wtc): change crypto_stream_.CryptoConnect() to return a
262 // QuicErrorCode and map it to a net error code.
263 return ERR_CONNECTION_FAILED;
264 }
265
266 bool can_notify = require_confirmation_ ?
267 IsCryptoHandshakeConfirmed() : IsEncryptionEstablished();
268 if (can_notify) {
269 return OK;
270 }
271
272 callback_ = callback;
273 return ERR_IO_PENDING;
274 }
275
GetNumSentClientHellos() const276 int QuicClientSession::GetNumSentClientHellos() const {
277 return crypto_stream_->num_sent_client_hellos();
278 }
279
CreateIncomingDataStream(QuicStreamId id)280 QuicDataStream* QuicClientSession::CreateIncomingDataStream(
281 QuicStreamId id) {
282 DLOG(ERROR) << "Server push not supported";
283 return NULL;
284 }
285
CloseStream(QuicStreamId stream_id)286 void QuicClientSession::CloseStream(QuicStreamId stream_id) {
287 QuicSession::CloseStream(stream_id);
288 OnClosedStream();
289 }
290
SendRstStream(QuicStreamId id,QuicRstStreamErrorCode error)291 void QuicClientSession::SendRstStream(QuicStreamId id,
292 QuicRstStreamErrorCode error) {
293 QuicSession::SendRstStream(id, error);
294 OnClosedStream();
295 }
296
OnClosedStream()297 void QuicClientSession::OnClosedStream() {
298 if (GetNumOpenStreams() < get_max_open_streams() &&
299 !stream_requests_.empty() &&
300 crypto_stream_->encryption_established() &&
301 !goaway_received() &&
302 connection()->connected()) {
303 StreamRequest* request = stream_requests_.front();
304 stream_requests_.pop_front();
305 request->OnRequestCompleteSuccess(CreateOutgoingReliableStreamImpl());
306 }
307
308 if (GetNumOpenStreams() == 0) {
309 stream_factory_->OnIdleSession(this);
310 }
311 }
312
OnCryptoHandshakeEvent(CryptoHandshakeEvent event)313 void QuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
314 if (!callback_.is_null() &&
315 (!require_confirmation_ || event == HANDSHAKE_CONFIRMED)) {
316 // TODO(rtenneti): Currently for all CryptoHandshakeEvent events, callback_
317 // could be called because there are no error events in CryptoHandshakeEvent
318 // enum. If error events are added to CryptoHandshakeEvent, then the
319 // following code needs to changed.
320 base::ResetAndReturn(&callback_).Run(OK);
321 }
322 if (event == HANDSHAKE_CONFIRMED) {
323 ObserverSet::iterator it = observers_.begin();
324 while (it != observers_.end()) {
325 Observer* observer = *it;
326 ++it;
327 observer->OnCryptoHandshakeConfirmed();
328 }
329 }
330 QuicSession::OnCryptoHandshakeEvent(event);
331 }
332
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage & message)333 void QuicClientSession::OnCryptoHandshakeMessageSent(
334 const CryptoHandshakeMessage& message) {
335 logger_.OnCryptoHandshakeMessageSent(message);
336 }
337
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage & message)338 void QuicClientSession::OnCryptoHandshakeMessageReceived(
339 const CryptoHandshakeMessage& message) {
340 logger_.OnCryptoHandshakeMessageReceived(message);
341 }
342
OnConnectionClosed(QuicErrorCode error,bool from_peer)343 void QuicClientSession::OnConnectionClosed(QuicErrorCode error,
344 bool from_peer) {
345 DCHECK(!connection()->connected());
346 logger_.OnConnectionClosed(error, from_peer);
347 if (from_peer) {
348 UMA_HISTOGRAM_SPARSE_SLOWLY(
349 "Net.QuicSession.ConnectionCloseErrorCodeServer", error);
350 } else {
351 UMA_HISTOGRAM_SPARSE_SLOWLY(
352 "Net.QuicSession.ConnectionCloseErrorCodeClient", error);
353 }
354
355 if (error == QUIC_CONNECTION_TIMED_OUT) {
356 UMA_HISTOGRAM_COUNTS(
357 "Net.QuicSession.ConnectionClose.NumOpenStreams.TimedOut",
358 GetNumOpenStreams());
359 if (!IsCryptoHandshakeConfirmed()) {
360 // If there have been any streams created, they were 0-RTT speculative
361 // requests that have not be serviced.
362 UMA_HISTOGRAM_COUNTS(
363 "Net.QuicSession.ConnectionClose.NumTotalStreams.HandshakeTimedOut",
364 num_total_streams_);
365 }
366 }
367
368 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.QuicVersion",
369 connection()->version());
370 NotifyFactoryOfSessionGoingAway();
371 if (!callback_.is_null()) {
372 base::ResetAndReturn(&callback_).Run(ERR_QUIC_PROTOCOL_ERROR);
373 }
374 socket_->Close();
375 QuicSession::OnConnectionClosed(error, from_peer);
376 DCHECK(streams()->empty());
377 CloseAllStreams(ERR_UNEXPECTED);
378 CloseAllObservers(ERR_UNEXPECTED);
379 NotifyFactoryOfSessionClosedLater();
380 }
381
OnSuccessfulVersionNegotiation(const QuicVersion & version)382 void QuicClientSession::OnSuccessfulVersionNegotiation(
383 const QuicVersion& version) {
384 logger_.OnSuccessfulVersionNegotiation(version);
385 QuicSession::OnSuccessfulVersionNegotiation(version);
386 }
387
StartReading()388 void QuicClientSession::StartReading() {
389 if (read_pending_) {
390 return;
391 }
392 read_pending_ = true;
393 int rv = socket_->Read(read_buffer_.get(),
394 read_buffer_->size(),
395 base::Bind(&QuicClientSession::OnReadComplete,
396 weak_factory_.GetWeakPtr()));
397 if (rv == ERR_IO_PENDING) {
398 num_packets_read_ = 0;
399 return;
400 }
401
402 if (++num_packets_read_ > 32) {
403 num_packets_read_ = 0;
404 // Data was read, process it.
405 // Schedule the work through the message loop to avoid recursive
406 // callbacks.
407 base::MessageLoop::current()->PostTask(
408 FROM_HERE,
409 base::Bind(&QuicClientSession::OnReadComplete,
410 weak_factory_.GetWeakPtr(), rv));
411 } else {
412 OnReadComplete(rv);
413 }
414 }
415
CloseSessionOnError(int error)416 void QuicClientSession::CloseSessionOnError(int error) {
417 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.CloseSessionOnError", -error);
418 CloseSessionOnErrorInner(error, QUIC_INTERNAL_ERROR);
419 NotifyFactoryOfSessionClosed();
420 }
421
CloseSessionOnErrorInner(int net_error,QuicErrorCode quic_error)422 void QuicClientSession::CloseSessionOnErrorInner(int net_error,
423 QuicErrorCode quic_error) {
424 if (!callback_.is_null()) {
425 base::ResetAndReturn(&callback_).Run(net_error);
426 }
427 CloseAllStreams(net_error);
428 CloseAllObservers(net_error);
429 net_log_.AddEvent(
430 NetLog::TYPE_QUIC_SESSION_CLOSE_ON_ERROR,
431 NetLog::IntegerCallback("net_error", net_error));
432
433 connection()->CloseConnection(quic_error, false);
434 DCHECK(!connection()->connected());
435 }
436
CloseAllStreams(int net_error)437 void QuicClientSession::CloseAllStreams(int net_error) {
438 while (!streams()->empty()) {
439 ReliableQuicStream* stream = streams()->begin()->second;
440 QuicStreamId id = stream->id();
441 static_cast<QuicReliableClientStream*>(stream)->OnError(net_error);
442 CloseStream(id);
443 }
444 }
445
CloseAllObservers(int net_error)446 void QuicClientSession::CloseAllObservers(int net_error) {
447 while (!observers_.empty()) {
448 Observer* observer = *observers_.begin();
449 observers_.erase(observer);
450 observer->OnSessionClosed(net_error);
451 }
452 }
453
GetInfoAsValue(const HostPortPair & pair) const454 base::Value* QuicClientSession::GetInfoAsValue(const HostPortPair& pair) const {
455 base::DictionaryValue* dict = new base::DictionaryValue();
456 dict->SetString("host_port_pair", pair.ToString());
457 dict->SetString("version", QuicVersionToString(connection()->version()));
458 dict->SetInteger("open_streams", GetNumOpenStreams());
459 dict->SetInteger("total_streams", num_total_streams_);
460 dict->SetString("peer_address", peer_address().ToString());
461 dict->SetString("guid", base::Uint64ToString(guid()));
462 dict->SetBoolean("connected", connection()->connected());
463 return dict;
464 }
465
GetWeakPtr()466 base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() {
467 return weak_factory_.GetWeakPtr();
468 }
469
OnReadComplete(int result)470 void QuicClientSession::OnReadComplete(int result) {
471 read_pending_ = false;
472 if (result == 0)
473 result = ERR_CONNECTION_CLOSED;
474
475 if (result < 0) {
476 DVLOG(1) << "Closing session on read error: " << result;
477 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result);
478 NotifyFactoryOfSessionGoingAway();
479 CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR);
480 NotifyFactoryOfSessionClosedLater();
481 return;
482 }
483
484 scoped_refptr<IOBufferWithSize> buffer(read_buffer_);
485 read_buffer_ = new IOBufferWithSize(kMaxPacketSize);
486 QuicEncryptedPacket packet(buffer->data(), result);
487 IPEndPoint local_address;
488 IPEndPoint peer_address;
489 socket_->GetLocalAddress(&local_address);
490 socket_->GetPeerAddress(&peer_address);
491 // ProcessUdpPacket might result in |this| being deleted, so we
492 // use a weak pointer to be safe.
493 connection()->ProcessUdpPacket(local_address, peer_address, packet);
494 if (!connection()->connected()) {
495 stream_factory_->OnSessionClosed(this);
496 return;
497 }
498 StartReading();
499 }
500
NotifyFactoryOfSessionGoingAway()501 void QuicClientSession::NotifyFactoryOfSessionGoingAway() {
502 if (stream_factory_)
503 stream_factory_->OnSessionGoingAway(this);
504 }
505
NotifyFactoryOfSessionClosedLater()506 void QuicClientSession::NotifyFactoryOfSessionClosedLater() {
507 DCHECK_EQ(0u, GetNumOpenStreams());
508 DCHECK(!connection()->connected());
509 base::MessageLoop::current()->PostTask(
510 FROM_HERE,
511 base::Bind(&QuicClientSession::NotifyFactoryOfSessionClosed,
512 weak_factory_.GetWeakPtr()));
513 }
514
NotifyFactoryOfSessionClosed()515 void QuicClientSession::NotifyFactoryOfSessionClosed() {
516 DCHECK_EQ(0u, GetNumOpenStreams());
517 // Will delete |this|.
518 if (stream_factory_)
519 stream_factory_->OnSessionClosed(this);
520 }
521
522 } // namespace net
523