• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "net/websockets/websocket_job.h"
6 
7 #include <algorithm>
8 
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
25 
// Pending-send limit, in bytes (32 KiB), reported to the delegate through
// OnConnected() when the connection is carried over a SPDY stream.
static const int kMaxPendingSendAllowed = 32 * 1024;
27 
28 namespace {
29 
// Lower-case names of the cookie headers stripped from the handshake request.
const char* const kCookieHeaders[] = {
  "cookie",
  "cookie2",
};

// Lower-case names of the cookie headers extracted from (and then removed
// from) the handshake response.
const char* const kSetCookieHeaders[] = {
  "set-cookie",
  "set-cookie2",
};
37 
WebSocketJobFactory(const GURL & url,net::SocketStream::Delegate * delegate,net::URLRequestContext * context,net::CookieStore * cookie_store)38 net::SocketStreamJob* WebSocketJobFactory(
39     const GURL& url, net::SocketStream::Delegate* delegate,
40     net::URLRequestContext* context, net::CookieStore* cookie_store) {
41   net::WebSocketJob* job = new net::WebSocketJob(delegate);
42   job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
43   return job;
44 }
45 
46 class WebSocketJobInitSingleton {
47  private:
48   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton()49   WebSocketJobInitSingleton() {
50     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
52   }
53 };
54 
55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56     LAZY_INSTANCE_INITIALIZER;
57 
58 }  // anonymous namespace
59 
60 namespace net {
61 
62 // static
EnsureInit()63 void WebSocketJob::EnsureInit() {
64   g_websocket_job_init.Get();
65 }
66 
WebSocketJob(SocketStream::Delegate * delegate)67 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
68     : delegate_(delegate),
69       state_(INITIALIZED),
70       waiting_(false),
71       handshake_request_(new WebSocketHandshakeRequestHandler),
72       handshake_response_(new WebSocketHandshakeResponseHandler),
73       started_to_send_handshake_request_(false),
74       handshake_request_sent_(0),
75       response_cookies_save_index_(0),
76       spdy_protocol_version_(0),
77       save_next_cookie_running_(false),
78       callback_pending_(false),
79       weak_ptr_factory_(this),
80       weak_ptr_factory_for_send_pending_(this) {
81 }
82 
~WebSocketJob()83 WebSocketJob::~WebSocketJob() {
84   DCHECK_EQ(CLOSED, state_);
85   DCHECK(!delegate_);
86   DCHECK(!socket_.get());
87 }
88 
Connect()89 void WebSocketJob::Connect() {
90   DCHECK(socket_.get());
91   DCHECK_EQ(state_, INITIALIZED);
92   state_ = CONNECTING;
93   socket_->Connect();
94 }
95 
SendData(const char * data,int len)96 bool WebSocketJob::SendData(const char* data, int len) {
97   switch (state_) {
98     case INITIALIZED:
99       return false;
100 
101     case CONNECTING:
102       return SendHandshakeRequest(data, len);
103 
104     case OPEN:
105       {
106         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
107         memcpy(buffer->data(), data, len);
108         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
109           send_buffer_queue_.push_back(buffer);
110           return true;
111         }
112         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
113         return SendDataInternal(current_send_buffer_->data(),
114                                 current_send_buffer_->BytesRemaining());
115       }
116 
117     case CLOSING:
118     case CLOSED:
119       return false;
120   }
121   return false;
122 }
123 
Close()124 void WebSocketJob::Close() {
125   if (state_ == CLOSED)
126     return;
127 
128   state_ = CLOSING;
129   if (current_send_buffer_.get()) {
130     // Will close in SendPending.
131     return;
132   }
133   state_ = CLOSED;
134   CloseInternal();
135 }
136 
RestartWithAuth(const AuthCredentials & credentials)137 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
138   state_ = CONNECTING;
139   socket_->RestartWithAuth(credentials);
140 }
141 
DetachDelegate()142 void WebSocketJob::DetachDelegate() {
143   state_ = CLOSED;
144   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
145 
146   scoped_refptr<WebSocketJob> protect(this);
147   weak_ptr_factory_.InvalidateWeakPtrs();
148   weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
149 
150   delegate_ = NULL;
151   if (socket_.get())
152     socket_->DetachDelegate();
153   socket_ = NULL;
154   if (!callback_.is_null()) {
155     waiting_ = false;
156     callback_.Reset();
157     Release();  // Balanced with OnStartOpenConnection().
158   }
159 }
160 
OnStartOpenConnection(SocketStream * socket,const CompletionCallback & callback)161 int WebSocketJob::OnStartOpenConnection(
162     SocketStream* socket, const CompletionCallback& callback) {
163   DCHECK(callback_.is_null());
164   state_ = CONNECTING;
165 
166   addresses_ = socket->address_list();
167   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
168     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
169   }
170 
171   if (delegate_) {
172     int result = delegate_->OnStartOpenConnection(socket, callback);
173     DCHECK_EQ(OK, result);
174   }
175   if (waiting_) {
176     // PutInQueue() may set |waiting_| true for throttling. In this case,
177     // Wakeup() will be called later.
178     callback_ = callback;
179     AddRef();  // Balanced when callback_ is cleared.
180     return ERR_IO_PENDING;
181   }
182   return TrySpdyStream();
183 }
184 
OnConnected(SocketStream * socket,int max_pending_send_allowed)185 void WebSocketJob::OnConnected(
186     SocketStream* socket, int max_pending_send_allowed) {
187   if (state_ == CLOSED)
188     return;
189   DCHECK_EQ(CONNECTING, state_);
190   if (delegate_)
191     delegate_->OnConnected(socket, max_pending_send_allowed);
192 }
193 
OnSentData(SocketStream * socket,int amount_sent)194 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
195   DCHECK_NE(INITIALIZED, state_);
196   DCHECK_GT(amount_sent, 0);
197   if (state_ == CLOSED)
198     return;
199   if (state_ == CONNECTING) {
200     OnSentHandshakeRequest(socket, amount_sent);
201     return;
202   }
203   if (delegate_) {
204     DCHECK(state_ == OPEN || state_ == CLOSING);
205     if (!current_send_buffer_.get()) {
206       VLOG(1)
207           << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
208       return;
209     }
210     current_send_buffer_->DidConsume(amount_sent);
211     if (current_send_buffer_->BytesRemaining() > 0)
212       return;
213 
214     // We need to report amount_sent of original buffer size, instead of
215     // amount sent to |socket|.
216     amount_sent = current_send_buffer_->size();
217     DCHECK_GT(amount_sent, 0);
218     current_send_buffer_ = NULL;
219     if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
220       base::MessageLoopForIO::current()->PostTask(
221           FROM_HERE,
222           base::Bind(&WebSocketJob::SendPending,
223                      weak_ptr_factory_for_send_pending_.GetWeakPtr()));
224     }
225     delegate_->OnSentData(socket, amount_sent);
226   }
227 }
228 
OnReceivedData(SocketStream * socket,const char * data,int len)229 void WebSocketJob::OnReceivedData(
230     SocketStream* socket, const char* data, int len) {
231   DCHECK_NE(INITIALIZED, state_);
232   if (state_ == CLOSED)
233     return;
234   if (state_ == CONNECTING) {
235     OnReceivedHandshakeResponse(socket, data, len);
236     return;
237   }
238   DCHECK(state_ == OPEN || state_ == CLOSING);
239   if (delegate_ && len > 0)
240     delegate_->OnReceivedData(socket, data, len);
241 }
242 
OnClose(SocketStream * socket)243 void WebSocketJob::OnClose(SocketStream* socket) {
244   state_ = CLOSED;
245   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
246 
247   scoped_refptr<WebSocketJob> protect(this);
248   weak_ptr_factory_.InvalidateWeakPtrs();
249 
250   SocketStream::Delegate* delegate = delegate_;
251   delegate_ = NULL;
252   socket_ = NULL;
253   if (!callback_.is_null()) {
254     waiting_ = false;
255     callback_.Reset();
256     Release();  // Balanced with OnStartOpenConnection().
257   }
258   if (delegate)
259     delegate->OnClose(socket);
260 }
261 
OnAuthRequired(SocketStream * socket,AuthChallengeInfo * auth_info)262 void WebSocketJob::OnAuthRequired(
263     SocketStream* socket, AuthChallengeInfo* auth_info) {
264   if (delegate_)
265     delegate_->OnAuthRequired(socket, auth_info);
266 }
267 
OnSSLCertificateError(SocketStream * socket,const SSLInfo & ssl_info,bool fatal)268 void WebSocketJob::OnSSLCertificateError(
269     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
270   if (delegate_)
271     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
272 }
273 
OnError(const SocketStream * socket,int error)274 void WebSocketJob::OnError(const SocketStream* socket, int error) {
275   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
276     delegate_->OnError(socket, error);
277 }
278 
OnCreatedSpdyStream(int result)279 void WebSocketJob::OnCreatedSpdyStream(int result) {
280   DCHECK(spdy_websocket_stream_.get());
281   DCHECK(socket_.get());
282   DCHECK_NE(ERR_IO_PENDING, result);
283 
284   if (state_ == CLOSED) {
285     result = ERR_ABORTED;
286   } else if (result == OK) {
287     state_ = CONNECTING;
288     result = ERR_PROTOCOL_SWITCHED;
289   } else {
290     spdy_websocket_stream_.reset();
291   }
292 
293   CompleteIO(result);
294 }
295 
OnSentSpdyHeaders()296 void WebSocketJob::OnSentSpdyHeaders() {
297   DCHECK_NE(INITIALIZED, state_);
298   if (state_ != CONNECTING)
299     return;
300   size_t original_length = handshake_request_->original_length();
301   handshake_request_.reset();
302   if (delegate_)
303     delegate_->OnSentData(socket_.get(), original_length);
304 }
305 
OnSpdyResponseHeadersUpdated(const SpdyHeaderBlock & response_headers)306 void WebSocketJob::OnSpdyResponseHeadersUpdated(
307     const SpdyHeaderBlock& response_headers) {
308   DCHECK_NE(INITIALIZED, state_);
309   if (state_ != CONNECTING)
310     return;
311   // TODO(toyoshim): Fallback to non-spdy connection?
312   handshake_response_->ParseResponseHeaderBlock(response_headers,
313                                                 challenge_,
314                                                 spdy_protocol_version_);
315 
316   SaveCookiesAndNotifyHeadersComplete();
317 }
318 
OnSentSpdyData(size_t bytes_sent)319 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
320   DCHECK_NE(INITIALIZED, state_);
321   DCHECK_NE(CONNECTING, state_);
322   if (state_ == CLOSED)
323     return;
324   if (!spdy_websocket_stream_.get())
325     return;
326   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
327 }
328 
OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer)329 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
330   DCHECK_NE(INITIALIZED, state_);
331   DCHECK_NE(CONNECTING, state_);
332   if (state_ == CLOSED)
333     return;
334   if (!spdy_websocket_stream_.get())
335     return;
336   if (buffer) {
337     OnReceivedData(
338         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
339   } else {
340     OnReceivedData(socket_.get(), NULL, 0);
341   }
342 }
343 
OnCloseSpdyStream()344 void WebSocketJob::OnCloseSpdyStream() {
345   spdy_websocket_stream_.reset();
346   OnClose(socket_.get());
347 }
348 
SendHandshakeRequest(const char * data,int len)349 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
350   DCHECK_EQ(state_, CONNECTING);
351   if (started_to_send_handshake_request_)
352     return false;
353   if (!handshake_request_->ParseRequest(data, len))
354     return false;
355 
356   AddCookieHeaderAndSend();
357   return true;
358 }
359 
AddCookieHeaderAndSend()360 void WebSocketJob::AddCookieHeaderAndSend() {
361   bool allow = true;
362   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
363     allow = false;
364 
365   if (socket_.get() && delegate_ && state_ == CONNECTING) {
366     handshake_request_->RemoveHeaders(kCookieHeaders,
367                                       arraysize(kCookieHeaders));
368     if (allow && socket_->cookie_store()) {
369       // Add cookies, including HttpOnly cookies.
370       CookieOptions cookie_options;
371       cookie_options.set_include_httponly();
372       socket_->cookie_store()->GetCookiesWithOptionsAsync(
373           GetURLForCookies(), cookie_options,
374           base::Bind(&WebSocketJob::LoadCookieCallback,
375                      weak_ptr_factory_.GetWeakPtr()));
376     } else {
377       DoSendData();
378     }
379   }
380 }
381 
LoadCookieCallback(const std::string & cookie)382 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
383   if (!cookie.empty())
384     // TODO(tyoshino): Sending cookie means that connection doesn't need
385     // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
386     // wouldn't negatively affect privacy anyway. Need to restart connection
387     // or refactor to determine cookie status prior to connecting.
388     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
389   DoSendData();
390 }
391 
DoSendData()392 void WebSocketJob::DoSendData() {
393   if (spdy_websocket_stream_.get()) {
394     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
395     handshake_request_->GetRequestHeaderBlock(
396         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
397     spdy_websocket_stream_->SendRequest(headers.Pass());
398   } else {
399     const std::string& handshake_request =
400         handshake_request_->GetRawRequest();
401     handshake_request_sent_ = 0;
402     socket_->net_log()->AddEvent(
403         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
404         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
405     socket_->SendData(handshake_request.data(),
406                       handshake_request.size());
407   }
408   // Just buffered in |handshake_request_|.
409   started_to_send_handshake_request_ = true;
410 }
411 
OnSentHandshakeRequest(SocketStream * socket,int amount_sent)412 void WebSocketJob::OnSentHandshakeRequest(
413     SocketStream* socket, int amount_sent) {
414   DCHECK_EQ(state_, CONNECTING);
415   handshake_request_sent_ += amount_sent;
416   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
417   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
418     // handshake request has been sent.
419     // notify original size of handshake request to delegate.
420     // Reset the handshake_request_ first in case this object is deleted by the
421     // delegate.
422     size_t original_length = handshake_request_->original_length();
423     handshake_request_.reset();
424     if (delegate_)
425       delegate_->OnSentData(socket, original_length);
426   }
427 }
428 
OnReceivedHandshakeResponse(SocketStream * socket,const char * data,int len)429 void WebSocketJob::OnReceivedHandshakeResponse(
430     SocketStream* socket, const char* data, int len) {
431   DCHECK_EQ(state_, CONNECTING);
432   if (handshake_response_->HasResponse()) {
433     // If we already has handshake response, received data should be frame
434     // data, not handshake message.
435     received_data_after_handshake_.insert(
436         received_data_after_handshake_.end(), data, data + len);
437     return;
438   }
439 
440   size_t response_length = handshake_response_->ParseRawResponse(data, len);
441   if (!handshake_response_->HasResponse()) {
442     // not yet. we need more data.
443     return;
444   }
445   // handshake message is completed.
446   std::string raw_response = handshake_response_->GetRawResponse();
447   socket_->net_log()->AddEvent(
448       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
449       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
450   if (len - response_length > 0) {
451     // If we received extra data, it should be frame data.
452     DCHECK(received_data_after_handshake_.empty());
453     received_data_after_handshake_.assign(data + response_length, data + len);
454   }
455   SaveCookiesAndNotifyHeadersComplete();
456 }
457 
SaveCookiesAndNotifyHeadersComplete()458 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
459   // handshake message is completed.
460   DCHECK(handshake_response_->HasResponse());
461 
462   // Extract cookies from the handshake response into a temporary vector.
463   response_cookies_.clear();
464   response_cookies_save_index_ = 0;
465 
466   handshake_response_->GetHeaders(
467       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
468 
469   // Now, loop over the response cookies, and attempt to persist each.
470   SaveNextCookie();
471 }
472 
NotifyHeadersComplete()473 void WebSocketJob::NotifyHeadersComplete() {
474   // Remove cookie headers, with malformed headers preserved.
475   // Actual handshake should be done in Blink.
476   handshake_response_->RemoveHeaders(
477       kSetCookieHeaders, arraysize(kSetCookieHeaders));
478   std::string handshake_response = handshake_response_->GetResponse();
479   handshake_response_.reset();
480   std::vector<char> received_data(handshake_response.begin(),
481                                   handshake_response.end());
482   received_data.insert(received_data.end(),
483                        received_data_after_handshake_.begin(),
484                        received_data_after_handshake_.end());
485   received_data_after_handshake_.clear();
486 
487   state_ = OPEN;
488 
489   DCHECK(!received_data.empty());
490   if (delegate_)
491     delegate_->OnReceivedData(
492         socket_.get(), &received_data.front(), received_data.size());
493 
494   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
495 }
496 
SaveNextCookie()497 void WebSocketJob::SaveNextCookie() {
498   if (!socket_.get() || !delegate_ || state_ != CONNECTING)
499     return;
500 
501   callback_pending_ = false;
502   save_next_cookie_running_ = true;
503 
504   if (socket_->cookie_store()) {
505     GURL url_for_cookies = GetURLForCookies();
506 
507     CookieOptions options;
508     options.set_include_httponly();
509 
510     // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
511     // CookieMonster's asynchronous operation APIs queue the callback to run it
512     // on the thread where the API was called, there won't be race. I.e. unless
513     // the callback is run synchronously, it won't be run in parallel with this
514     // method.
515     while (!callback_pending_ &&
516            response_cookies_save_index_ < response_cookies_.size()) {
517       std::string cookie = response_cookies_[response_cookies_save_index_];
518       response_cookies_save_index_++;
519 
520       if (!delegate_->CanSetCookie(
521               socket_.get(), url_for_cookies, cookie, &options))
522         continue;
523 
524       callback_pending_ = true;
525       socket_->cookie_store()->SetCookieWithOptionsAsync(
526           url_for_cookies, cookie, options,
527           base::Bind(&WebSocketJob::OnCookieSaved,
528                      weak_ptr_factory_.GetWeakPtr()));
529     }
530   }
531 
532   save_next_cookie_running_ = false;
533 
534   if (callback_pending_)
535     return;
536 
537   response_cookies_.clear();
538   response_cookies_save_index_ = 0;
539 
540   NotifyHeadersComplete();
541 }
542 
OnCookieSaved(bool cookie_status)543 void WebSocketJob::OnCookieSaved(bool cookie_status) {
544   // Tell the caller of SetCookieWithOptionsAsync() that this completion
545   // callback is invoked.
546   // - If the caller checks callback_pending earlier than this callback, the
547   //   caller exits to let this method continue iteration.
548   // - Otherwise, the caller continues iteration.
549   callback_pending_ = false;
550 
551   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
552   // the loop. Otherwise, return.
553   if (save_next_cookie_running_)
554     return;
555 
556   SaveNextCookie();
557 }
558 
GetURLForCookies() const559 GURL WebSocketJob::GetURLForCookies() const {
560   GURL url = socket_->url();
561   std::string scheme = socket_->is_secure() ? "https" : "http";
562   url::Replacements<char> replacements;
563   replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
564   return url.ReplaceComponents(replacements);
565 }
566 
address_list() const567 const AddressList& WebSocketJob::address_list() const {
568   return addresses_;
569 }
570 
TrySpdyStream()571 int WebSocketJob::TrySpdyStream() {
572   if (!socket_.get())
573     return ERR_FAILED;
574 
575   // Check if we have a SPDY session available.
576   HttpTransactionFactory* factory =
577       socket_->context()->http_transaction_factory();
578   if (!factory)
579     return OK;
580   scoped_refptr<HttpNetworkSession> session = factory->GetSession();
581   if (!session.get() || !session->params().enable_websocket_over_spdy)
582     return OK;
583   SpdySessionPool* spdy_pool = session->spdy_session_pool();
584   PrivacyMode privacy_mode = socket_->privacy_mode();
585   const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
586                            socket_->proxy_server(), privacy_mode);
587   // Forbid wss downgrade to SPDY without SSL.
588   // TODO(toyoshim): Does it realize the same policy with HTTP?
589   base::WeakPtr<SpdySession> spdy_session =
590       spdy_pool->FindAvailableSession(key, *socket_->net_log());
591   if (!spdy_session)
592     return OK;
593 
594   SSLInfo ssl_info;
595   bool was_npn_negotiated;
596   NextProto protocol_negotiated = kProtoUnknown;
597   bool use_ssl = spdy_session->GetSSLInfo(
598       &ssl_info, &was_npn_negotiated, &protocol_negotiated);
599   if (socket_->is_secure() && !use_ssl)
600     return OK;
601 
602   // Create SpdyWebSocketStream.
603   spdy_protocol_version_ = spdy_session->GetProtocolVersion();
604   spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
605 
606   int result = spdy_websocket_stream_->InitializeStream(
607       socket_->url(), MEDIUM, *socket_->net_log());
608   if (result == OK) {
609     OnConnected(socket_.get(), kMaxPendingSendAllowed);
610     return ERR_PROTOCOL_SWITCHED;
611   }
612   if (result != ERR_IO_PENDING) {
613     spdy_websocket_stream_.reset();
614     return OK;
615   }
616 
617   return ERR_IO_PENDING;
618 }
619 
SetWaiting()620 void WebSocketJob::SetWaiting() {
621   waiting_ = true;
622 }
623 
IsWaiting() const624 bool WebSocketJob::IsWaiting() const {
625   return waiting_;
626 }
627 
Wakeup()628 void WebSocketJob::Wakeup() {
629   if (!waiting_)
630     return;
631   waiting_ = false;
632   DCHECK(!callback_.is_null());
633   base::MessageLoopForIO::current()->PostTask(
634       FROM_HERE,
635       base::Bind(&WebSocketJob::RetryPendingIO,
636                  weak_ptr_factory_.GetWeakPtr()));
637 }
638 
RetryPendingIO()639 void WebSocketJob::RetryPendingIO() {
640   int result = TrySpdyStream();
641 
642   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
643   // OnCreatedSpdyStream().
644   if (result != ERR_IO_PENDING)
645     CompleteIO(result);
646 }
647 
CompleteIO(int result)648 void WebSocketJob::CompleteIO(int result) {
649   // |callback_| may be null if OnClose() or DetachDelegate() was called.
650   if (!callback_.is_null()) {
651     CompletionCallback callback = callback_;
652     callback_.Reset();
653     callback.Run(result);
654     Release();  // Balanced with OnStartOpenConnection().
655   }
656 }
657 
SendDataInternal(const char * data,int length)658 bool WebSocketJob::SendDataInternal(const char* data, int length) {
659   if (spdy_websocket_stream_.get())
660     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
661   if (socket_.get())
662     return socket_->SendData(data, length);
663   return false;
664 }
665 
CloseInternal()666 void WebSocketJob::CloseInternal() {
667   if (spdy_websocket_stream_.get())
668     spdy_websocket_stream_->Close();
669   if (socket_.get())
670     socket_->Close();
671 }
672 
SendPending()673 void WebSocketJob::SendPending() {
674   if (current_send_buffer_.get())
675     return;
676 
677   // Current buffer has been sent. Try next if any.
678   if (send_buffer_queue_.empty()) {
679     // No more data to send.
680     if (state_ == CLOSING)
681       CloseInternal();
682     return;
683   }
684 
685   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
686   send_buffer_queue_.pop_front();
687   current_send_buffer_ =
688       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
689   SendDataInternal(current_send_buffer_->data(),
690                    current_send_buffer_->BytesRemaining());
691 }
692 
693 }  // namespace net
694