• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
#include "net/flip/flip_session.h"

#include <vector>

#include "base/basictypes.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/rand_util.h"
#include "base/stats_counters.h"
#include "base/stl_util-inl.h"
#include "base/string_util.h"
#include "net/base/connection_type_histograms.h"
#include "net/base/load_flags.h"
#include "net/base/load_log.h"
#include "net/base/net_util.h"
#include "net/flip/flip_frame_builder.h"
#include "net/flip/flip_protocol.h"
#include "net/flip/flip_stream.h"
#include "net/http/http_network_session.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_response_info.h"
#include "net/socket/client_socket.h"
#include "net/socket/client_socket_factory.h"
#include "net/socket/ssl_client_socket.h"
#include "net/tools/dump_cache/url_to_filename_encoder.h"
29 
30 namespace {
31 
32 // Diagnostics function to dump the headers of a request.
33 // TODO(mbelshe): Remove this function.
DumpFlipHeaders(const flip::FlipHeaderBlock & headers)34 void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) {
35   // Because this function gets called on every request,
36   // take extra care to optimize it away if logging is turned off.
37   if (logging::LOG_INFO < logging::GetMinLogLevel())
38     return;
39 
40   flip::FlipHeaderBlock::const_iterator it = headers.begin();
41   while (it != headers.end()) {
42     std::string val = (*it).second;
43     std::string::size_type pos = 0;
44     while ((pos = val.find('\0', pos)) != val.npos)
45       val[pos] = '\n';
46     LOG(INFO) << (*it).first << "==" << val;
47     ++it;
48   }
49 }
50 
51 }  // namespace
52 
53 namespace net {
54 
55 namespace {
56 
57 #ifdef WIN32
58 // We use an artificially small buffer size on windows because the async IO
59 // system will artificially delay IO completions when we use large buffers.
60 const int kReadBufferSize = 2 * 1024;
61 #else
62 const int kReadBufferSize = 8 * 1024;
63 #endif
64 
65 // Convert a FlipHeaderBlock into an HttpResponseInfo.
66 // |headers| input parameter with the FlipHeaderBlock.
67 // |info| output parameter for the HttpResponseInfo.
68 // Returns true if successfully converted.  False if there was a failure
69 // or if the FlipHeaderBlock was invalid.
FlipHeadersToHttpResponse(const flip::FlipHeaderBlock & headers,HttpResponseInfo * response)70 bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers,
71                                HttpResponseInfo* response) {
72   std::string version;
73   std::string status;
74 
75   // The "status" and "version" headers are required.
76   flip::FlipHeaderBlock::const_iterator it;
77   it = headers.find("status");
78   if (it == headers.end()) {
79     LOG(ERROR) << "FlipHeaderBlock without status header.";
80     return false;
81   }
82   status = it->second;
83 
84   // Grab the version.  If not provided by the server,
85   it = headers.find("version");
86   if (it == headers.end()) {
87     LOG(ERROR) << "FlipHeaderBlock without version header.";
88     return false;
89   }
90   version = it->second;
91 
92   std::string raw_headers(version);
93   raw_headers.push_back(' ');
94   raw_headers.append(status);
95   raw_headers.push_back('\0');
96   for (it = headers.begin(); it != headers.end(); ++it) {
97     // For each value, if the server sends a NUL-separated
98     // list of values, we separate that back out into
99     // individual headers for each value in the list.
100     // e.g.
101     //    Set-Cookie "foo\0bar"
102     // becomes
103     //    Set-Cookie: foo\0
104     //    Set-Cookie: bar\0
105     std::string value = it->second;
106     size_t start = 0;
107     size_t end = 0;
108     do {
109       end = value.find('\0', start);
110       std::string tval;
111       if (end != value.npos)
112         tval = value.substr(start, (end - start));
113       else
114         tval = value.substr(start);
115       raw_headers.append(it->first);
116       raw_headers.push_back(':');
117       raw_headers.append(tval);
118       raw_headers.push_back('\0');
119       start = end + 1;
120     } while (end != value.npos);
121   }
122 
123   response->headers = new HttpResponseHeaders(raw_headers);
124   response->was_fetched_via_spdy = true;
125   return true;
126 }
127 
128 // Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from
129 // a HttpRequestInfo block.
CreateFlipHeadersFromHttpRequest(const HttpRequestInfo & info,flip::FlipHeaderBlock * headers)130 void CreateFlipHeadersFromHttpRequest(
131     const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) {
132   static const char kHttpProtocolVersion[] = "HTTP/1.1";
133 
134   HttpUtil::HeadersIterator it(info.extra_headers.begin(),
135                                info.extra_headers.end(),
136                                "\r\n");
137   while (it.GetNext()) {
138     std::string name = StringToLowerASCII(it.name());
139     if (headers->find(name) == headers->end()) {
140       (*headers)[name] = it.values();
141     } else {
142       std::string new_value = (*headers)[name];
143       new_value += "\0";
144       new_value += it.values();
145       (*headers)[name] = new_value;
146     }
147   }
148 
149   // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc)
150   // TODO(mbelshe): Add authentication headers here.
151 
152   (*headers)["method"] = info.method;
153   (*headers)["url"] = info.url.spec();
154   (*headers)["version"] = kHttpProtocolVersion;
155   if (info.user_agent.length())
156     (*headers)["user-agent"] = info.user_agent;
157   if (!info.referrer.is_empty())
158     (*headers)["referer"] = info.referrer.spec();
159 
160   // Honor load flags that impact proxy caches.
161   if (info.load_flags & LOAD_BYPASS_CACHE) {
162     (*headers)["pragma"] = "no-cache";
163     (*headers)["cache-control"] = "no-cache";
164   } else if (info.load_flags & LOAD_VALIDATE_CACHE) {
165     (*headers)["cache-control"] = "max-age=0";
166   }
167 }
168 
AdjustSocketBufferSizes(ClientSocket * socket)169 void AdjustSocketBufferSizes(ClientSocket* socket) {
170   // Adjust socket buffer sizes.
171   // FLIP uses one socket, and we want a really big buffer.
172   // This greatly helps on links with packet loss - we can even
173   // outperform Vista's dynamic window sizing algorithm.
174   // TODO(mbelshe): more study.
175   const int kSocketBufferSize = 512 * 1024;
176   socket->SetReceiveBufferSize(kSocketBufferSize);
177   socket->SetSendBufferSize(kSocketBufferSize);
178 }
179 
180 }  // namespace
181 
182 // static
183 bool FlipSession::use_ssl_ = true;
184 
FlipSession(const std::string & host,HttpNetworkSession * session)185 FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session)
186     : ALLOW_THIS_IN_INITIALIZER_LIST(
187           connect_callback_(this, &FlipSession::OnTCPConnect)),
188       ALLOW_THIS_IN_INITIALIZER_LIST(
189           ssl_connect_callback_(this, &FlipSession::OnSSLConnect)),
190       ALLOW_THIS_IN_INITIALIZER_LIST(
191           read_callback_(this, &FlipSession::OnReadComplete)),
192       ALLOW_THIS_IN_INITIALIZER_LIST(
193           write_callback_(this, &FlipSession::OnWriteComplete)),
194       domain_(host),
195       session_(session),
196       connection_(new ClientSocketHandle),
197       read_buffer_(new IOBuffer(kReadBufferSize)),
198       read_pending_(false),
199       stream_hi_water_mark_(1),  // Always start at 1 for the first stream id.
200       write_pending_(false),
201       delayed_write_pending_(false),
202       is_secure_(false),
203       error_(OK),
204       state_(IDLE),
205       streams_initiated_count_(0),
206       streams_pushed_count_(0),
207       streams_pushed_and_claimed_count_(0),
208       streams_abandoned_count_(0) {
209   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
210 
211   flip_framer_.set_visitor(this);
212 
213   session_->ssl_config_service()->GetSSLConfig(&ssl_config_);
214 
215   // TODO(agl): This is a temporary hack for testing reasons. In the medium
216   // term we'll want to use NPN for all HTTPS connections and use the protocol
217   // suggested.
218   //
219   // In the event that the server supports Next Protocol Negotiation, but
220   // doesn't support either of these protocols, we'll request the first
221   // protocol in the list. Because of that, HTTP is listed first because it's
222   // what we'll actually fallback to in the case that the server doesn't
223   // support SPDY.
224   ssl_config_.next_protos = "\007http1.1\004spdy";
225 }
226 
~FlipSession()227 FlipSession::~FlipSession() {
228   // Cleanup all the streams.
229   CloseAllStreams(net::ERR_ABORTED);
230 
231   if (connection_->is_initialized()) {
232     // With Flip we can't recycle sockets.
233     connection_->socket()->Disconnect();
234   }
235 
236   // TODO(willchan): Don't hardcode port 80 here.
237   DCHECK(!session_->flip_session_pool()->HasSession(
238       HostResolver::RequestInfo(domain_, 80)));
239 
240   // Record per-session histograms here.
241   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
242       streams_initiated_count_,
243       0, 300, 50);
244   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
245       streams_pushed_count_,
246       0, 300, 50);
247   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
248       streams_pushed_and_claimed_count_,
249       0, 300, 50);
250   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
251       streams_abandoned_count_,
252       0, 300, 50);
253 }
254 
InitializeWithSocket(ClientSocketHandle * connection)255 void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) {
256   static StatsCounter flip_sessions("flip.sessions");
257   flip_sessions.Increment();
258 
259   AdjustSocketBufferSizes(connection->socket());
260 
261   state_ = CONNECTED;
262   connection_.reset(connection);
263 
264   // This is a newly initialized session that no client should have a handle to
265   // yet, so there's no need to start writing data as in OnTCPConnect(), but we
266   // should start reading data.
267   ReadSocket();
268 }
269 
Connect(const std::string & group_name,const HostResolver::RequestInfo & host,RequestPriority priority,LoadLog * load_log)270 net::Error FlipSession::Connect(const std::string& group_name,
271                                 const HostResolver::RequestInfo& host,
272                                 RequestPriority priority,
273                                 LoadLog* load_log) {
274   DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST);
275 
276   // If the connect process is started, let the caller continue.
277   if (state_ > IDLE)
278     return net::OK;
279 
280   state_ = CONNECTING;
281 
282   static StatsCounter flip_sessions("flip.sessions");
283   flip_sessions.Increment();
284 
285   int rv = connection_->Init(group_name, host, priority, &connect_callback_,
286                             session_->tcp_socket_pool(), load_log);
287   DCHECK(rv <= 0);
288 
289   // If the connect is pending, we still return ok.  The APIs enqueue
290   // work until after the connect completes asynchronously later.
291   if (rv == net::ERR_IO_PENDING)
292     return net::OK;
293   return static_cast<net::Error>(rv);
294 }
295 
GetOrCreateStream(const HttpRequestInfo & request,const UploadDataStream * upload_data,LoadLog * log)296 scoped_refptr<FlipStream> FlipSession::GetOrCreateStream(
297     const HttpRequestInfo& request,
298     const UploadDataStream* upload_data,
299     LoadLog* log) {
300   const GURL& url = request.url;
301   const std::string& path = url.PathForRequest();
302 
303   scoped_refptr<FlipStream> stream;
304 
305   // Check if we have a push stream for this path.
306   if (request.method == "GET") {
307     stream = GetPushStream(path);
308     if (stream) {
309       DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_);
310       streams_pushed_and_claimed_count_++;
311       return stream;
312     }
313   }
314 
315   // Check if we have a pending push stream for this url.
316   PendingStreamMap::iterator it;
317   it = pending_streams_.find(path);
318   if (it != pending_streams_.end()) {
319     DCHECK(!it->second);
320     // Server will assign a stream id when the push stream arrives.  Use 0 for
321     // now.
322     LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM);
323     FlipStream* stream = new FlipStream(this, 0, true, log);
324     stream->set_path(path);
325     it->second = stream;
326     return it->second;
327   }
328 
329   const flip::FlipStreamId stream_id = GetNewStreamId();
330 
331   // If we still don't have a stream, activate one now.
332   stream = new FlipStream(this, stream_id, false, log);
333   stream->set_priority(request.priority);
334   stream->set_path(path);
335   ActivateStream(stream);
336 
337   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
338       static_cast<int>(request.priority), 0, 10, 11);
339 
340   LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url;
341 
342   // TODO(mbelshe): Optimize memory allocations
343   DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST &&
344          request.priority <= FLIP_PRIORITY_LOWEST);
345 
346   // Convert from HttpRequestHeaders to Flip Headers.
347   flip::FlipHeaderBlock headers;
348   CreateFlipHeadersFromHttpRequest(request, &headers);
349 
350   flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE;
351   if (!request.upload_data || !upload_data->size())
352     flags = flip::CONTROL_FLAG_FIN;
353 
354   // Create a SYN_STREAM packet and add to the output queue.
355   scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame(
356       flip_framer_.CreateSynStream(stream_id, request.priority, flags, false,
357                                    &headers));
358   int length = flip::FlipFrame::size() + syn_frame->length();
359   IOBuffer* buffer = new IOBuffer(length);
360   memcpy(buffer->data(), syn_frame->data(), length);
361   queue_.push(FlipIOBuffer(buffer, length, request.priority, stream));
362 
363   static StatsCounter flip_requests("flip.requests");
364   flip_requests.Increment();
365 
366   LOG(INFO) << "FETCHING: " << request.url.spec();
367   streams_initiated_count_++;
368 
369   LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------";
370   DumpFlipHeaders(headers);
371 
372   // Schedule to write to the socket after we've made it back
373   // to the message loop so that we can aggregate multiple
374   // requests.
375   // TODO(mbelshe): Should we do the "first" request immediately?
376   //                maybe we should only 'do later' for subsequent
377   //                requests.
378   WriteSocketLater();
379 
380   return stream;
381 }
382 
WriteStreamData(flip::FlipStreamId stream_id,net::IOBuffer * data,int len)383 int FlipSession::WriteStreamData(flip::FlipStreamId stream_id,
384                                  net::IOBuffer* data, int len) {
385   LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len
386             << " bytes)";
387   const int kMss = 1430;  // This is somewhat arbitrary and not really fixed,
388                           // but it will always work reasonably with ethernet.
389   // Chop the world into 2-packet chunks.  This is somewhat arbitrary, but
390   // is reasonably small and ensures that we elicit ACKs quickly from TCP
391   // (because TCP tries to only ACK every other packet).
392   const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size();
393 
394   // Find our stream
395   DCHECK(IsStreamActive(stream_id));
396   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
397   CHECK(stream->stream_id() == stream_id);
398   if (!stream)
399     return ERR_INVALID_FLIP_STREAM;
400 
401   // TODO(mbelshe):  Setting of the FIN is assuming that the caller will pass
402   //                 all data to write in a single chunk.  Is this always true?
403 
404   // Set the flags on the upload.
405   flip::FlipDataFlags flags = flip::DATA_FLAG_FIN;
406   if (len > kMaxFlipFrameChunkSize) {
407     len = kMaxFlipFrameChunkSize;
408     flags = flip::DATA_FLAG_NONE;
409   }
410 
411   // TODO(mbelshe): reduce memory copies here.
412   scoped_ptr<flip::FlipDataFrame> frame(
413       flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags));
414   int length = flip::FlipFrame::size() + frame->length();
415   IOBufferWithSize* buffer = new IOBufferWithSize(length);
416   memcpy(buffer->data(), frame->data(), length);
417   queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream));
418 
419   // Whenever we queue onto the socket we need to ensure that we will write to
420   // it later.
421   WriteSocketLater();
422 
423   return ERR_IO_PENDING;
424 }
425 
CancelStream(flip::FlipStreamId stream_id)426 bool FlipSession::CancelStream(flip::FlipStreamId stream_id) {
427   LOG(INFO) << "Cancelling stream " << stream_id;
428   if (!IsStreamActive(stream_id))
429     return false;
430 
431   // TODO(mbelshe): We should send a FIN_STREAM control frame here
432   //                so that the server can cancel a large send.
433 
434   // TODO(mbelshe): Write a method for tearing down a stream
435   //                that cleans it out of the active list, the pending list,
436   //                etc.
437   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
438   DeactivateStream(stream_id);
439   return true;
440 }
441 
IsStreamActive(flip::FlipStreamId stream_id) const442 bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const {
443   return ContainsKey(active_streams_, stream_id);
444 }
445 
GetLoadState() const446 LoadState FlipSession::GetLoadState() const {
447   // NOTE: The application only queries the LoadState via the
448   //       FlipNetworkTransaction, and details are only needed when
449   //       we're in the process of connecting.
450 
451   // If we're connecting, defer to the connection to give us the actual
452   // LoadState.
453   if (state_ == CONNECTING)
454     return connection_->GetLoadState();
455 
456   // Just report that we're idle since the session could be doing
457   // many things concurrently.
458   return LOAD_STATE_IDLE;
459 }
460 
OnTCPConnect(int result)461 void FlipSession::OnTCPConnect(int result) {
462   LOG(INFO) << "Flip socket connected (result=" << result << ")";
463 
464   // We shouldn't be coming through this path if we didn't just open a fresh
465   // socket (or have an error trying to do so).
466   DCHECK(!connection_->socket() || !connection_->is_reused());
467 
468   UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0);
469 
470   if (result != net::OK) {
471     DCHECK_LT(result, 0);
472     CloseSessionOnError(static_cast<net::Error>(result));
473     return;
474   }
475 
476   AdjustSocketBufferSizes(connection_->socket());
477 
478   if (use_ssl_) {
479     // Add a SSL socket on top of our existing transport socket.
480     ClientSocket* socket = connection_->release_socket();
481     // TODO(mbelshe): Fix the hostname.  This is BROKEN without having
482     //                a real hostname.
483     socket = session_->socket_factory()->CreateSSLClientSocket(
484         socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_);
485     connection_->set_socket(socket);
486     is_secure_ = true;
487     // TODO(willchan): Plumb LoadLog into FLIP code.
488     int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL);
489     if (status != ERR_IO_PENDING)
490       OnSSLConnect(status);
491   } else {
492     DCHECK_EQ(state_, CONNECTING);
493     state_ = CONNECTED;
494 
495     // Make sure we get any pending data sent.
496     WriteSocketLater();
497     // Start reading
498     ReadSocket();
499   }
500 }
501 
OnSSLConnect(int result)502 void FlipSession::OnSSLConnect(int result) {
503   // TODO(mbelshe): We need to replicate the functionality of
504   //   HttpNetworkTransaction::DoSSLConnectComplete here, where it calls
505   //   HandleCertificateError() and such.
506   if (IsCertificateError(result))
507     result = OK;   // TODO(mbelshe): pretend we're happy anyway.
508 
509   if (result == OK) {
510     DCHECK_EQ(state_, CONNECTING);
511     state_ = CONNECTED;
512 
513     // After we've connected, send any data to the server, and then issue
514     // our read.
515     WriteSocketLater();
516     ReadSocket();
517   } else {
518     DCHECK_LT(result, 0);  // It should be an error, not a byte count.
519     CloseSessionOnError(static_cast<net::Error>(result));
520   }
521 }
522 
OnReadComplete(int bytes_read)523 void FlipSession::OnReadComplete(int bytes_read) {
524   // Parse a frame.  For now this code requires that the frame fit into our
525   // buffer (32KB).
526   // TODO(mbelshe): support arbitrarily large frames!
527 
528   LOG(INFO) << "Flip socket read: " << bytes_read << " bytes";
529 
530   read_pending_ = false;
531 
532   if (bytes_read <= 0) {
533     // Session is tearing down.
534     net::Error error = static_cast<net::Error>(bytes_read);
535     if (error == OK)
536       error = ERR_CONNECTION_CLOSED;
537     CloseSessionOnError(error);
538     return;
539   }
540 
541   // The FlipFramer will use callbacks onto |this| as it parses frames.
542   // When errors occur, those callbacks can lead to teardown of all references
543   // to |this|, so maintain a reference to self during this call for safe
544   // cleanup.
545   scoped_refptr<FlipSession> self(this);
546 
547   char *data = read_buffer_->data();
548   while (bytes_read &&
549          flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) {
550     uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read);
551     bytes_read -= bytes_processed;
552     data += bytes_processed;
553     if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE)
554       flip_framer_.Reset();
555   }
556 
557   if (state_ != CLOSED)
558     ReadSocket();
559 }
560 
OnWriteComplete(int result)561 void FlipSession::OnWriteComplete(int result) {
562   DCHECK(write_pending_);
563   DCHECK(in_flight_write_.size());
564   DCHECK(result != 0);  // This shouldn't happen for write.
565 
566   write_pending_ = false;
567 
568   LOG(INFO) << "Flip write complete (result=" << result << ") for stream: "
569             << in_flight_write_.stream()->stream_id();
570 
571   if (result >= 0) {
572     // It should not be possible to have written more bytes than our
573     // in_flight_write_.
574     DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
575 
576     in_flight_write_.buffer()->DidConsume(result);
577 
578     // We only notify the stream when we've fully written the pending frame.
579     if (!in_flight_write_.buffer()->BytesRemaining()) {
580       scoped_refptr<FlipStream> stream = in_flight_write_.stream();
581       DCHECK(stream.get());
582 
583       // Report the number of bytes written to the caller, but exclude the
584       // frame size overhead.  NOTE:  if this frame was compressed the reported
585       // bytes written is the compressed size, not the original size.
586       if (result > 0) {
587         result = in_flight_write_.buffer()->size();
588         DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size()));
589         result -= static_cast<int>(flip::FlipFrame::size());
590       }
591 
592       // It is possible that the stream was cancelled while we were writing
593       // to the socket.
594       if (!stream->cancelled())
595         stream->OnWriteComplete(result);
596 
597       // Cleanup the write which just completed.
598       in_flight_write_.release();
599     }
600 
601     // Write more data.  We're already in a continuation, so we can
602     // go ahead and write it immediately (without going back to the
603     // message loop).
604     WriteSocketLater();
605   } else {
606     in_flight_write_.release();
607 
608     // The stream is now errored.  Close it down.
609     CloseSessionOnError(static_cast<net::Error>(result));
610   }
611 }
612 
ReadSocket()613 void FlipSession::ReadSocket() {
614   if (read_pending_)
615     return;
616 
617   if (state_ == CLOSED) {
618     NOTREACHED();
619     return;
620   }
621 
622   CHECK(connection_.get());
623   CHECK(connection_->socket());
624   int bytes_read = connection_->socket()->Read(read_buffer_.get(),
625                                                kReadBufferSize,
626                                                &read_callback_);
627   switch (bytes_read) {
628     case 0:
629       // Socket is closed!
630       // TODO(mbelshe): Need to abort any active streams here.
631       DCHECK(!active_streams_.size());
632       return;
633     case net::ERR_IO_PENDING:
634       // Waiting for data.  Nothing to do now.
635       read_pending_ = true;
636       return;
637     default:
638       // Data was read, process it.
639       // Schedule the work through the message loop to avoid recursive
640       // callbacks.
641       read_pending_ = true;
642       MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
643           this, &FlipSession::OnReadComplete, bytes_read));
644       break;
645   }
646 }
647 
WriteSocketLater()648 void FlipSession::WriteSocketLater() {
649   if (delayed_write_pending_)
650     return;
651 
652   delayed_write_pending_ = true;
653   MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
654       this, &FlipSession::WriteSocket));
655 }
656 
WriteSocket()657 void FlipSession::WriteSocket() {
658   // This function should only be called via WriteSocketLater.
659   DCHECK(delayed_write_pending_);
660   delayed_write_pending_ = false;
661 
662   // If the socket isn't connected yet, just wait; we'll get called
663   // again when the socket connection completes.  If the socket is
664   // closed, just return.
665   if (state_ < CONNECTED || state_ == CLOSED)
666     return;
667 
668   if (write_pending_)   // Another write is in progress still.
669     return;
670 
671   // Loop sending frames until we've sent everything or until the write
672   // returns error (or ERR_IO_PENDING).
673   while (in_flight_write_.buffer() || queue_.size()) {
674     if (!in_flight_write_.buffer()) {
675       // Grab the next FlipFrame to send.
676       FlipIOBuffer next_buffer = queue_.top();
677       queue_.pop();
678 
679       // We've deferred compression until just before we write it to the socket,
680       // which is now.  At this time, we don't compress our data frames.
681       flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false);
682       size_t size;
683       if (uncompressed_frame.is_control_frame()) {
684         scoped_ptr<flip::FlipFrame> compressed_frame(
685             flip_framer_.CompressFrame(&uncompressed_frame));
686         size = compressed_frame->length() + flip::FlipFrame::size();
687 
688         DCHECK(size > 0);
689 
690         // TODO(mbelshe): We have too much copying of data here.
691         IOBufferWithSize* buffer = new IOBufferWithSize(size);
692         memcpy(buffer->data(), compressed_frame->data(), size);
693 
694         // Attempt to send the frame.
695         in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream());
696       } else {
697         size = uncompressed_frame.length() + flip::FlipFrame::size();
698         in_flight_write_ = next_buffer;
699       }
700     } else {
701       DCHECK(in_flight_write_.buffer()->BytesRemaining());
702     }
703 
704     write_pending_ = true;
705     int rv = connection_->socket()->Write(in_flight_write_.buffer(),
706         in_flight_write_.buffer()->BytesRemaining(), &write_callback_);
707     if (rv == net::ERR_IO_PENDING)
708       break;
709 
710     // We sent the frame successfully.
711     OnWriteComplete(rv);
712 
713     // TODO(mbelshe):  Test this error case.  Maybe we should mark the socket
714     //                 as in an error state.
715     if (rv < 0)
716       break;
717   }
718 }
719 
CloseAllStreams(net::Error code)720 void FlipSession::CloseAllStreams(net::Error code) {
721   LOG(INFO) << "Closing all FLIP Streams";
722 
723   static StatsCounter abandoned_streams("flip.abandoned_streams");
724   static StatsCounter abandoned_push_streams("flip.abandoned_push_streams");
725 
726   if (active_streams_.size()) {
727     abandoned_streams.Add(active_streams_.size());
728 
729     // Create a copy of the list, since aborting streams can invalidate
730     // our list.
731     FlipStream** list = new FlipStream*[active_streams_.size()];
732     ActiveStreamMap::const_iterator it;
733     int index = 0;
734     for (it = active_streams_.begin(); it != active_streams_.end(); ++it)
735       list[index++] = it->second;
736 
737     // Issue the aborts.
738     for (--index; index >= 0; index--) {
739       LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id()
740                  << "): " << list[index]->path();
741       list[index]->OnClose(code);
742     }
743 
744     // Clear out anything pending.
745     active_streams_.clear();
746 
747     delete[] list;
748   }
749 
750   if (pushed_streams_.size()) {
751     streams_abandoned_count_ += pushed_streams_.size();
752     abandoned_push_streams.Add(pushed_streams_.size());
753     pushed_streams_.clear();
754   }
755 }
756 
GetNewStreamId()757 int FlipSession::GetNewStreamId() {
758   int id = stream_hi_water_mark_;
759   stream_hi_water_mark_ += 2;
760   if (stream_hi_water_mark_ > 0x7fff)
761     stream_hi_water_mark_ = 1;
762   return id;
763 }
764 
CloseSessionOnError(net::Error err)765 void FlipSession::CloseSessionOnError(net::Error err) {
766   DCHECK_LT(err, OK);
767   LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")";
768 
769   // Don't close twice.  This can occur because we can have both
770   // a read and a write outstanding, and each can complete with
771   // an error.
772   if (state_ != CLOSED) {
773     state_ = CLOSED;
774     error_ = err;
775     CloseAllStreams(err);
776     session_->flip_session_pool()->Remove(this);
777   }
778 }
779 
ActivateStream(FlipStream * stream)780 void FlipSession::ActivateStream(FlipStream* stream) {
781   const flip::FlipStreamId id = stream->stream_id();
782   DCHECK(!IsStreamActive(id));
783 
784   active_streams_[id] = stream;
785 }
786 
DeactivateStream(flip::FlipStreamId id)787 void FlipSession::DeactivateStream(flip::FlipStreamId id) {
788   DCHECK(IsStreamActive(id));
789 
790   // Verify it is not on the pushed_streams_ list.
791   ActiveStreamList::iterator it;
792   for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
793     scoped_refptr<FlipStream> curr = *it;
794     if (id == curr->stream_id()) {
795       pushed_streams_.erase(it);
796       break;
797     }
798   }
799 
800   active_streams_.erase(id);
801 }
802 
GetPushStream(const std::string & path)803 scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) {
804   static StatsCounter used_push_streams("flip.claimed_push_streams");
805 
806   LOG(INFO) << "Looking for push stream: " << path;
807 
808   scoped_refptr<FlipStream> stream;
809 
810   // We just walk a linear list here.
811   ActiveStreamList::iterator it;
812   for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) {
813     stream = *it;
814     if (path == stream->path()) {
815       CHECK(stream->pushed());
816       pushed_streams_.erase(it);
817       used_push_streams.Increment();
818       LOG(INFO) << "Push Stream Claim for: " << path;
819       break;
820     }
821   }
822 
823   return stream;
824 }
825 
GetSSLInfo(SSLInfo * ssl_info)826 void FlipSession::GetSSLInfo(SSLInfo* ssl_info) {
827   if (is_secure_) {
828     SSLClientSocket* ssl_socket =
829         reinterpret_cast<SSLClientSocket*>(connection_->socket());
830     ssl_socket->GetSSLInfo(ssl_info);
831   }
832 }
833 
OnError(flip::FlipFramer * framer)834 void FlipSession::OnError(flip::FlipFramer* framer) {
835   LOG(ERROR) << "FlipSession error: " << framer->error_code();
836   CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR);
837 }
838 
OnStreamFrameData(flip::FlipStreamId stream_id,const char * data,size_t len)839 void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id,
840                                     const char* data,
841                                     size_t len) {
842   LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes";
843   bool valid_stream = IsStreamActive(stream_id);
844   if (!valid_stream) {
845     // NOTE:  it may just be that the stream was cancelled.
846     LOG(WARNING) << "Received data frame for invalid stream " << stream_id;
847     return;
848   }
849 
850   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
851   bool success = stream->OnDataReceived(data, len);
852   // |len| == 0 implies a closed stream.
853   if (!success || !len)
854     DeactivateStream(stream_id);
855 }
856 
OnSyn(const flip::FlipSynStreamControlFrame * frame,const flip::FlipHeaderBlock * headers)857 void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame,
858                         const flip::FlipHeaderBlock* headers) {
859   flip::FlipStreamId stream_id = frame->stream_id();
860 
861   // Server-initiated streams should have even sequence numbers.
862   if ((stream_id & 0x1) != 0) {
863     LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id;
864     return;
865   }
866 
867   if (IsStreamActive(stream_id)) {
868     LOG(ERROR) << "Received OnSyn for active stream " << stream_id;
869     return;
870   }
871 
872   streams_pushed_count_++;
873 
874   LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id;
875 
876   LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------";
877   DumpFlipHeaders(*headers);
878 
879   // TODO(mbelshe): DCHECK that this is a GET method?
880 
881   const std::string& path = ContainsKey(*headers, "path") ?
882       headers->find("path")->second : "";
883 
884   // Verify that the response had a URL for us.
885   DCHECK(!path.empty());
886   if (path.empty()) {
887     LOG(WARNING) << "Pushed stream did not contain a path.";
888     return;
889   }
890 
891   scoped_refptr<FlipStream> stream;
892 
893   // Check if we already have a delegate awaiting this stream.
894   PendingStreamMap::iterator it;
895   it = pending_streams_.find(path);
896   if (it != pending_streams_.end()) {
897     stream = it->second;
898     pending_streams_.erase(it);
899     if (stream)
900       pushed_streams_.push_back(stream);
901   } else {
902     pushed_streams_.push_back(stream);
903   }
904 
905   if (stream) {
906     CHECK(stream->pushed());
907     CHECK(stream->stream_id() == 0);
908     stream->set_stream_id(stream_id);
909   } else {
910     // TODO(mbelshe): can we figure out how to use a LoadLog here?
911     stream = new FlipStream(this, stream_id, true, NULL);
912   }
913 
914   // Activate a stream and parse the headers.
915   ActivateStream(stream);
916 
917   stream->set_path(path);
918 
919   // TODO(mbelshe): For now we convert from our nice hash map back
920   // to a string of headers; this is because the HttpResponseInfo
921   // is a bit rigid for its http (non-flip) design.
922   HttpResponseInfo response;
923   if (FlipHeadersToHttpResponse(*headers, &response)) {
924     GetSSLInfo(&response.ssl_info);
925     stream->OnResponseReceived(response);
926   } else {
927     stream->OnClose(ERR_INVALID_RESPONSE);
928     DeactivateStream(stream_id);
929     return;
930   }
931 
932   LOG(INFO) << "Got pushed stream for " << stream->path();
933 
934   static StatsCounter push_requests("flip.pushed_streams");
935   push_requests.Increment();
936 }
937 
OnSynReply(const flip::FlipSynReplyControlFrame * frame,const flip::FlipHeaderBlock * headers)938 void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame,
939                              const flip::FlipHeaderBlock* headers) {
940   DCHECK(headers);
941   flip::FlipStreamId stream_id = frame->stream_id();
942   bool valid_stream = IsStreamActive(stream_id);
943   if (!valid_stream) {
944     // NOTE:  it may just be that the stream was cancelled.
945     LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id;
946     return;
947   }
948 
949   LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id;
950   DumpFlipHeaders(*headers);
951 
952   // We record content declared as being pushed so that we don't
953   // request a duplicate stream which is already scheduled to be
954   // sent to us.
955   flip::FlipHeaderBlock::const_iterator it;
956   it = headers->find("X-Associated-Content");
957   if (it != headers->end()) {
958     const std::string& content = it->second;
959     std::string::size_type start = 0;
960     std::string::size_type end = 0;
961     do {
962       end = content.find("||", start);
963       if (end == std::string::npos)
964         end = content.length();
965       std::string url = content.substr(start, end - start);
966       std::string::size_type pos = url.find("??");
967       if (pos == std::string::npos)
968         break;
969       url = url.substr(pos + 2);
970       GURL gurl(url);
971       std::string path = gurl.PathForRequest();
972       if (path.length())
973         pending_streams_[path] = NULL;
974       else
975         LOG(INFO) << "Invalid X-Associated-Content path: " << url;
976       start = end + 2;
977     } while (start < content.length());
978   }
979 
980   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
981   CHECK(stream->stream_id() == stream_id);
982   CHECK(!stream->cancelled());
983   HttpResponseInfo response;
984   if (FlipHeadersToHttpResponse(*headers, &response)) {
985     GetSSLInfo(&response.ssl_info);
986     stream->OnResponseReceived(response);
987   } else {
988     stream->OnClose(ERR_INVALID_RESPONSE);
989     DeactivateStream(stream_id);
990   }
991 }
992 
OnControl(const flip::FlipControlFrame * frame)993 void FlipSession::OnControl(const flip::FlipControlFrame* frame) {
994   flip::FlipHeaderBlock headers;
995   uint32 type = frame->type();
996   if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) {
997     if (!flip_framer_.ParseHeaderBlock(frame, &headers)) {
998       LOG(WARNING) << "Could not parse Flip Control Frame Header";
999       // TODO(mbelshe):  Error the session?
1000       return;
1001     }
1002   }
1003 
1004   switch (type) {
1005     case flip::SYN_STREAM:
1006       LOG(INFO) << "Flip SynStream for stream " << frame->stream_id();
1007       OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame),
1008             &headers);
1009       break;
1010     case flip::SYN_REPLY:
1011       LOG(INFO) << "Flip SynReply for stream " << frame->stream_id();
1012       OnSynReply(
1013           reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame),
1014           &headers);
1015       break;
1016     case flip::FIN_STREAM:
1017       LOG(INFO) << "Flip Fin for stream " << frame->stream_id();
1018       OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame));
1019       break;
1020     default:
1021       DCHECK(false);  // Error!
1022   }
1023 }
1024 
OnFin(const flip::FlipFinStreamControlFrame * frame)1025 void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) {
1026   flip::FlipStreamId stream_id = frame->stream_id();
1027   bool valid_stream = IsStreamActive(stream_id);
1028   if (!valid_stream) {
1029     // NOTE:  it may just be that the stream was cancelled.
1030     LOG(WARNING) << "Received FIN for invalid stream" << stream_id;
1031     return;
1032   }
1033   scoped_refptr<FlipStream> stream = active_streams_[stream_id];
1034   CHECK(stream->stream_id() == stream_id);
1035   CHECK(!stream->cancelled());
1036   if (frame->status() == 0) {
1037     stream->OnDataReceived(NULL, 0);
1038   } else {
1039     LOG(ERROR) << "Flip stream closed: " << frame->status();
1040     // TODO(mbelshe): Map from Flip-protocol errors to something sensical.
1041     //                For now, it doesn't matter much - it is a protocol error.
1042     stream->OnClose(ERR_FAILED);
1043   }
1044 
1045   DeactivateStream(stream_id);
1046 }
1047 
1048 }  // namespace net
1049