1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_job.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
25
26 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
27
28 namespace {
29
30 // lower-case header names.
31 const char* const kCookieHeaders[] = {
32 "cookie", "cookie2"
33 };
34 const char* const kSetCookieHeaders[] = {
35 "set-cookie", "set-cookie2"
36 };
37
WebSocketJobFactory(const GURL & url,net::SocketStream::Delegate * delegate,net::URLRequestContext * context,net::CookieStore * cookie_store)38 net::SocketStreamJob* WebSocketJobFactory(
39 const GURL& url, net::SocketStream::Delegate* delegate,
40 net::URLRequestContext* context, net::CookieStore* cookie_store) {
41 net::WebSocketJob* job = new net::WebSocketJob(delegate);
42 job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
43 return job;
44 }
45
46 class WebSocketJobInitSingleton {
47 private:
48 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton()49 WebSocketJobInitSingleton() {
50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
52 }
53 };
54
55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56 LAZY_INSTANCE_INITIALIZER;
57
58 } // anonymous namespace
59
60 namespace net {
61
62 // static
EnsureInit()63 void WebSocketJob::EnsureInit() {
64 g_websocket_job_init.Get();
65 }
66
WebSocketJob(SocketStream::Delegate * delegate)67 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
68 : delegate_(delegate),
69 state_(INITIALIZED),
70 waiting_(false),
71 handshake_request_(new WebSocketHandshakeRequestHandler),
72 handshake_response_(new WebSocketHandshakeResponseHandler),
73 started_to_send_handshake_request_(false),
74 handshake_request_sent_(0),
75 response_cookies_save_index_(0),
76 spdy_protocol_version_(0),
77 save_next_cookie_running_(false),
78 callback_pending_(false),
79 weak_ptr_factory_(this),
80 weak_ptr_factory_for_send_pending_(this) {
81 }
82
~WebSocketJob()83 WebSocketJob::~WebSocketJob() {
84 DCHECK_EQ(CLOSED, state_);
85 DCHECK(!delegate_);
86 DCHECK(!socket_.get());
87 }
88
Connect()89 void WebSocketJob::Connect() {
90 DCHECK(socket_.get());
91 DCHECK_EQ(state_, INITIALIZED);
92 state_ = CONNECTING;
93 socket_->Connect();
94 }
95
SendData(const char * data,int len)96 bool WebSocketJob::SendData(const char* data, int len) {
97 switch (state_) {
98 case INITIALIZED:
99 return false;
100
101 case CONNECTING:
102 return SendHandshakeRequest(data, len);
103
104 case OPEN:
105 {
106 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
107 memcpy(buffer->data(), data, len);
108 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
109 send_buffer_queue_.push_back(buffer);
110 return true;
111 }
112 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
113 return SendDataInternal(current_send_buffer_->data(),
114 current_send_buffer_->BytesRemaining());
115 }
116
117 case CLOSING:
118 case CLOSED:
119 return false;
120 }
121 return false;
122 }
123
Close()124 void WebSocketJob::Close() {
125 if (state_ == CLOSED)
126 return;
127
128 state_ = CLOSING;
129 if (current_send_buffer_.get()) {
130 // Will close in SendPending.
131 return;
132 }
133 state_ = CLOSED;
134 CloseInternal();
135 }
136
RestartWithAuth(const AuthCredentials & credentials)137 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
138 state_ = CONNECTING;
139 socket_->RestartWithAuth(credentials);
140 }
141
DetachDelegate()142 void WebSocketJob::DetachDelegate() {
143 state_ = CLOSED;
144 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
145
146 scoped_refptr<WebSocketJob> protect(this);
147 weak_ptr_factory_.InvalidateWeakPtrs();
148 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
149
150 delegate_ = NULL;
151 if (socket_.get())
152 socket_->DetachDelegate();
153 socket_ = NULL;
154 if (!callback_.is_null()) {
155 waiting_ = false;
156 callback_.Reset();
157 Release(); // Balanced with OnStartOpenConnection().
158 }
159 }
160
OnStartOpenConnection(SocketStream * socket,const CompletionCallback & callback)161 int WebSocketJob::OnStartOpenConnection(
162 SocketStream* socket, const CompletionCallback& callback) {
163 DCHECK(callback_.is_null());
164 state_ = CONNECTING;
165
166 addresses_ = socket->address_list();
167 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
168 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
169 }
170
171 if (delegate_) {
172 int result = delegate_->OnStartOpenConnection(socket, callback);
173 DCHECK_EQ(OK, result);
174 }
175 if (waiting_) {
176 // PutInQueue() may set |waiting_| true for throttling. In this case,
177 // Wakeup() will be called later.
178 callback_ = callback;
179 AddRef(); // Balanced when callback_ is cleared.
180 return ERR_IO_PENDING;
181 }
182 return TrySpdyStream();
183 }
184
OnConnected(SocketStream * socket,int max_pending_send_allowed)185 void WebSocketJob::OnConnected(
186 SocketStream* socket, int max_pending_send_allowed) {
187 if (state_ == CLOSED)
188 return;
189 DCHECK_EQ(CONNECTING, state_);
190 if (delegate_)
191 delegate_->OnConnected(socket, max_pending_send_allowed);
192 }
193
OnSentData(SocketStream * socket,int amount_sent)194 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
195 DCHECK_NE(INITIALIZED, state_);
196 DCHECK_GT(amount_sent, 0);
197 if (state_ == CLOSED)
198 return;
199 if (state_ == CONNECTING) {
200 OnSentHandshakeRequest(socket, amount_sent);
201 return;
202 }
203 if (delegate_) {
204 DCHECK(state_ == OPEN || state_ == CLOSING);
205 if (!current_send_buffer_.get()) {
206 VLOG(1)
207 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
208 return;
209 }
210 current_send_buffer_->DidConsume(amount_sent);
211 if (current_send_buffer_->BytesRemaining() > 0)
212 return;
213
214 // We need to report amount_sent of original buffer size, instead of
215 // amount sent to |socket|.
216 amount_sent = current_send_buffer_->size();
217 DCHECK_GT(amount_sent, 0);
218 current_send_buffer_ = NULL;
219 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
220 base::MessageLoopForIO::current()->PostTask(
221 FROM_HERE,
222 base::Bind(&WebSocketJob::SendPending,
223 weak_ptr_factory_for_send_pending_.GetWeakPtr()));
224 }
225 delegate_->OnSentData(socket, amount_sent);
226 }
227 }
228
OnReceivedData(SocketStream * socket,const char * data,int len)229 void WebSocketJob::OnReceivedData(
230 SocketStream* socket, const char* data, int len) {
231 DCHECK_NE(INITIALIZED, state_);
232 if (state_ == CLOSED)
233 return;
234 if (state_ == CONNECTING) {
235 OnReceivedHandshakeResponse(socket, data, len);
236 return;
237 }
238 DCHECK(state_ == OPEN || state_ == CLOSING);
239 if (delegate_ && len > 0)
240 delegate_->OnReceivedData(socket, data, len);
241 }
242
OnClose(SocketStream * socket)243 void WebSocketJob::OnClose(SocketStream* socket) {
244 state_ = CLOSED;
245 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
246
247 scoped_refptr<WebSocketJob> protect(this);
248 weak_ptr_factory_.InvalidateWeakPtrs();
249
250 SocketStream::Delegate* delegate = delegate_;
251 delegate_ = NULL;
252 socket_ = NULL;
253 if (!callback_.is_null()) {
254 waiting_ = false;
255 callback_.Reset();
256 Release(); // Balanced with OnStartOpenConnection().
257 }
258 if (delegate)
259 delegate->OnClose(socket);
260 }
261
OnAuthRequired(SocketStream * socket,AuthChallengeInfo * auth_info)262 void WebSocketJob::OnAuthRequired(
263 SocketStream* socket, AuthChallengeInfo* auth_info) {
264 if (delegate_)
265 delegate_->OnAuthRequired(socket, auth_info);
266 }
267
OnSSLCertificateError(SocketStream * socket,const SSLInfo & ssl_info,bool fatal)268 void WebSocketJob::OnSSLCertificateError(
269 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
270 if (delegate_)
271 delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
272 }
273
OnError(const SocketStream * socket,int error)274 void WebSocketJob::OnError(const SocketStream* socket, int error) {
275 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
276 delegate_->OnError(socket, error);
277 }
278
OnCreatedSpdyStream(int result)279 void WebSocketJob::OnCreatedSpdyStream(int result) {
280 DCHECK(spdy_websocket_stream_.get());
281 DCHECK(socket_.get());
282 DCHECK_NE(ERR_IO_PENDING, result);
283
284 if (state_ == CLOSED) {
285 result = ERR_ABORTED;
286 } else if (result == OK) {
287 state_ = CONNECTING;
288 result = ERR_PROTOCOL_SWITCHED;
289 } else {
290 spdy_websocket_stream_.reset();
291 }
292
293 CompleteIO(result);
294 }
295
OnSentSpdyHeaders()296 void WebSocketJob::OnSentSpdyHeaders() {
297 DCHECK_NE(INITIALIZED, state_);
298 if (state_ != CONNECTING)
299 return;
300 size_t original_length = handshake_request_->original_length();
301 handshake_request_.reset();
302 if (delegate_)
303 delegate_->OnSentData(socket_.get(), original_length);
304 }
305
OnSpdyResponseHeadersUpdated(const SpdyHeaderBlock & response_headers)306 void WebSocketJob::OnSpdyResponseHeadersUpdated(
307 const SpdyHeaderBlock& response_headers) {
308 DCHECK_NE(INITIALIZED, state_);
309 if (state_ != CONNECTING)
310 return;
311 // TODO(toyoshim): Fallback to non-spdy connection?
312 handshake_response_->ParseResponseHeaderBlock(response_headers,
313 challenge_,
314 spdy_protocol_version_);
315
316 SaveCookiesAndNotifyHeadersComplete();
317 }
318
OnSentSpdyData(size_t bytes_sent)319 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
320 DCHECK_NE(INITIALIZED, state_);
321 DCHECK_NE(CONNECTING, state_);
322 if (state_ == CLOSED)
323 return;
324 if (!spdy_websocket_stream_.get())
325 return;
326 OnSentData(socket_.get(), static_cast<int>(bytes_sent));
327 }
328
OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer)329 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
330 DCHECK_NE(INITIALIZED, state_);
331 DCHECK_NE(CONNECTING, state_);
332 if (state_ == CLOSED)
333 return;
334 if (!spdy_websocket_stream_.get())
335 return;
336 if (buffer) {
337 OnReceivedData(
338 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
339 } else {
340 OnReceivedData(socket_.get(), NULL, 0);
341 }
342 }
343
OnCloseSpdyStream()344 void WebSocketJob::OnCloseSpdyStream() {
345 spdy_websocket_stream_.reset();
346 OnClose(socket_.get());
347 }
348
SendHandshakeRequest(const char * data,int len)349 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
350 DCHECK_EQ(state_, CONNECTING);
351 if (started_to_send_handshake_request_)
352 return false;
353 if (!handshake_request_->ParseRequest(data, len))
354 return false;
355
356 AddCookieHeaderAndSend();
357 return true;
358 }
359
AddCookieHeaderAndSend()360 void WebSocketJob::AddCookieHeaderAndSend() {
361 bool allow = true;
362 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
363 allow = false;
364
365 if (socket_.get() && delegate_ && state_ == CONNECTING) {
366 handshake_request_->RemoveHeaders(kCookieHeaders,
367 arraysize(kCookieHeaders));
368 if (allow && socket_->cookie_store()) {
369 // Add cookies, including HttpOnly cookies.
370 CookieOptions cookie_options;
371 cookie_options.set_include_httponly();
372 socket_->cookie_store()->GetCookiesWithOptionsAsync(
373 GetURLForCookies(), cookie_options,
374 base::Bind(&WebSocketJob::LoadCookieCallback,
375 weak_ptr_factory_.GetWeakPtr()));
376 } else {
377 DoSendData();
378 }
379 }
380 }
381
LoadCookieCallback(const std::string & cookie)382 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
383 if (!cookie.empty())
384 // TODO(tyoshino): Sending cookie means that connection doesn't need
385 // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
386 // wouldn't negatively affect privacy anyway. Need to restart connection
387 // or refactor to determine cookie status prior to connecting.
388 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
389 DoSendData();
390 }
391
DoSendData()392 void WebSocketJob::DoSendData() {
393 if (spdy_websocket_stream_.get()) {
394 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
395 handshake_request_->GetRequestHeaderBlock(
396 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
397 spdy_websocket_stream_->SendRequest(headers.Pass());
398 } else {
399 const std::string& handshake_request =
400 handshake_request_->GetRawRequest();
401 handshake_request_sent_ = 0;
402 socket_->net_log()->AddEvent(
403 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
404 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
405 socket_->SendData(handshake_request.data(),
406 handshake_request.size());
407 }
408 // Just buffered in |handshake_request_|.
409 started_to_send_handshake_request_ = true;
410 }
411
OnSentHandshakeRequest(SocketStream * socket,int amount_sent)412 void WebSocketJob::OnSentHandshakeRequest(
413 SocketStream* socket, int amount_sent) {
414 DCHECK_EQ(state_, CONNECTING);
415 handshake_request_sent_ += amount_sent;
416 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
417 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
418 // handshake request has been sent.
419 // notify original size of handshake request to delegate.
420 // Reset the handshake_request_ first in case this object is deleted by the
421 // delegate.
422 size_t original_length = handshake_request_->original_length();
423 handshake_request_.reset();
424 if (delegate_)
425 delegate_->OnSentData(socket, original_length);
426 }
427 }
428
OnReceivedHandshakeResponse(SocketStream * socket,const char * data,int len)429 void WebSocketJob::OnReceivedHandshakeResponse(
430 SocketStream* socket, const char* data, int len) {
431 DCHECK_EQ(state_, CONNECTING);
432 if (handshake_response_->HasResponse()) {
433 // If we already has handshake response, received data should be frame
434 // data, not handshake message.
435 received_data_after_handshake_.insert(
436 received_data_after_handshake_.end(), data, data + len);
437 return;
438 }
439
440 size_t response_length = handshake_response_->ParseRawResponse(data, len);
441 if (!handshake_response_->HasResponse()) {
442 // not yet. we need more data.
443 return;
444 }
445 // handshake message is completed.
446 std::string raw_response = handshake_response_->GetRawResponse();
447 socket_->net_log()->AddEvent(
448 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
449 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
450 if (len - response_length > 0) {
451 // If we received extra data, it should be frame data.
452 DCHECK(received_data_after_handshake_.empty());
453 received_data_after_handshake_.assign(data + response_length, data + len);
454 }
455 SaveCookiesAndNotifyHeadersComplete();
456 }
457
SaveCookiesAndNotifyHeadersComplete()458 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
459 // handshake message is completed.
460 DCHECK(handshake_response_->HasResponse());
461
462 // Extract cookies from the handshake response into a temporary vector.
463 response_cookies_.clear();
464 response_cookies_save_index_ = 0;
465
466 handshake_response_->GetHeaders(
467 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
468
469 // Now, loop over the response cookies, and attempt to persist each.
470 SaveNextCookie();
471 }
472
NotifyHeadersComplete()473 void WebSocketJob::NotifyHeadersComplete() {
474 // Remove cookie headers, with malformed headers preserved.
475 // Actual handshake should be done in Blink.
476 handshake_response_->RemoveHeaders(
477 kSetCookieHeaders, arraysize(kSetCookieHeaders));
478 std::string handshake_response = handshake_response_->GetResponse();
479 handshake_response_.reset();
480 std::vector<char> received_data(handshake_response.begin(),
481 handshake_response.end());
482 received_data.insert(received_data.end(),
483 received_data_after_handshake_.begin(),
484 received_data_after_handshake_.end());
485 received_data_after_handshake_.clear();
486
487 state_ = OPEN;
488
489 DCHECK(!received_data.empty());
490 if (delegate_)
491 delegate_->OnReceivedData(
492 socket_.get(), &received_data.front(), received_data.size());
493
494 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
495 }
496
SaveNextCookie()497 void WebSocketJob::SaveNextCookie() {
498 if (!socket_.get() || !delegate_ || state_ != CONNECTING)
499 return;
500
501 callback_pending_ = false;
502 save_next_cookie_running_ = true;
503
504 if (socket_->cookie_store()) {
505 GURL url_for_cookies = GetURLForCookies();
506
507 CookieOptions options;
508 options.set_include_httponly();
509
510 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
511 // CookieMonster's asynchronous operation APIs queue the callback to run it
512 // on the thread where the API was called, there won't be race. I.e. unless
513 // the callback is run synchronously, it won't be run in parallel with this
514 // method.
515 while (!callback_pending_ &&
516 response_cookies_save_index_ < response_cookies_.size()) {
517 std::string cookie = response_cookies_[response_cookies_save_index_];
518 response_cookies_save_index_++;
519
520 if (!delegate_->CanSetCookie(
521 socket_.get(), url_for_cookies, cookie, &options))
522 continue;
523
524 callback_pending_ = true;
525 socket_->cookie_store()->SetCookieWithOptionsAsync(
526 url_for_cookies, cookie, options,
527 base::Bind(&WebSocketJob::OnCookieSaved,
528 weak_ptr_factory_.GetWeakPtr()));
529 }
530 }
531
532 save_next_cookie_running_ = false;
533
534 if (callback_pending_)
535 return;
536
537 response_cookies_.clear();
538 response_cookies_save_index_ = 0;
539
540 NotifyHeadersComplete();
541 }
542
OnCookieSaved(bool cookie_status)543 void WebSocketJob::OnCookieSaved(bool cookie_status) {
544 // Tell the caller of SetCookieWithOptionsAsync() that this completion
545 // callback is invoked.
546 // - If the caller checks callback_pending earlier than this callback, the
547 // caller exits to let this method continue iteration.
548 // - Otherwise, the caller continues iteration.
549 callback_pending_ = false;
550
551 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
552 // the loop. Otherwise, return.
553 if (save_next_cookie_running_)
554 return;
555
556 SaveNextCookie();
557 }
558
GetURLForCookies() const559 GURL WebSocketJob::GetURLForCookies() const {
560 GURL url = socket_->url();
561 std::string scheme = socket_->is_secure() ? "https" : "http";
562 url::Replacements<char> replacements;
563 replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
564 return url.ReplaceComponents(replacements);
565 }
566
address_list() const567 const AddressList& WebSocketJob::address_list() const {
568 return addresses_;
569 }
570
TrySpdyStream()571 int WebSocketJob::TrySpdyStream() {
572 if (!socket_.get())
573 return ERR_FAILED;
574
575 // Check if we have a SPDY session available.
576 HttpTransactionFactory* factory =
577 socket_->context()->http_transaction_factory();
578 if (!factory)
579 return OK;
580 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
581 if (!session.get() || !session->params().enable_websocket_over_spdy)
582 return OK;
583 SpdySessionPool* spdy_pool = session->spdy_session_pool();
584 PrivacyMode privacy_mode = socket_->privacy_mode();
585 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
586 socket_->proxy_server(), privacy_mode);
587 // Forbid wss downgrade to SPDY without SSL.
588 // TODO(toyoshim): Does it realize the same policy with HTTP?
589 base::WeakPtr<SpdySession> spdy_session =
590 spdy_pool->FindAvailableSession(key, *socket_->net_log());
591 if (!spdy_session)
592 return OK;
593
594 SSLInfo ssl_info;
595 bool was_npn_negotiated;
596 NextProto protocol_negotiated = kProtoUnknown;
597 bool use_ssl = spdy_session->GetSSLInfo(
598 &ssl_info, &was_npn_negotiated, &protocol_negotiated);
599 if (socket_->is_secure() && !use_ssl)
600 return OK;
601
602 // Create SpdyWebSocketStream.
603 spdy_protocol_version_ = spdy_session->GetProtocolVersion();
604 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
605
606 int result = spdy_websocket_stream_->InitializeStream(
607 socket_->url(), MEDIUM, *socket_->net_log());
608 if (result == OK) {
609 OnConnected(socket_.get(), kMaxPendingSendAllowed);
610 return ERR_PROTOCOL_SWITCHED;
611 }
612 if (result != ERR_IO_PENDING) {
613 spdy_websocket_stream_.reset();
614 return OK;
615 }
616
617 return ERR_IO_PENDING;
618 }
619
SetWaiting()620 void WebSocketJob::SetWaiting() {
621 waiting_ = true;
622 }
623
IsWaiting() const624 bool WebSocketJob::IsWaiting() const {
625 return waiting_;
626 }
627
Wakeup()628 void WebSocketJob::Wakeup() {
629 if (!waiting_)
630 return;
631 waiting_ = false;
632 DCHECK(!callback_.is_null());
633 base::MessageLoopForIO::current()->PostTask(
634 FROM_HERE,
635 base::Bind(&WebSocketJob::RetryPendingIO,
636 weak_ptr_factory_.GetWeakPtr()));
637 }
638
RetryPendingIO()639 void WebSocketJob::RetryPendingIO() {
640 int result = TrySpdyStream();
641
642 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
643 // OnCreatedSpdyStream().
644 if (result != ERR_IO_PENDING)
645 CompleteIO(result);
646 }
647
CompleteIO(int result)648 void WebSocketJob::CompleteIO(int result) {
649 // |callback_| may be null if OnClose() or DetachDelegate() was called.
650 if (!callback_.is_null()) {
651 CompletionCallback callback = callback_;
652 callback_.Reset();
653 callback.Run(result);
654 Release(); // Balanced with OnStartOpenConnection().
655 }
656 }
657
SendDataInternal(const char * data,int length)658 bool WebSocketJob::SendDataInternal(const char* data, int length) {
659 if (spdy_websocket_stream_.get())
660 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
661 if (socket_.get())
662 return socket_->SendData(data, length);
663 return false;
664 }
665
CloseInternal()666 void WebSocketJob::CloseInternal() {
667 if (spdy_websocket_stream_.get())
668 spdy_websocket_stream_->Close();
669 if (socket_.get())
670 socket_->Close();
671 }
672
SendPending()673 void WebSocketJob::SendPending() {
674 if (current_send_buffer_.get())
675 return;
676
677 // Current buffer has been sent. Try next if any.
678 if (send_buffer_queue_.empty()) {
679 // No more data to send.
680 if (state_ == CLOSING)
681 CloseInternal();
682 return;
683 }
684
685 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
686 send_buffer_queue_.pop_front();
687 current_send_buffer_ =
688 new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
689 SendDataInternal(current_send_buffer_->data(),
690 current_send_buffer_->BytesRemaining());
691 }
692
693 } // namespace net
694