1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/flip/flip_session.h"
6
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9 #include "base/message_loop.h"
10 #include "base/rand_util.h"
11 #include "base/stats_counters.h"
12 #include "base/stl_util-inl.h"
13 #include "base/string_util.h"
14 #include "net/base/connection_type_histograms.h"
15 #include "net/base/load_flags.h"
16 #include "net/base/load_log.h"
17 #include "net/base/net_util.h"
18 #include "net/flip/flip_frame_builder.h"
19 #include "net/flip/flip_protocol.h"
20 #include "net/flip/flip_stream.h"
21 #include "net/http/http_network_session.h"
22 #include "net/http/http_request_info.h"
23 #include "net/http/http_response_headers.h"
24 #include "net/http/http_response_info.h"
25 #include "net/socket/client_socket.h"
26 #include "net/socket/client_socket_factory.h"
27 #include "net/socket/ssl_client_socket.h"
28 #include "net/tools/dump_cache/url_to_filename_encoder.h"
29
30 namespace {
31
32 // Diagnostics function to dump the headers of a request.
33 // TODO(mbelshe): Remove this function.
DumpFlipHeaders(const flip::FlipHeaderBlock & headers)34 void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) {
35 // Because this function gets called on every request,
36 // take extra care to optimize it away if logging is turned off.
37 if (logging::LOG_INFO < logging::GetMinLogLevel())
38 return;
39
40 flip::FlipHeaderBlock::const_iterator it = headers.begin();
41 while (it != headers.end()) {
42 std::string val = (*it).second;
43 std::string::size_type pos = 0;
44 while ((pos = val.find('\0', pos)) != val.npos)
45 val[pos] = '\n';
46 LOG(INFO) << (*it).first << "==" << val;
47 ++it;
48 }
49 }
50
51 } // namespace
52
53 namespace net {
54
55 namespace {
56
#ifdef WIN32
// We use an artificially small buffer size on windows because the async IO
// system will artificially delay IO completions when we use large buffers.
const int kReadBufferSize = 2 * 1024;
#else
const int kReadBufferSize = 8 * 1024;
#endif
64
65 // Convert a FlipHeaderBlock into an HttpResponseInfo.
66 // |headers| input parameter with the FlipHeaderBlock.
67 // |info| output parameter for the HttpResponseInfo.
68 // Returns true if successfully converted. False if there was a failure
69 // or if the FlipHeaderBlock was invalid.
FlipHeadersToHttpResponse(const flip::FlipHeaderBlock & headers,HttpResponseInfo * response)70 bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers,
71 HttpResponseInfo* response) {
72 std::string version;
73 std::string status;
74
75 // The "status" and "version" headers are required.
76 flip::FlipHeaderBlock::const_iterator it;
77 it = headers.find("status");
78 if (it == headers.end()) {
79 LOG(ERROR) << "FlipHeaderBlock without status header.";
80 return false;
81 }
82 status = it->second;
83
84 // Grab the version. If not provided by the server,
85 it = headers.find("version");
86 if (it == headers.end()) {
87 LOG(ERROR) << "FlipHeaderBlock without version header.";
88 return false;
89 }
90 version = it->second;
91
92 std::string raw_headers(version);
93 raw_headers.push_back(' ');
94 raw_headers.append(status);
95 raw_headers.push_back('\0');
96 for (it = headers.begin(); it != headers.end(); ++it) {
97 // For each value, if the server sends a NUL-separated
98 // list of values, we separate that back out into
99 // individual headers for each value in the list.
100 // e.g.
101 // Set-Cookie "foo\0bar"
102 // becomes
103 // Set-Cookie: foo\0
104 // Set-Cookie: bar\0
105 std::string value = it->second;
106 size_t start = 0;
107 size_t end = 0;
108 do {
109 end = value.find('\0', start);
110 std::string tval;
111 if (end != value.npos)
112 tval = value.substr(start, (end - start));
113 else
114 tval = value.substr(start);
115 raw_headers.append(it->first);
116 raw_headers.push_back(':');
117 raw_headers.append(tval);
118 raw_headers.push_back('\0');
119 start = end + 1;
120 } while (end != value.npos);
121 }
122
123 response->headers = new HttpResponseHeaders(raw_headers);
124 response->was_fetched_via_spdy = true;
125 return true;
126 }
127
128 // Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
129 // a HttpRequestInfo block.
CreateFlipHeadersFromHttpRequest(const HttpRequestInfo & info,flip::FlipHeaderBlock * headers)130 void CreateFlipHeadersFromHttpRequest(
131 const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) {
132 static const char kHttpProtocolVersion[] = "HTTP/1.1";
133
134 HttpUtil::HeadersIterator it(info.extra_headers.begin(),
135 info.extra_headers.end(),
136 "\r\n");
137 while (it.GetNext()) {
138 std::string name = StringToLowerASCII(it.name());
139 if (headers->find(name) == headers->end()) {
140 (*headers)[name] = it.values();
141 } else {
142 std::string new_value = (*headers)[name];
143 new_value += "\0";
144 new_value += it.values();
145 (*headers)[name] = new_value;
146 }
147 }
148
149 // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc)
150 // TODO(mbelshe): Add authentication headers here.
151
152 (*headers)["method"] = info.method;
153 (*headers)["url"] = info.url.spec();
154 (*headers)["version"] = kHttpProtocolVersion;
155 if (info.user_agent.length())
156 (*headers)["user-agent"] = info.user_agent;
157 if (!info.referrer.is_empty())
158 (*headers)["referer"] = info.referrer.spec();
159
160 // Honor load flags that impact proxy caches.
161 if (info.load_flags & LOAD_BYPASS_CACHE) {
162 (*headers)["pragma"] = "no-cache";
163 (*headers)["cache-control"] = "no-cache";
164 } else if (info.load_flags & LOAD_VALIDATE_CACHE) {
165 (*headers)["cache-control"] = "max-age=0";
166 }
167 }
168
AdjustSocketBufferSizes(ClientSocket * socket)169 void AdjustSocketBufferSizes(ClientSocket* socket) {
170 // Adjust socket buffer sizes.
171 // FLIP uses one socket, and we want a really big buffer.
172 // This greatly helps on links with packet loss - we can even
173 // outperform Vista's dynamic window sizing algorithm.
174 // TODO(mbelshe): more study.
175 const int kSocketBufferSize = 512 * 1024;
176 socket->SetReceiveBufferSize(kSocketBufferSize);
177 socket->SetSendBufferSize(kSocketBufferSize);
178 }
179
180 } // namespace
181
// static
// Defaults to true.  OnTCPConnect() reads this flag to decide whether to
// layer an SSL socket on top of the freshly connected transport socket.
bool FlipSession::use_ssl_ = true;
184
// Constructs an idle session for |host|.  |session| supplies the SSL
// configuration read here, and later the socket pool, socket factory and
// flip session pool used by other methods.  No connection is attempted by
// the constructor; the session starts in the IDLE state with no error.
FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session)
    : ALLOW_THIS_IN_INITIALIZER_LIST(
          connect_callback_(this, &FlipSession::OnTCPConnect)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          ssl_connect_callback_(this, &FlipSession::OnSSLConnect)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          read_callback_(this, &FlipSession::OnReadComplete)),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          write_callback_(this, &FlipSession::OnWriteComplete)),
      domain_(host),
      session_(session),
      connection_(new ClientSocketHandle),
      read_buffer_(new IOBuffer(kReadBufferSize)),
      read_pending_(false),
      stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
      write_pending_(false),
      delayed_write_pending_(false),
      is_secure_(false),
      error_(OK),
      state_(IDLE),
      streams_initiated_count_(0),
      streams_pushed_count_(0),
      streams_pushed_and_claimed_count_(0),
      streams_abandoned_count_(0) {
  // TODO(mbelshe): consider randomization of the stream_hi_water_mark.

  // Framer callbacks (OnError, OnSyn, OnStreamFrameData, ...) are delivered
  // to this object.
  flip_framer_.set_visitor(this);

  session_->ssl_config_service()->GetSSLConfig(&ssl_config_);

  // TODO(agl): This is a temporary hack for testing reasons. In the medium
  // term we'll want to use NPN for all HTTPS connections and use the protocol
  // suggested.
  //
  // In the event that the server supports Next Protocol Negotiation, but
  // doesn't support either of these protocols, we'll request the first
  // protocol in the list. Because of that, HTTP is listed first because it's
  // what we'll actually fallback to in the case that the server doesn't
  // support SPDY.
  //
  // The string is a list of length-prefixed protocol names: a 7 byte
  // "http1.1" followed by a 4 byte "spdy".
  ssl_config_.next_protos = "\007http1.1\004spdy";
}
226
// Aborts every remaining stream with ERR_ABORTED, disconnects the socket if
// the connection was initialized, and records the per-session stream
// counters as histograms.
FlipSession::~FlipSession() {
  // Cleanup all the streams.
  CloseAllStreams(net::ERR_ABORTED);

  if (connection_->is_initialized()) {
    // With Flip we can't recycle sockets.
    connection_->socket()->Disconnect();
  }

  // The session is expected to have been removed from the pool already.
  // TODO(willchan): Don't hardcode port 80 here.
  DCHECK(!session_->flip_session_pool()->HasSession(
      HostResolver::RequestInfo(domain_, 80)));

  // Record per-session histograms here.
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
      streams_initiated_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
      streams_pushed_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
      streams_pushed_and_claimed_count_,
      0, 300, 50);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
      streams_abandoned_count_,
      0, 300, 50);
}
254
InitializeWithSocket(ClientSocketHandle * connection)255 void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) {
256 static StatsCounter flip_sessions("flip.sessions");
257 flip_sessions.Increment();
258
259 AdjustSocketBufferSizes(connection->socket());
260
261 state_ = CONNECTED;
262 connection_.reset(connection);
263
264 // This is a newly initialized session that no client should have a handle to
265 // yet, so there's no need to start writing data as in OnTCPConnect(), but we
266 // should start reading data.
267 ReadSocket();
268 }
269
Connect(const std::string & group_name,const HostResolver::RequestInfo & host,RequestPriority priority,LoadLog * load_log)270 net::Error FlipSession::Connect(const std::string& group_name,
271 const HostResolver::RequestInfo& host,
272 RequestPriority priority,
273 LoadLog* load_log) {
274 DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST);
275
276 // If the connect process is started, let the caller continue.
277 if (state_ > IDLE)
278 return net::OK;
279
280 state_ = CONNECTING;
281
282 static StatsCounter flip_sessions("flip.sessions");
283 flip_sessions.Increment();
284
285 int rv = connection_->Init(group_name, host, priority, &connect_callback_,
286 session_->tcp_socket_pool(), load_log);
287 DCHECK(rv <= 0);
288
289 // If the connect is pending, we still return ok. The APIs enqueue
290 // work until after the connect completes asynchronously later.
291 if (rv == net::ERR_IO_PENDING)
292 return net::OK;
293 return static_cast<net::Error>(rv);
294 }
295
// Returns the stream to use for |request|.  In order of preference this is:
//   1. an already-received pushed stream for the same path (GET only),
//   2. a placeholder stream registered against a pending push for the path,
//   3. a brand new stream, for which a SYN_STREAM frame is queued and a
//      socket write is scheduled.
// |upload_data| is consulted to decide whether the SYN_STREAM carries FIN.
// |log| is attached to any stream created here.
scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
    const HttpRequestInfo& request,
    const UploadDataStream* upload_data,
    LoadLog* log) {
  const GURL& url = request.url;
  const std::string& path = url.PathForRequest();

  scoped_refptr<FlipStream> stream;

  // Check if we have a push stream for this path.
  if (request.method == "GET") {
    stream = GetPushStream(path);
    if (stream) {
      DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
      streams_pushed_and_claimed_count_++;
      return stream;
    }
  }

  // Check if we have a pending push stream for this url.
  PendingStreamMap::iterator it;
  it = pending_streams_.find(path);
  if (it != pending_streams_.end()) {
    DCHECK(!it->second);
    // Server will assign a stream id when the push stream arrives.  Use 0 for
    // now.
    LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM);
    // NOTE(review): this local shadows the outer |stream| declared above.
    FlipStream* stream = new FlipStream(this, 0, true, log);
    stream->set_path(path);
    it->second = stream;
    return it->second;
  }

  const flip::FlipStreamId stream_id = GetNewStreamId();

  // If we still don't have a stream, activate one now.
  stream = new FlipStream(this, stream_id, false, log);
  stream->set_priority(request.priority);
  stream->set_path(path);
  ActivateStream(stream);

  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
      static_cast<int>(request.priority), 0, 10, 11);

  LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url;

  // TODO(mbelshe): Optimize memory allocations
  DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST &&
         request.priority <= FLIP_PRIORITY_LOWEST);

  // Convert from HttpRequestHeaders to Flip Headers.
  flip::FlipHeaderBlock headers;
  CreateFlipHeadersFromHttpRequest(request, &headers);

  // A request without a body ends the client's side of the stream in the
  // SYN_STREAM frame itself.
  // NOTE(review): the null test is on |request.upload_data| but the
  // dereference is on the |upload_data| parameter; this presumably relies on
  // callers passing a non-NULL |upload_data| whenever |request.upload_data|
  // is set - confirm.
  flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE;
  if (!request.upload_data || !upload_data->size())
    flags = flip::CONTROL_FLAG_FIN;

  // Create a SYN_STREAM packet and add to the output queue.
  scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame(
      flip_framer_.CreateSynStream(stream_id, request.priority, flags, false,
                                   &headers));
  int length = flip::FlipFrame::size() + syn_frame->length();
  IOBuffer* buffer = new IOBuffer(length);
  memcpy(buffer->data(), syn_frame->data(), length);
  queue_.push(FlipIOBuffer(buffer, length, request.priority, stream));

  static StatsCounter flip_requests("flip.requests");
  flip_requests.Increment();

  LOG(INFO) << "FETCHING: " << request.url.spec();
  streams_initiated_count_++;

  LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------";
  DumpFlipHeaders(headers);

  // Schedule to write to the socket after we've made it back
  // to the message loop so that we can aggregate multiple
  // requests.
  // TODO(mbelshe): Should we do the "first" request immediately?
  //                maybe we should only 'do later' for subsequent
  //                requests.
  WriteSocketLater();

  return stream;
}
382
WriteStreamData(flip::FlipStreamId stream_id,net::IOBuffer * data,int len)383 int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
384 net::IOBuffer* data, int len) {
385 LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len
386 << " bytes)";
387 const int kMss = 1430; // This is somewhat arbitrary and not really fixed,
388 // but it will always work reasonably with ethernet.
389 // Chop the world into 2-packet chunks. This is somewhat arbitrary, but
390 // is reasonably small and ensures that we elicit ACKs quickly from TCP
391 // (because TCP tries to only ACK every other packet).
392 const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size();
393
394 // Find our stream
395 DCHECK(IsStreamActive(stream_id));
396 scoped_refptr<FlipStream> stream = active_streams_[stream_id];
397 CHECK(stream->stream_id() == stream_id);
398 if (!stream)
399 return ERR_INVALID_FLIP_STREAM;
400
401 // TODO(mbelshe): Setting of the FIN is assuming that the caller will pass
402 // all data to write in a single chunk. Is this always true?
403
404 // Set the flags on the upload.
405 flip::FlipDataFlags flags = flip::DATA_FLAG_FIN;
406 if (len > kMaxFlipFrameChunkSize) {
407 len = kMaxFlipFrameChunkSize;
408 flags = flip::DATA_FLAG_NONE;
409 }
410
411 // TODO(mbelshe): reduce memory copies here.
412 scoped_ptr<flip::FlipDataFrame> frame(
413 flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
414 int length = flip::FlipFrame::size() + frame->length();
415 IOBufferWithSize* buffer = new IOBufferWithSize(length);
416 memcpy(buffer->data(), frame->data(), length);
417 queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream));
418
419 // Whenever we queue onto the socket we need to ensure that we will write to
420 // it later.
421 WriteSocketLater();
422
423 return ERR_IO_PENDING;
424 }
425
CancelStream(flip::FlipStreamId stream_id)426 bool FlipSession::CancelStream(flip::FlipStreamId stream_id) {
427 LOG(INFO) << "Cancelling stream " << stream_id;
428 if (!IsStreamActive(stream_id))
429 return false;
430
431 // TODO(mbelshe): We should send a FIN_STREAM control frame here
432 // so that the server can cancel a large send.
433
434 // TODO(mbelshe): Write a method for tearing down a stream
435 // that cleans it out of the active list, the pending list,
436 // etc.
437 scoped_refptr<FlipStream> stream = active_streams_[stream_id];
438 DeactivateStream(stream_id);
439 return true;
440 }
441
IsStreamActive(flip::FlipStreamId stream_id) const442 bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const {
443 return ContainsKey(active_streams_, stream_id);
444 }
445
GetLoadState() const446 LoadState FlipSession::GetLoadState() const {
447 // NOTE: The application only queries the LoadState via the
448 // FlipNetworkTransaction, and details are only needed when
449 // we're in the process of connecting.
450
451 // If we're connecting, defer to the connection to give us the actual
452 // LoadState.
453 if (state_ == CONNECTING)
454 return connection_->GetLoadState();
455
456 // Just report that we're idle since the session could be doing
457 // many things concurrently.
458 return LOAD_STATE_IDLE;
459 }
460
OnTCPConnect(int result)461 void FlipSession::OnTCPConnect(int result) {
462 LOG(INFO) << "Flip socket connected (result=" << result << ")";
463
464 // We shouldn't be coming through this path if we didn't just open a fresh
465 // socket (or have an error trying to do so).
466 DCHECK(!connection_->socket() || !connection_->is_reused());
467
468 UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0);
469
470 if (result != net::OK) {
471 DCHECK_LT(result, 0);
472 CloseSessionOnError(static_cast<net::Error>(result));
473 return;
474 }
475
476 AdjustSocketBufferSizes(connection_->socket());
477
478 if (use_ssl_) {
479 // Add a SSL socket on top of our existing transport socket.
480 ClientSocket* socket = connection_->release_socket();
481 // TODO(mbelshe): Fix the hostname. This is BROKEN without having
482 // a real hostname.
483 socket = session_->socket_factory()->CreateSSLClientSocket(
484 socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_);
485 connection_->set_socket(socket);
486 is_secure_ = true;
487 // TODO(willchan): Plumb LoadLog into FLIP code.
488 int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL);
489 if (status != ERR_IO_PENDING)
490 OnSSLConnect(status);
491 } else {
492 DCHECK_EQ(state_, CONNECTING);
493 state_ = CONNECTED;
494
495 // Make sure we get any pending data sent.
496 WriteSocketLater();
497 // Start reading
498 ReadSocket();
499 }
500 }
501
OnSSLConnect(int result)502 void FlipSession::OnSSLConnect(int result) {
503 // TODO(mbelshe): We need to replicate the functionality of
504 // HttpNetworkTransaction::DoSSLConnectComplete here, where it calls
505 // HandleCertificateError() and such.
506 if (IsCertificateError(result))
507 result = OK; // TODO(mbelshe): pretend we're happy anyway.
508
509 if (result == OK) {
510 DCHECK_EQ(state_, CONNECTING);
511 state_ = CONNECTED;
512
513 // After we've connected, send any data to the server, and then issue
514 // our read.
515 WriteSocketLater();
516 ReadSocket();
517 } else {
518 DCHECK_LT(result, 0); // It should be an error, not a byte count.
519 CloseSessionOnError(static_cast<net::Error>(result));
520 }
521 }
522
// Handles the result of a socket read.  |bytes_read| is a positive byte
// count, 0 (reported as ERR_CONNECTION_CLOSED), or a negative net error.
// On success the bytes in |read_buffer_| are fed to the framer and, unless
// the session was closed meanwhile, another read is issued.
void FlipSession::OnReadComplete(int bytes_read) {
  // Parse the data that was read.
  // NOTE(review): the previous comment here said frames must fit in a 32KB
  // buffer, but |read_buffer_| is allocated and read with kReadBufferSize
  // bytes.
  // TODO(mbelshe): support arbitrarily large frames!

  LOG(INFO) << "Flip socket read: " << bytes_read << " bytes";

  read_pending_ = false;

  if (bytes_read <= 0) {
    // Session is tearing down.
    net::Error error = static_cast<net::Error>(bytes_read);
    if (error == OK)
      error = ERR_CONNECTION_CLOSED;
    CloseSessionOnError(error);
    return;
  }

  // The FlipFramer will use callbacks onto |this| as it parses frames.
  // When errors occur, those callbacks can lead to teardown of all references
  // to |this|, so maintain a reference to self during this call for safe
  // cleanup.
  scoped_refptr<FlipSession> self(this);

  // Keep handing the unconsumed remainder to the framer until everything has
  // been processed or the framer reports an error.
  char *data = read_buffer_->data();
  while (bytes_read &&
         flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) {
    uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read);
    bytes_read -= bytes_processed;
    data += bytes_processed;
    // A completed frame leaves the framer in FLIP_DONE; reset it so that the
    // next frame can be parsed.
    if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE)
      flip_framer_.Reset();
  }

  if (state_ != CLOSED)
    ReadSocket();
}
560
// Handles the result of a socket write for |in_flight_write_|.  |result| is
// the number of bytes written or a negative net error.  On success the
// in-flight buffer is advanced; once it is fully written the owning stream
// is notified (unless it was cancelled) and the buffer is released.  Another
// write is then scheduled.  On error the in-flight write is dropped and the
// whole session is closed.
void FlipSession::OnWriteComplete(int result) {
  DCHECK(write_pending_);
  DCHECK(in_flight_write_.size());
  DCHECK(result != 0);  // This shouldn't happen for write.

  write_pending_ = false;

  LOG(INFO) << "Flip write complete (result=" << result << ") for stream: "
            << in_flight_write_.stream()->stream_id();

  if (result >= 0) {
    // It should not be possible to have written more bytes than our
    // in_flight_write_.
    DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());

    in_flight_write_.buffer()->DidConsume(result);

    // We only notify the stream when we've fully written the pending frame.
    if (!in_flight_write_.buffer()->BytesRemaining()) {
      scoped_refptr<FlipStream> stream = in_flight_write_.stream();
      DCHECK(stream.get());

      // Report the number of bytes written to the caller, but exclude the
      // frame size overhead.  NOTE:  if this frame was compressed the reported
      // bytes written is the compressed size, not the original size.
      if (result > 0) {
        result = in_flight_write_.buffer()->size();
        DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size()));
        result -= static_cast<int>(flip::FlipFrame::size());
      }

      // It is possible that the stream was cancelled while we were writing
      // to the socket.
      if (!stream->cancelled())
        stream->OnWriteComplete(result);

      // Cleanup the write which just completed.
      in_flight_write_.release();
    }

    // Write more data.  Note that this goes through WriteSocketLater(), so
    // the next write is posted to the message loop rather than issued
    // synchronously from here.
    WriteSocketLater();
  } else {
    in_flight_write_.release();

    // The stream is now errored.  Close it down.
    CloseSessionOnError(static_cast<net::Error>(result));
  }
}
612
ReadSocket()613 void FlipSession::ReadSocket() {
614 if (read_pending_)
615 return;
616
617 if (state_ == CLOSED) {
618 NOTREACHED();
619 return;
620 }
621
622 CHECK(connection_.get());
623 CHECK(connection_->socket());
624 int bytes_read = connection_->socket()->Read(read_buffer_.get(),
625 kReadBufferSize,
626 &read_callback_);
627 switch (bytes_read) {
628 case 0:
629 // Socket is closed!
630 // TODO(mbelshe): Need to abort any active streams here.
631 DCHECK(!active_streams_.size());
632 return;
633 case net::ERR_IO_PENDING:
634 // Waiting for data. Nothing to do now.
635 read_pending_ = true;
636 return;
637 default:
638 // Data was read, process it.
639 // Schedule the work through the message loop to avoid recursive
640 // callbacks.
641 read_pending_ = true;
642 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
643 this, &FlipSession::OnReadComplete, bytes_read));
644 break;
645 }
646 }
647
WriteSocketLater()648 void FlipSession::WriteSocketLater() {
649 if (delayed_write_pending_)
650 return;
651
652 delayed_write_pending_ = true;
653 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
654 this, &FlipSession::WriteSocket));
655 }
656
// Drains the output queue onto the socket.  Runs only as the task posted by
// WriteSocketLater().  Frames are taken from |queue_| one at a time (control
// frames are compressed first) and written until the queue is empty, a write
// returns ERR_IO_PENDING, or a write fails.
void FlipSession::WriteSocket() {
  // This function should only be called via WriteSocketLater.
  DCHECK(delayed_write_pending_);
  delayed_write_pending_ = false;

  // If the socket isn't connected yet, just wait; we'll get called
  // again when the socket connection completes.  If the socket is
  // closed, just return.
  if (state_ < CONNECTED || state_ == CLOSED)
    return;

  if (write_pending_)   // Another write is in progress still.
    return;

  // Loop sending frames until we've sent everything or until the write
  // returns error (or ERR_IO_PENDING).
  while (in_flight_write_.buffer() || queue_.size()) {
    if (!in_flight_write_.buffer()) {
      // Grab the next FlipFrame to send.
      FlipIOBuffer next_buffer = queue_.top();
      queue_.pop();

      // We've deferred compression until just before we write it to the socket,
      // which is now.  At this time, we don't compress our data frames.
      flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
      // NOTE(review): |size| is only consumed in the control-frame branch;
      // the value computed in the else branch is never read.
      size_t size;
      if (uncompressed_frame.is_control_frame()) {
        scoped_ptr<flip::FlipFrame> compressed_frame(
            flip_framer_.CompressFrame(&uncompressed_frame));
        size = compressed_frame->length() + flip::FlipFrame::size();

        DCHECK(size > 0);

        // TODO(mbelshe): We have too much copying of data here.
        IOBufferWithSize* buffer = new IOBufferWithSize(size);
        memcpy(buffer->data(), compressed_frame->data(), size);

        // Attempt to send the frame.
        in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream());
      } else {
        size = uncompressed_frame.length() + flip::FlipFrame::size();
        in_flight_write_ = next_buffer;
      }
    } else {
      // A previous write only partially consumed this buffer; resume it.
      DCHECK(in_flight_write_.buffer()->BytesRemaining());
    }

    write_pending_ = true;
    int rv = connection_->socket()->Write(in_flight_write_.buffer(),
        in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
    // The socket will invoke |write_callback_| when the write finishes.
    if (rv == net::ERR_IO_PENDING)
      break;

    // The write completed synchronously (with a byte count or an error);
    // run the completion handler ourselves.
    OnWriteComplete(rv);

    // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
    //                 as in an error state.
    if (rv < 0)
      break;
  }
}
719
CloseAllStreams(net::Error code)720 void FlipSession::CloseAllStreams(net::Error code) {
721 LOG(INFO) << "Closing all FLIP Streams";
722
723 static StatsCounter abandoned_streams("flip.abandoned_streams");
724 static StatsCounter abandoned_push_streams("flip.abandoned_push_streams");
725
726 if (active_streams_.size()) {
727 abandoned_streams.Add(active_streams_.size());
728
729 // Create a copy of the list, since aborting streams can invalidate
730 // our list.
731 FlipStream** list = new FlipStream*[active_streams_.size()];
732 ActiveStreamMap::const_iterator it;
733 int index = 0;
734 for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
735 list[index++] = it->second;
736
737 // Issue the aborts.
738 for (--index; index >= 0; index--) {
739 LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
740 << "): " << list[index]->path();
741 list[index]->OnClose(code);
742 }
743
744 // Clear out anything pending.
745 active_streams_.clear();
746
747 delete[] list;
748 }
749
750 if (pushed_streams_.size()) {
751 streams_abandoned_count_ += pushed_streams_.size();
752 abandoned_push_streams.Add(pushed_streams_.size());
753 pushed_streams_.clear();
754 }
755 }
756
GetNewStreamId()757 int FlipSession::GetNewStreamId() {
758 int id = stream_hi_water_mark_;
759 stream_hi_water_mark_ += 2;
760 if (stream_hi_water_mark_ > 0x7fff)
761 stream_hi_water_mark_ = 1;
762 return id;
763 }
764
CloseSessionOnError(net::Error err)765 void FlipSession::CloseSessionOnError(net::Error err) {
766 DCHECK_LT(err, OK);
767 LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")";
768
769 // Don't close twice. This can occur because we can have both
770 // a read and a write outstanding, and each can complete with
771 // an error.
772 if (state_ != CLOSED) {
773 state_ = CLOSED;
774 error_ = err;
775 CloseAllStreams(err);
776 session_->flip_session_pool()->Remove(this);
777 }
778 }
779
ActivateStream(FlipStream * stream)780 void FlipSession::ActivateStream(FlipStream* stream) {
781 const flip::FlipStreamId id = stream->stream_id();
782 DCHECK(!IsStreamActive(id));
783
784 active_streams_[id] = stream;
785 }
786
DeactivateStream(flip::FlipStreamId id)787 void FlipSession::DeactivateStream(flip::FlipStreamId id) {
788 DCHECK(IsStreamActive(id));
789
790 // Verify it is not on the pushed_streams_ list.
791 ActiveStreamList::iterator it;
792 for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
793 scoped_refptr<FlipStream> curr = *it;
794 if (id == curr->stream_id()) {
795 pushed_streams_.erase(it);
796 break;
797 }
798 }
799
800 active_streams_.erase(id);
801 }
802
GetPushStream(const std::string & path)803 scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) {
804 static StatsCounter used_push_streams("flip.claimed_push_streams");
805
806 LOG(INFO) << "Looking for push stream: " << path;
807
808 scoped_refptr<FlipStream> stream;
809
810 // We just walk a linear list here.
811 ActiveStreamList::iterator it;
812 for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
813 stream = *it;
814 if (path == stream->path()) {
815 CHECK(stream->pushed());
816 pushed_streams_.erase(it);
817 used_push_streams.Increment();
818 LOG(INFO) << "Push Stream Claim for: " << path;
819 break;
820 }
821 }
822
823 return stream;
824 }
825
GetSSLInfo(SSLInfo * ssl_info)826 void FlipSession::GetSSLInfo(SSLInfo* ssl_info) {
827 if (is_secure_) {
828 SSLClientSocket* ssl_socket =
829 reinterpret_cast<SSLClientSocket*>(connection_->socket());
830 ssl_socket->GetSSLInfo(ssl_info);
831 }
832 }
833
OnError(flip::FlipFramer * framer)834 void FlipSession::OnError(flip::FlipFramer* framer) {
835 LOG(ERROR) << "FlipSession error: " << framer->error_code();
836 CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR);
837 }
838
OnStreamFrameData(flip::FlipStreamId stream_id,const char * data,size_t len)839 void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
840 const char* data,
841 size_t len) {
842 LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes";
843 bool valid_stream = IsStreamActive(stream_id);
844 if (!valid_stream) {
845 // NOTE: it may just be that the stream was cancelled.
846 LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
847 return;
848 }
849
850 scoped_refptr<FlipStream> stream = active_streams_[stream_id];
851 bool success = stream->OnDataReceived(data, len);
852 // |len| == 0 implies a closed stream.
853 if (!success || !len)
854 DeactivateStream(stream_id);
855 }
856
OnSyn(const flip::FlipSynStreamControlFrame * frame,const flip::FlipHeaderBlock * headers)857 void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
858 const flip::FlipHeaderBlock* headers) {
859 flip::FlipStreamId stream_id = frame->stream_id();
860
861 // Server-initiated streams should have even sequence numbers.
862 if ((stream_id & 0x1) != 0) {
863 LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
864 return;
865 }
866
867 if (IsStreamActive(stream_id)) {
868 LOG(ERROR) << "Received OnSyn for active stream " << stream_id;
869 return;
870 }
871
872 streams_pushed_count_++;
873
874 LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id;
875
876 LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------";
877 DumpFlipHeaders(*headers);
878
879 // TODO(mbelshe): DCHECK that this is a GET method?
880
881 const std::string& path = ContainsKey(*headers, "path") ?
882 headers->find("path")->second : "";
883
884 // Verify that the response had a URL for us.
885 DCHECK(!path.empty());
886 if (path.empty()) {
887 LOG(WARNING) << "Pushed stream did not contain a path.";
888 return;
889 }
890
891 scoped_refptr<FlipStream> stream;
892
893 // Check if we already have a delegate awaiting this stream.
894 PendingStreamMap::iterator it;
895 it = pending_streams_.find(path);
896 if (it != pending_streams_.end()) {
897 stream = it->second;
898 pending_streams_.erase(it);
899 if (stream)
900 pushed_streams_.push_back(stream);
901 } else {
902 pushed_streams_.push_back(stream);
903 }
904
905 if (stream) {
906 CHECK(stream->pushed());
907 CHECK(stream->stream_id() == 0);
908 stream->set_stream_id(stream_id);
909 } else {
910 // TODO(mbelshe): can we figure out how to use a LoadLog here?
911 stream = new FlipStream(this, stream_id, true, NULL);
912 }
913
914 // Activate a stream and parse the headers.
915 ActivateStream(stream);
916
917 stream->set_path(path);
918
919 // TODO(mbelshe): For now we convert from our nice hash map back
920 // to a string of headers; this is because the HttpResponseInfo
921 // is a bit rigid for its http (non-flip) design.
922 HttpResponseInfo response;
923 if (FlipHeadersToHttpResponse(*headers, &response)) {
924 GetSSLInfo(&response.ssl_info);
925 stream->OnResponseReceived(response);
926 } else {
927 stream->OnClose(ERR_INVALID_RESPONSE);
928 DeactivateStream(stream_id);
929 return;
930 }
931
932 LOG(INFO) << "Got pushed stream for " << stream->path();
933
934 static StatsCounter push_requests("flip.pushed_streams");
935 push_requests.Increment();
936 }
937
OnSynReply(const flip::FlipSynReplyControlFrame * frame,const flip::FlipHeaderBlock * headers)938 void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
939 const flip::FlipHeaderBlock* headers) {
940 DCHECK(headers);
941 flip::FlipStreamId stream_id = frame->stream_id();
942 bool valid_stream = IsStreamActive(stream_id);
943 if (!valid_stream) {
944 // NOTE: it may just be that the stream was cancelled.
945 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
946 return;
947 }
948
949 LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id;
950 DumpFlipHeaders(*headers);
951
952 // We record content declared as being pushed so that we don't
953 // request a duplicate stream which is already scheduled to be
954 // sent to us.
955 flip::FlipHeaderBlock::const_iterator it;
956 it = headers->find("X-Associated-Content");
957 if (it != headers->end()) {
958 const std::string& content = it->second;
959 std::string::size_type start = 0;
960 std::string::size_type end = 0;
961 do {
962 end = content.find("||", start);
963 if (end == std::string::npos)
964 end = content.length();
965 std::string url = content.substr(start, end - start);
966 std::string::size_type pos = url.find("??");
967 if (pos == std::string::npos)
968 break;
969 url = url.substr(pos + 2);
970 GURL gurl(url);
971 std::string path = gurl.PathForRequest();
972 if (path.length())
973 pending_streams_[path] = NULL;
974 else
975 LOG(INFO) << "Invalid X-Associated-Content path: " << url;
976 start = end + 2;
977 } while (start < content.length());
978 }
979
980 scoped_refptr<FlipStream> stream = active_streams_[stream_id];
981 CHECK(stream->stream_id() == stream_id);
982 CHECK(!stream->cancelled());
983 HttpResponseInfo response;
984 if (FlipHeadersToHttpResponse(*headers, &response)) {
985 GetSSLInfo(&response.ssl_info);
986 stream->OnResponseReceived(response);
987 } else {
988 stream->OnClose(ERR_INVALID_RESPONSE);
989 DeactivateStream(stream_id);
990 }
991 }
992
OnControl(const flip::FlipControlFrame * frame)993 void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
994 flip::FlipHeaderBlock headers;
995 uint32 type = frame->type();
996 if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) {
997 if (!flip_framer_.ParseHeaderBlock(frame, &headers)) {
998 LOG(WARNING) << "Could not parse Flip Control Frame Header";
999 // TODO(mbelshe): Error the session?
1000 return;
1001 }
1002 }
1003
1004 switch (type) {
1005 case flip::SYN_STREAM:
1006 LOG(INFO) << "Flip SynStream for stream " << frame->stream_id();
1007 OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame),
1008 &headers);
1009 break;
1010 case flip::SYN_REPLY:
1011 LOG(INFO) << "Flip SynReply for stream " << frame->stream_id();
1012 OnSynReply(
1013 reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame),
1014 &headers);
1015 break;
1016 case flip::FIN_STREAM:
1017 LOG(INFO) << "Flip Fin for stream " << frame->stream_id();
1018 OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame));
1019 break;
1020 default:
1021 DCHECK(false); // Error!
1022 }
1023 }
1024
OnFin(const flip::FlipFinStreamControlFrame * frame)1025 void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
1026 flip::FlipStreamId stream_id = frame->stream_id();
1027 bool valid_stream = IsStreamActive(stream_id);
1028 if (!valid_stream) {
1029 // NOTE: it may just be that the stream was cancelled.
1030 LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
1031 return;
1032 }
1033 scoped_refptr<FlipStream> stream = active_streams_[stream_id];
1034 CHECK(stream->stream_id() == stream_id);
1035 CHECK(!stream->cancelled());
1036 if (frame->status() == 0) {
1037 stream->OnDataReceived(NULL, 0);
1038 } else {
1039 LOG(ERROR) << "Flip stream closed: " << frame->status();
1040 // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
1041 // For now, it doesn't matter much - it is a protocol error.
1042 stream->OnClose(ERR_FAILED);
1043 }
1044
1045 DeactivateStream(stream_id);
1046 }
1047
1048 } // namespace net
1049