1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_job.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
25
26 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
27
28 namespace {
29
30 // lower-case header names.
31 const char* const kCookieHeaders[] = {
32 "cookie", "cookie2"
33 };
34 const char* const kSetCookieHeaders[] = {
35 "set-cookie", "set-cookie2"
36 };
37
WebSocketJobFactory(const GURL & url,net::SocketStream::Delegate * delegate)38 net::SocketStreamJob* WebSocketJobFactory(
39 const GURL& url, net::SocketStream::Delegate* delegate) {
40 net::WebSocketJob* job = new net::WebSocketJob(delegate);
41 job->InitSocketStream(new net::SocketStream(url, job));
42 return job;
43 }
44
45 class WebSocketJobInitSingleton {
46 private:
47 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton()48 WebSocketJobInitSingleton() {
49 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
50 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
51 }
52 };
53
54 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
55 LAZY_INSTANCE_INITIALIZER;
56
57 } // anonymous namespace
58
59 namespace net {
60
61 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
62
63 // static
EnsureInit()64 void WebSocketJob::EnsureInit() {
65 g_websocket_job_init.Get();
66 }
67
68 // static
set_websocket_over_spdy_enabled(bool enabled)69 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
70 websocket_over_spdy_enabled_ = enabled;
71 }
72
WebSocketJob(SocketStream::Delegate * delegate)73 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
74 : delegate_(delegate),
75 state_(INITIALIZED),
76 waiting_(false),
77 handshake_request_(new WebSocketHandshakeRequestHandler),
78 handshake_response_(new WebSocketHandshakeResponseHandler),
79 started_to_send_handshake_request_(false),
80 handshake_request_sent_(0),
81 response_cookies_save_index_(0),
82 spdy_protocol_version_(0),
83 save_next_cookie_running_(false),
84 callback_pending_(false),
85 weak_ptr_factory_(this),
86 weak_ptr_factory_for_send_pending_(this) {
87 }
88
~WebSocketJob()89 WebSocketJob::~WebSocketJob() {
90 DCHECK_EQ(CLOSED, state_);
91 DCHECK(!delegate_);
92 DCHECK(!socket_.get());
93 }
94
Connect()95 void WebSocketJob::Connect() {
96 DCHECK(socket_.get());
97 DCHECK_EQ(state_, INITIALIZED);
98 state_ = CONNECTING;
99 socket_->Connect();
100 }
101
SendData(const char * data,int len)102 bool WebSocketJob::SendData(const char* data, int len) {
103 switch (state_) {
104 case INITIALIZED:
105 return false;
106
107 case CONNECTING:
108 return SendHandshakeRequest(data, len);
109
110 case OPEN:
111 {
112 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
113 memcpy(buffer->data(), data, len);
114 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
115 send_buffer_queue_.push_back(buffer);
116 return true;
117 }
118 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
119 return SendDataInternal(current_send_buffer_->data(),
120 current_send_buffer_->BytesRemaining());
121 }
122
123 case CLOSING:
124 case CLOSED:
125 return false;
126 }
127 return false;
128 }
129
Close()130 void WebSocketJob::Close() {
131 if (state_ == CLOSED)
132 return;
133
134 state_ = CLOSING;
135 if (current_send_buffer_.get()) {
136 // Will close in SendPending.
137 return;
138 }
139 state_ = CLOSED;
140 CloseInternal();
141 }
142
RestartWithAuth(const AuthCredentials & credentials)143 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
144 state_ = CONNECTING;
145 socket_->RestartWithAuth(credentials);
146 }
147
DetachDelegate()148 void WebSocketJob::DetachDelegate() {
149 state_ = CLOSED;
150 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
151
152 scoped_refptr<WebSocketJob> protect(this);
153 weak_ptr_factory_.InvalidateWeakPtrs();
154 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
155
156 delegate_ = NULL;
157 if (socket_.get())
158 socket_->DetachDelegate();
159 socket_ = NULL;
160 if (!callback_.is_null()) {
161 waiting_ = false;
162 callback_.Reset();
163 Release(); // Balanced with OnStartOpenConnection().
164 }
165 }
166
OnStartOpenConnection(SocketStream * socket,const CompletionCallback & callback)167 int WebSocketJob::OnStartOpenConnection(
168 SocketStream* socket, const CompletionCallback& callback) {
169 DCHECK(callback_.is_null());
170 state_ = CONNECTING;
171
172 addresses_ = socket->address_list();
173 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
174 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
175 }
176
177 if (delegate_) {
178 int result = delegate_->OnStartOpenConnection(socket, callback);
179 DCHECK_EQ(OK, result);
180 }
181 if (waiting_) {
182 // PutInQueue() may set |waiting_| true for throttling. In this case,
183 // Wakeup() will be called later.
184 callback_ = callback;
185 AddRef(); // Balanced when callback_ is cleared.
186 return ERR_IO_PENDING;
187 }
188 return TrySpdyStream();
189 }
190
OnConnected(SocketStream * socket,int max_pending_send_allowed)191 void WebSocketJob::OnConnected(
192 SocketStream* socket, int max_pending_send_allowed) {
193 if (state_ == CLOSED)
194 return;
195 DCHECK_EQ(CONNECTING, state_);
196 if (delegate_)
197 delegate_->OnConnected(socket, max_pending_send_allowed);
198 }
199
OnSentData(SocketStream * socket,int amount_sent)200 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
201 DCHECK_NE(INITIALIZED, state_);
202 DCHECK_GT(amount_sent, 0);
203 if (state_ == CLOSED)
204 return;
205 if (state_ == CONNECTING) {
206 OnSentHandshakeRequest(socket, amount_sent);
207 return;
208 }
209 if (delegate_) {
210 DCHECK(state_ == OPEN || state_ == CLOSING);
211 if (!current_send_buffer_.get()) {
212 VLOG(1)
213 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
214 return;
215 }
216 current_send_buffer_->DidConsume(amount_sent);
217 if (current_send_buffer_->BytesRemaining() > 0)
218 return;
219
220 // We need to report amount_sent of original buffer size, instead of
221 // amount sent to |socket|.
222 amount_sent = current_send_buffer_->size();
223 DCHECK_GT(amount_sent, 0);
224 current_send_buffer_ = NULL;
225 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
226 base::MessageLoopForIO::current()->PostTask(
227 FROM_HERE,
228 base::Bind(&WebSocketJob::SendPending,
229 weak_ptr_factory_for_send_pending_.GetWeakPtr()));
230 }
231 delegate_->OnSentData(socket, amount_sent);
232 }
233 }
234
OnReceivedData(SocketStream * socket,const char * data,int len)235 void WebSocketJob::OnReceivedData(
236 SocketStream* socket, const char* data, int len) {
237 DCHECK_NE(INITIALIZED, state_);
238 if (state_ == CLOSED)
239 return;
240 if (state_ == CONNECTING) {
241 OnReceivedHandshakeResponse(socket, data, len);
242 return;
243 }
244 DCHECK(state_ == OPEN || state_ == CLOSING);
245 if (delegate_ && len > 0)
246 delegate_->OnReceivedData(socket, data, len);
247 }
248
OnClose(SocketStream * socket)249 void WebSocketJob::OnClose(SocketStream* socket) {
250 state_ = CLOSED;
251 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
252
253 scoped_refptr<WebSocketJob> protect(this);
254 weak_ptr_factory_.InvalidateWeakPtrs();
255
256 SocketStream::Delegate* delegate = delegate_;
257 delegate_ = NULL;
258 socket_ = NULL;
259 if (!callback_.is_null()) {
260 waiting_ = false;
261 callback_.Reset();
262 Release(); // Balanced with OnStartOpenConnection().
263 }
264 if (delegate)
265 delegate->OnClose(socket);
266 }
267
OnAuthRequired(SocketStream * socket,AuthChallengeInfo * auth_info)268 void WebSocketJob::OnAuthRequired(
269 SocketStream* socket, AuthChallengeInfo* auth_info) {
270 if (delegate_)
271 delegate_->OnAuthRequired(socket, auth_info);
272 }
273
OnSSLCertificateError(SocketStream * socket,const SSLInfo & ssl_info,bool fatal)274 void WebSocketJob::OnSSLCertificateError(
275 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
276 if (delegate_)
277 delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
278 }
279
OnError(const SocketStream * socket,int error)280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
281 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
282 delegate_->OnError(socket, error);
283 }
284
OnCreatedSpdyStream(int result)285 void WebSocketJob::OnCreatedSpdyStream(int result) {
286 DCHECK(spdy_websocket_stream_.get());
287 DCHECK(socket_.get());
288 DCHECK_NE(ERR_IO_PENDING, result);
289
290 if (state_ == CLOSED) {
291 result = ERR_ABORTED;
292 } else if (result == OK) {
293 state_ = CONNECTING;
294 result = ERR_PROTOCOL_SWITCHED;
295 } else {
296 spdy_websocket_stream_.reset();
297 }
298
299 CompleteIO(result);
300 }
301
OnSentSpdyHeaders()302 void WebSocketJob::OnSentSpdyHeaders() {
303 DCHECK_NE(INITIALIZED, state_);
304 if (state_ != CONNECTING)
305 return;
306 if (delegate_)
307 delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
308 handshake_request_.reset();
309 }
310
OnSpdyResponseHeadersUpdated(const SpdyHeaderBlock & response_headers)311 void WebSocketJob::OnSpdyResponseHeadersUpdated(
312 const SpdyHeaderBlock& response_headers) {
313 DCHECK_NE(INITIALIZED, state_);
314 if (state_ != CONNECTING)
315 return;
316 // TODO(toyoshim): Fallback to non-spdy connection?
317 handshake_response_->ParseResponseHeaderBlock(response_headers,
318 challenge_,
319 spdy_protocol_version_);
320
321 SaveCookiesAndNotifyHeadersComplete();
322 }
323
OnSentSpdyData(size_t bytes_sent)324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
325 DCHECK_NE(INITIALIZED, state_);
326 DCHECK_NE(CONNECTING, state_);
327 if (state_ == CLOSED)
328 return;
329 if (!spdy_websocket_stream_.get())
330 return;
331 OnSentData(socket_.get(), static_cast<int>(bytes_sent));
332 }
333
OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer)334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
335 DCHECK_NE(INITIALIZED, state_);
336 DCHECK_NE(CONNECTING, state_);
337 if (state_ == CLOSED)
338 return;
339 if (!spdy_websocket_stream_.get())
340 return;
341 if (buffer) {
342 OnReceivedData(
343 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
344 } else {
345 OnReceivedData(socket_.get(), NULL, 0);
346 }
347 }
348
OnCloseSpdyStream()349 void WebSocketJob::OnCloseSpdyStream() {
350 spdy_websocket_stream_.reset();
351 OnClose(socket_.get());
352 }
353
SendHandshakeRequest(const char * data,int len)354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
355 DCHECK_EQ(state_, CONNECTING);
356 if (started_to_send_handshake_request_)
357 return false;
358 if (!handshake_request_->ParseRequest(data, len))
359 return false;
360
361 AddCookieHeaderAndSend();
362 return true;
363 }
364
AddCookieHeaderAndSend()365 void WebSocketJob::AddCookieHeaderAndSend() {
366 bool allow = true;
367 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
368 allow = false;
369
370 if (socket_.get() && delegate_ && state_ == CONNECTING) {
371 handshake_request_->RemoveHeaders(kCookieHeaders,
372 arraysize(kCookieHeaders));
373 if (allow && socket_->context()->cookie_store()) {
374 // Add cookies, including HttpOnly cookies.
375 CookieOptions cookie_options;
376 cookie_options.set_include_httponly();
377 socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
378 GetURLForCookies(), cookie_options,
379 base::Bind(&WebSocketJob::LoadCookieCallback,
380 weak_ptr_factory_.GetWeakPtr()));
381 } else {
382 DoSendData();
383 }
384 }
385 }
386
LoadCookieCallback(const std::string & cookie)387 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
388 if (!cookie.empty())
389 // TODO(tyoshino): Sending cookie means that connection doesn't need
390 // kPrivacyModeEnabled as cookies may be server-bound and channel id
391 // wouldn't negatively affect privacy anyway. Need to restart connection
392 // or refactor to determine cookie status prior to connecting.
393 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
394 DoSendData();
395 }
396
DoSendData()397 void WebSocketJob::DoSendData() {
398 if (spdy_websocket_stream_.get()) {
399 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
400 handshake_request_->GetRequestHeaderBlock(
401 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
402 spdy_websocket_stream_->SendRequest(headers.Pass());
403 } else {
404 const std::string& handshake_request =
405 handshake_request_->GetRawRequest();
406 handshake_request_sent_ = 0;
407 socket_->net_log()->AddEvent(
408 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
409 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
410 socket_->SendData(handshake_request.data(),
411 handshake_request.size());
412 }
413 // Just buffered in |handshake_request_|.
414 started_to_send_handshake_request_ = true;
415 }
416
OnSentHandshakeRequest(SocketStream * socket,int amount_sent)417 void WebSocketJob::OnSentHandshakeRequest(
418 SocketStream* socket, int amount_sent) {
419 DCHECK_EQ(state_, CONNECTING);
420 handshake_request_sent_ += amount_sent;
421 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
422 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
423 // handshake request has been sent.
424 // notify original size of handshake request to delegate.
425 if (delegate_)
426 delegate_->OnSentData(
427 socket,
428 handshake_request_->original_length());
429 handshake_request_.reset();
430 }
431 }
432
OnReceivedHandshakeResponse(SocketStream * socket,const char * data,int len)433 void WebSocketJob::OnReceivedHandshakeResponse(
434 SocketStream* socket, const char* data, int len) {
435 DCHECK_EQ(state_, CONNECTING);
436 if (handshake_response_->HasResponse()) {
437 // If we already has handshake response, received data should be frame
438 // data, not handshake message.
439 received_data_after_handshake_.insert(
440 received_data_after_handshake_.end(), data, data + len);
441 return;
442 }
443
444 size_t response_length = handshake_response_->ParseRawResponse(data, len);
445 if (!handshake_response_->HasResponse()) {
446 // not yet. we need more data.
447 return;
448 }
449 // handshake message is completed.
450 std::string raw_response = handshake_response_->GetRawResponse();
451 socket_->net_log()->AddEvent(
452 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
453 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
454 if (len - response_length > 0) {
455 // If we received extra data, it should be frame data.
456 DCHECK(received_data_after_handshake_.empty());
457 received_data_after_handshake_.assign(data + response_length, data + len);
458 }
459 SaveCookiesAndNotifyHeadersComplete();
460 }
461
SaveCookiesAndNotifyHeadersComplete()462 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
463 // handshake message is completed.
464 DCHECK(handshake_response_->HasResponse());
465
466 // Extract cookies from the handshake response into a temporary vector.
467 response_cookies_.clear();
468 response_cookies_save_index_ = 0;
469
470 handshake_response_->GetHeaders(
471 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
472
473 // Now, loop over the response cookies, and attempt to persist each.
474 SaveNextCookie();
475 }
476
NotifyHeadersComplete()477 void WebSocketJob::NotifyHeadersComplete() {
478 // Remove cookie headers, with malformed headers preserved.
479 // Actual handshake should be done in Blink.
480 handshake_response_->RemoveHeaders(
481 kSetCookieHeaders, arraysize(kSetCookieHeaders));
482 std::string handshake_response = handshake_response_->GetResponse();
483 handshake_response_.reset();
484 std::vector<char> received_data(handshake_response.begin(),
485 handshake_response.end());
486 received_data.insert(received_data.end(),
487 received_data_after_handshake_.begin(),
488 received_data_after_handshake_.end());
489 received_data_after_handshake_.clear();
490
491 state_ = OPEN;
492
493 DCHECK(!received_data.empty());
494 if (delegate_)
495 delegate_->OnReceivedData(
496 socket_.get(), &received_data.front(), received_data.size());
497
498 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
499 }
500
SaveNextCookie()501 void WebSocketJob::SaveNextCookie() {
502 if (!socket_.get() || !delegate_ || state_ != CONNECTING)
503 return;
504
505 callback_pending_ = false;
506 save_next_cookie_running_ = true;
507
508 if (socket_->context()->cookie_store()) {
509 GURL url_for_cookies = GetURLForCookies();
510
511 CookieOptions options;
512 options.set_include_httponly();
513
514 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
515 // CookieMonster's asynchronous operation APIs queue the callback to run it
516 // on the thread where the API was called, there won't be race. I.e. unless
517 // the callback is run synchronously, it won't be run in parallel with this
518 // method.
519 while (!callback_pending_ &&
520 response_cookies_save_index_ < response_cookies_.size()) {
521 std::string cookie = response_cookies_[response_cookies_save_index_];
522 response_cookies_save_index_++;
523
524 if (!delegate_->CanSetCookie(
525 socket_.get(), url_for_cookies, cookie, &options))
526 continue;
527
528 callback_pending_ = true;
529 socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
530 url_for_cookies, cookie, options,
531 base::Bind(&WebSocketJob::OnCookieSaved,
532 weak_ptr_factory_.GetWeakPtr()));
533 }
534 }
535
536 save_next_cookie_running_ = false;
537
538 if (callback_pending_)
539 return;
540
541 response_cookies_.clear();
542 response_cookies_save_index_ = 0;
543
544 NotifyHeadersComplete();
545 }
546
OnCookieSaved(bool cookie_status)547 void WebSocketJob::OnCookieSaved(bool cookie_status) {
548 // Tell the caller of SetCookieWithOptionsAsync() that this completion
549 // callback is invoked.
550 // - If the caller checks callback_pending earlier than this callback, the
551 // caller exits to let this method continue iteration.
552 // - Otherwise, the caller continues iteration.
553 callback_pending_ = false;
554
555 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
556 // the loop. Otherwise, return.
557 if (save_next_cookie_running_)
558 return;
559
560 SaveNextCookie();
561 }
562
GetURLForCookies() const563 GURL WebSocketJob::GetURLForCookies() const {
564 GURL url = socket_->url();
565 std::string scheme = socket_->is_secure() ? "https" : "http";
566 url_canon::Replacements<char> replacements;
567 replacements.SetScheme(scheme.c_str(),
568 url_parse::Component(0, scheme.length()));
569 return url.ReplaceComponents(replacements);
570 }
571
address_list() const572 const AddressList& WebSocketJob::address_list() const {
573 return addresses_;
574 }
575
TrySpdyStream()576 int WebSocketJob::TrySpdyStream() {
577 if (!socket_.get())
578 return ERR_FAILED;
579
580 if (!websocket_over_spdy_enabled_)
581 return OK;
582
583 // Check if we have a SPDY session available.
584 HttpTransactionFactory* factory =
585 socket_->context()->http_transaction_factory();
586 if (!factory)
587 return OK;
588 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
589 if (!session.get())
590 return OK;
591 SpdySessionPool* spdy_pool = session->spdy_session_pool();
592 PrivacyMode privacy_mode = socket_->privacy_mode();
593 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
594 socket_->proxy_server(), privacy_mode);
595 // Forbid wss downgrade to SPDY without SSL.
596 // TODO(toyoshim): Does it realize the same policy with HTTP?
597 base::WeakPtr<SpdySession> spdy_session =
598 spdy_pool->FindAvailableSession(key, *socket_->net_log());
599 if (!spdy_session)
600 return OK;
601
602 SSLInfo ssl_info;
603 bool was_npn_negotiated;
604 NextProto protocol_negotiated = kProtoUnknown;
605 bool use_ssl = spdy_session->GetSSLInfo(
606 &ssl_info, &was_npn_negotiated, &protocol_negotiated);
607 if (socket_->is_secure() && !use_ssl)
608 return OK;
609
610 // Create SpdyWebSocketStream.
611 spdy_protocol_version_ = spdy_session->GetProtocolVersion();
612 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
613
614 int result = spdy_websocket_stream_->InitializeStream(
615 socket_->url(), MEDIUM, *socket_->net_log());
616 if (result == OK) {
617 OnConnected(socket_.get(), kMaxPendingSendAllowed);
618 return ERR_PROTOCOL_SWITCHED;
619 }
620 if (result != ERR_IO_PENDING) {
621 spdy_websocket_stream_.reset();
622 return OK;
623 }
624
625 return ERR_IO_PENDING;
626 }
627
SetWaiting()628 void WebSocketJob::SetWaiting() {
629 waiting_ = true;
630 }
631
IsWaiting() const632 bool WebSocketJob::IsWaiting() const {
633 return waiting_;
634 }
635
Wakeup()636 void WebSocketJob::Wakeup() {
637 if (!waiting_)
638 return;
639 waiting_ = false;
640 DCHECK(!callback_.is_null());
641 base::MessageLoopForIO::current()->PostTask(
642 FROM_HERE,
643 base::Bind(&WebSocketJob::RetryPendingIO,
644 weak_ptr_factory_.GetWeakPtr()));
645 }
646
RetryPendingIO()647 void WebSocketJob::RetryPendingIO() {
648 int result = TrySpdyStream();
649
650 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
651 // OnCreatedSpdyStream().
652 if (result != ERR_IO_PENDING)
653 CompleteIO(result);
654 }
655
CompleteIO(int result)656 void WebSocketJob::CompleteIO(int result) {
657 // |callback_| may be null if OnClose() or DetachDelegate() was called.
658 if (!callback_.is_null()) {
659 CompletionCallback callback = callback_;
660 callback_.Reset();
661 callback.Run(result);
662 Release(); // Balanced with OnStartOpenConnection().
663 }
664 }
665
SendDataInternal(const char * data,int length)666 bool WebSocketJob::SendDataInternal(const char* data, int length) {
667 if (spdy_websocket_stream_.get())
668 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
669 if (socket_.get())
670 return socket_->SendData(data, length);
671 return false;
672 }
673
CloseInternal()674 void WebSocketJob::CloseInternal() {
675 if (spdy_websocket_stream_.get())
676 spdy_websocket_stream_->Close();
677 if (socket_.get())
678 socket_->Close();
679 }
680
SendPending()681 void WebSocketJob::SendPending() {
682 if (current_send_buffer_.get())
683 return;
684
685 // Current buffer has been sent. Try next if any.
686 if (send_buffer_queue_.empty()) {
687 // No more data to send.
688 if (state_ == CLOSING)
689 CloseInternal();
690 return;
691 }
692
693 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
694 send_buffer_queue_.pop_front();
695 current_send_buffer_ =
696 new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
697 SendDataInternal(current_send_buffer_->data(),
698 current_send_buffer_->BytesRemaining());
699 }
700
701 } // namespace net
702