1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_job.h"
6
7 #include <algorithm>
8
9 #include "base/lazy_instance.h"
10 #include "base/string_tokenizer.h"
11 #include "googleurl/src/gurl.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/base/cookie_policy.h"
15 #include "net/base/cookie_store.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_util.h"
18 #include "net/url_request/url_request_context.h"
19 #include "net/websockets/websocket_frame_handler.h"
20 #include "net/websockets/websocket_handshake_handler.h"
21 #include "net/websockets/websocket_net_log_params.h"
22 #include "net/websockets/websocket_throttle.h"
23
24 namespace {
25
26 // lower-case header names.
27 const char* const kCookieHeaders[] = {
28 "cookie", "cookie2"
29 };
30 const char* const kSetCookieHeaders[] = {
31 "set-cookie", "set-cookie2"
32 };
33
WebSocketJobFactory(const GURL & url,net::SocketStream::Delegate * delegate)34 net::SocketStreamJob* WebSocketJobFactory(
35 const GURL& url, net::SocketStream::Delegate* delegate) {
36 net::WebSocketJob* job = new net::WebSocketJob(delegate);
37 job->InitSocketStream(new net::SocketStream(url, job));
38 return job;
39 }
40
41 class WebSocketJobInitSingleton {
42 private:
43 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton()44 WebSocketJobInitSingleton() {
45 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
46 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
47 }
48 };
49
50 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init(
51 base::LINKER_INITIALIZED);
52
53 } // anonymous namespace
54
55 namespace net {
56
57 // static
EnsureInit()58 void WebSocketJob::EnsureInit() {
59 g_websocket_job_init.Get();
60 }
61
WebSocketJob(SocketStream::Delegate * delegate)62 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
63 : delegate_(delegate),
64 state_(INITIALIZED),
65 waiting_(false),
66 callback_(NULL),
67 handshake_request_(new WebSocketHandshakeRequestHandler),
68 handshake_response_(new WebSocketHandshakeResponseHandler),
69 handshake_request_sent_(0),
70 response_cookies_save_index_(0),
71 send_frame_handler_(new WebSocketFrameHandler),
72 receive_frame_handler_(new WebSocketFrameHandler) {
73 }
74
~WebSocketJob()75 WebSocketJob::~WebSocketJob() {
76 DCHECK_EQ(CLOSED, state_);
77 DCHECK(!delegate_);
78 DCHECK(!socket_.get());
79 }
80
Connect()81 void WebSocketJob::Connect() {
82 DCHECK(socket_.get());
83 DCHECK_EQ(state_, INITIALIZED);
84 state_ = CONNECTING;
85 socket_->Connect();
86 }
87
SendData(const char * data,int len)88 bool WebSocketJob::SendData(const char* data, int len) {
89 switch (state_) {
90 case INITIALIZED:
91 return false;
92
93 case CONNECTING:
94 return SendHandshakeRequest(data, len);
95
96 case OPEN:
97 {
98 send_frame_handler_->AppendData(data, len);
99 // If current buffer is sending now, this data will be sent in
100 // SendPending() after current data was sent.
101 // Do not buffer sending data for now. Since
102 // WebCore::SocketStreamHandle controls traffic to keep number of
103 // pending bytes less than max_pending_send_allowed, so when sending
104 // larger message than max_pending_send_allowed should not be buffered.
105 // If we don't call OnSentData, WebCore::SocketStreamHandle would stop
106 // sending more data when pending data reaches max_pending_send_allowed.
107 // TODO(ukai): Fix this to support compression for larger message.
108 int err = 0;
109 if (!send_frame_handler_->GetCurrentBuffer() &&
110 (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
111 DCHECK(!current_buffer_);
112 current_buffer_ = new DrainableIOBuffer(
113 send_frame_handler_->GetCurrentBuffer(),
114 send_frame_handler_->GetCurrentBufferSize());
115 return socket_->SendData(
116 current_buffer_->data(), current_buffer_->BytesRemaining());
117 }
118 return err >= 0;
119 }
120
121 case CLOSING:
122 case CLOSED:
123 return false;
124 }
125 return false;
126 }
127
Close()128 void WebSocketJob::Close() {
129 state_ = CLOSING;
130 if (current_buffer_) {
131 // Will close in SendPending.
132 return;
133 }
134 state_ = CLOSED;
135 socket_->Close();
136 }
137
RestartWithAuth(const string16 & username,const string16 & password)138 void WebSocketJob::RestartWithAuth(
139 const string16& username,
140 const string16& password) {
141 state_ = CONNECTING;
142 socket_->RestartWithAuth(username, password);
143 }
144
DetachDelegate()145 void WebSocketJob::DetachDelegate() {
146 state_ = CLOSED;
147 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
148 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
149
150 scoped_refptr<WebSocketJob> protect(this);
151
152 delegate_ = NULL;
153 if (socket_)
154 socket_->DetachDelegate();
155 socket_ = NULL;
156 if (callback_) {
157 waiting_ = false;
158 callback_ = NULL;
159 Release(); // Balanced with OnStartOpenConnection().
160 }
161 }
162
OnStartOpenConnection(SocketStream * socket,CompletionCallback * callback)163 int WebSocketJob::OnStartOpenConnection(
164 SocketStream* socket, CompletionCallback* callback) {
165 DCHECK(!callback_);
166 state_ = CONNECTING;
167 addresses_.Copy(socket->address_list().head(), true);
168 WebSocketThrottle::GetInstance()->PutInQueue(this);
169 if (!waiting_)
170 return OK;
171 callback_ = callback;
172 AddRef(); // Balanced when callback_ becomes NULL.
173 return ERR_IO_PENDING;
174 }
175
OnConnected(SocketStream * socket,int max_pending_send_allowed)176 void WebSocketJob::OnConnected(
177 SocketStream* socket, int max_pending_send_allowed) {
178 if (state_ == CLOSED)
179 return;
180 DCHECK_EQ(CONNECTING, state_);
181 if (delegate_)
182 delegate_->OnConnected(socket, max_pending_send_allowed);
183 }
184
OnSentData(SocketStream * socket,int amount_sent)185 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
186 DCHECK_NE(INITIALIZED, state_);
187 if (state_ == CLOSED)
188 return;
189 if (state_ == CONNECTING) {
190 OnSentHandshakeRequest(socket, amount_sent);
191 return;
192 }
193 if (delegate_) {
194 DCHECK(state_ == OPEN || state_ == CLOSING);
195 DCHECK_GT(amount_sent, 0);
196 DCHECK(current_buffer_);
197 current_buffer_->DidConsume(amount_sent);
198 if (current_buffer_->BytesRemaining() > 0)
199 return;
200
201 // We need to report amount_sent of original buffer size, instead of
202 // amount sent to |socket|.
203 amount_sent = send_frame_handler_->GetOriginalBufferSize();
204 DCHECK_GT(amount_sent, 0);
205 current_buffer_ = NULL;
206 send_frame_handler_->ReleaseCurrentBuffer();
207 delegate_->OnSentData(socket, amount_sent);
208 MessageLoopForIO::current()->PostTask(
209 FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
210 }
211 }
212
OnReceivedData(SocketStream * socket,const char * data,int len)213 void WebSocketJob::OnReceivedData(
214 SocketStream* socket, const char* data, int len) {
215 DCHECK_NE(INITIALIZED, state_);
216 if (state_ == CLOSED)
217 return;
218 if (state_ == CONNECTING) {
219 OnReceivedHandshakeResponse(socket, data, len);
220 return;
221 }
222 DCHECK(state_ == OPEN || state_ == CLOSING);
223 std::string received_data;
224 receive_frame_handler_->AppendData(data, len);
225 // Don't buffer receiving data for now.
226 // TODO(ukai): fix performance of WebSocketFrameHandler.
227 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
228 received_data +=
229 std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
230 receive_frame_handler_->GetCurrentBufferSize());
231 receive_frame_handler_->ReleaseCurrentBuffer();
232 }
233 if (delegate_ && !received_data.empty())
234 delegate_->OnReceivedData(
235 socket, received_data.data(), received_data.size());
236 }
237
OnClose(SocketStream * socket)238 void WebSocketJob::OnClose(SocketStream* socket) {
239 state_ = CLOSED;
240 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
241 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
242
243 scoped_refptr<WebSocketJob> protect(this);
244
245 SocketStream::Delegate* delegate = delegate_;
246 delegate_ = NULL;
247 socket_ = NULL;
248 if (callback_) {
249 waiting_ = false;
250 callback_ = NULL;
251 Release(); // Balanced with OnStartOpenConnection().
252 }
253 if (delegate)
254 delegate->OnClose(socket);
255 }
256
OnAuthRequired(SocketStream * socket,AuthChallengeInfo * auth_info)257 void WebSocketJob::OnAuthRequired(
258 SocketStream* socket, AuthChallengeInfo* auth_info) {
259 if (delegate_)
260 delegate_->OnAuthRequired(socket, auth_info);
261 }
262
OnError(const SocketStream * socket,int error)263 void WebSocketJob::OnError(const SocketStream* socket, int error) {
264 if (delegate_)
265 delegate_->OnError(socket, error);
266 }
267
SendHandshakeRequest(const char * data,int len)268 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
269 DCHECK_EQ(state_, CONNECTING);
270 if (!handshake_request_->ParseRequest(data, len))
271 return false;
272
273 // handshake message is completed.
274 AddCookieHeaderAndSend();
275 // Just buffered in |handshake_request_|.
276 return true;
277 }
278
AddCookieHeaderAndSend()279 void WebSocketJob::AddCookieHeaderAndSend() {
280 int policy = OK;
281 if (socket_->context()->cookie_policy()) {
282 GURL url_for_cookies = GetURLForCookies();
283 policy = socket_->context()->cookie_policy()->CanGetCookies(
284 url_for_cookies,
285 url_for_cookies);
286 }
287 DCHECK_NE(ERR_IO_PENDING, policy);
288 OnCanGetCookiesCompleted(policy);
289 }
290
OnCanGetCookiesCompleted(int policy)291 void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
292 if (socket_ && delegate_ && state_ == CONNECTING) {
293 handshake_request_->RemoveHeaders(
294 kCookieHeaders, arraysize(kCookieHeaders));
295 if (policy == OK) {
296 // Add cookies, including HttpOnly cookies.
297 if (socket_->context()->cookie_store()) {
298 CookieOptions cookie_options;
299 cookie_options.set_include_httponly();
300 std::string cookie =
301 socket_->context()->cookie_store()->GetCookiesWithOptions(
302 GetURLForCookies(), cookie_options);
303 if (!cookie.empty())
304 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
305 }
306 }
307
308 const std::string& handshake_request = handshake_request_->GetRawRequest();
309 handshake_request_sent_ = 0;
310 socket_->net_log()->AddEvent(
311 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
312 make_scoped_refptr(
313 new NetLogWebSocketHandshakeParameter(handshake_request)));
314 socket_->SendData(handshake_request.data(),
315 handshake_request.size());
316 }
317 }
318
OnSentHandshakeRequest(SocketStream * socket,int amount_sent)319 void WebSocketJob::OnSentHandshakeRequest(
320 SocketStream* socket, int amount_sent) {
321 DCHECK_EQ(state_, CONNECTING);
322 handshake_request_sent_ += amount_sent;
323 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
324 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
325 // handshake request has been sent.
326 // notify original size of handshake request to delegate.
327 if (delegate_)
328 delegate_->OnSentData(
329 socket,
330 handshake_request_->original_length());
331 handshake_request_.reset();
332 }
333 }
334
OnReceivedHandshakeResponse(SocketStream * socket,const char * data,int len)335 void WebSocketJob::OnReceivedHandshakeResponse(
336 SocketStream* socket, const char* data, int len) {
337 DCHECK_EQ(state_, CONNECTING);
338 if (handshake_response_->HasResponse()) {
339 // If we already has handshake response, received data should be frame
340 // data, not handshake message.
341 receive_frame_handler_->AppendData(data, len);
342 return;
343 }
344
345 size_t response_length = handshake_response_->ParseRawResponse(data, len);
346 if (!handshake_response_->HasResponse()) {
347 // not yet. we need more data.
348 return;
349 }
350 // handshake message is completed.
351 socket_->net_log()->AddEvent(
352 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
353 make_scoped_refptr(new NetLogWebSocketHandshakeParameter(
354 handshake_response_->GetRawResponse())));
355 if (len - response_length > 0) {
356 // If we received extra data, it should be frame data.
357 receive_frame_handler_->AppendData(data + response_length,
358 len - response_length);
359 }
360 SaveCookiesAndNotifyHeaderComplete();
361 }
362
SaveCookiesAndNotifyHeaderComplete()363 void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
364 // handshake message is completed.
365 DCHECK(handshake_response_->HasResponse());
366
367 response_cookies_.clear();
368 response_cookies_save_index_ = 0;
369
370 handshake_response_->GetHeaders(
371 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
372
373 // Now, loop over the response cookies, and attempt to persist each.
374 SaveNextCookie();
375 }
376
SaveNextCookie()377 void WebSocketJob::SaveNextCookie() {
378 if (response_cookies_save_index_ == response_cookies_.size()) {
379 response_cookies_.clear();
380 response_cookies_save_index_ = 0;
381
382 // Remove cookie headers, with malformed headers preserved.
383 // Actual handshake should be done in WebKit.
384 handshake_response_->RemoveHeaders(
385 kSetCookieHeaders, arraysize(kSetCookieHeaders));
386 std::string received_data = handshake_response_->GetResponse();
387 // Don't buffer receiving data for now.
388 // TODO(ukai): fix performance of WebSocketFrameHandler.
389 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
390 received_data +=
391 std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
392 receive_frame_handler_->GetCurrentBufferSize());
393 receive_frame_handler_->ReleaseCurrentBuffer();
394 }
395
396 state_ = OPEN;
397 if (delegate_)
398 delegate_->OnReceivedData(
399 socket_, received_data.data(), received_data.size());
400
401 handshake_response_.reset();
402
403 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
404 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
405 return;
406 }
407
408 int policy = OK;
409 if (socket_->context()->cookie_policy()) {
410 GURL url_for_cookies = GetURLForCookies();
411 policy = socket_->context()->cookie_policy()->CanSetCookie(
412 url_for_cookies,
413 url_for_cookies,
414 response_cookies_[response_cookies_save_index_]);
415 }
416
417 DCHECK_NE(ERR_IO_PENDING, policy);
418 OnCanSetCookieCompleted(policy);
419 }
420
OnCanSetCookieCompleted(int policy)421 void WebSocketJob::OnCanSetCookieCompleted(int policy) {
422 if (socket_ && delegate_ && state_ == CONNECTING) {
423 if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
424 socket_->context()->cookie_store()) {
425 CookieOptions options;
426 options.set_include_httponly();
427 if (policy == OK_FOR_SESSION_ONLY)
428 options.set_force_session();
429 GURL url_for_cookies = GetURLForCookies();
430 socket_->context()->cookie_store()->SetCookieWithOptions(
431 url_for_cookies, response_cookies_[response_cookies_save_index_],
432 options);
433 }
434 response_cookies_save_index_++;
435 SaveNextCookie();
436 }
437 }
438
GetURLForCookies() const439 GURL WebSocketJob::GetURLForCookies() const {
440 GURL url = socket_->url();
441 std::string scheme = socket_->is_secure() ? "https" : "http";
442 url_canon::Replacements<char> replacements;
443 replacements.SetScheme(scheme.c_str(),
444 url_parse::Component(0, scheme.length()));
445 return url.ReplaceComponents(replacements);
446 }
447
address_list() const448 const AddressList& WebSocketJob::address_list() const {
449 return addresses_;
450 }
451
SetWaiting()452 void WebSocketJob::SetWaiting() {
453 waiting_ = true;
454 }
455
IsWaiting() const456 bool WebSocketJob::IsWaiting() const {
457 return waiting_;
458 }
459
Wakeup()460 void WebSocketJob::Wakeup() {
461 if (!waiting_)
462 return;
463 waiting_ = false;
464 DCHECK(callback_);
465 MessageLoopForIO::current()->PostTask(
466 FROM_HERE,
467 NewRunnableMethod(this,
468 &WebSocketJob::DoCallback));
469 }
470
DoCallback()471 void WebSocketJob::DoCallback() {
472 // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
473 if (callback_) {
474 net::CompletionCallback* callback = callback_;
475 callback_ = NULL;
476 callback->Run(net::OK);
477 Release(); // Balanced with OnStartOpenConnection().
478 }
479 }
480
SendPending()481 void WebSocketJob::SendPending() {
482 if (current_buffer_)
483 return;
484 // Current buffer is done. Try next buffer if any.
485 // Don't buffer sending data. See comment on case OPEN in SendData().
486 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
487 // No more data to send.
488 if (state_ == CLOSING)
489 socket_->Close();
490 return;
491 }
492 current_buffer_ = new DrainableIOBuffer(
493 send_frame_handler_->GetCurrentBuffer(),
494 send_frame_handler_->GetCurrentBufferSize());
495 socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
496 }
497
498 } // namespace net
499