• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/websockets/websocket_job.h"
6 
7 #include <algorithm>
8 
9 #include "base/lazy_instance.h"
10 #include "base/string_tokenizer.h"
11 #include "googleurl/src/gurl.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/base/cookie_policy.h"
15 #include "net/base/cookie_store.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_util.h"
18 #include "net/url_request/url_request_context.h"
19 #include "net/websockets/websocket_frame_handler.h"
20 #include "net/websockets/websocket_handshake_handler.h"
21 #include "net/websockets/websocket_net_log_params.h"
22 #include "net/websockets/websocket_throttle.h"
23 
24 namespace {
25 
26 // lower-case header names.
27 const char* const kCookieHeaders[] = {
28   "cookie", "cookie2"
29 };
30 const char* const kSetCookieHeaders[] = {
31   "set-cookie", "set-cookie2"
32 };
33 
WebSocketJobFactory(const GURL & url,net::SocketStream::Delegate * delegate)34 net::SocketStreamJob* WebSocketJobFactory(
35     const GURL& url, net::SocketStream::Delegate* delegate) {
36   net::WebSocketJob* job = new net::WebSocketJob(delegate);
37   job->InitSocketStream(new net::SocketStream(url, job));
38   return job;
39 }
40 
41 class WebSocketJobInitSingleton {
42  private:
43   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
WebSocketJobInitSingleton()44   WebSocketJobInitSingleton() {
45     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
46     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
47   }
48 };
49 
50 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init(
51     base::LINKER_INITIALIZED);
52 
53 }  // anonymous namespace
54 
55 namespace net {
56 
57 // static
EnsureInit()58 void WebSocketJob::EnsureInit() {
59   g_websocket_job_init.Get();
60 }
61 
WebSocketJob(SocketStream::Delegate * delegate)62 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
63     : delegate_(delegate),
64       state_(INITIALIZED),
65       waiting_(false),
66       callback_(NULL),
67       handshake_request_(new WebSocketHandshakeRequestHandler),
68       handshake_response_(new WebSocketHandshakeResponseHandler),
69       handshake_request_sent_(0),
70       response_cookies_save_index_(0),
71       send_frame_handler_(new WebSocketFrameHandler),
72       receive_frame_handler_(new WebSocketFrameHandler) {
73 }
74 
~WebSocketJob()75 WebSocketJob::~WebSocketJob() {
76   DCHECK_EQ(CLOSED, state_);
77   DCHECK(!delegate_);
78   DCHECK(!socket_.get());
79 }
80 
Connect()81 void WebSocketJob::Connect() {
82   DCHECK(socket_.get());
83   DCHECK_EQ(state_, INITIALIZED);
84   state_ = CONNECTING;
85   socket_->Connect();
86 }
87 
SendData(const char * data,int len)88 bool WebSocketJob::SendData(const char* data, int len) {
89   switch (state_) {
90     case INITIALIZED:
91       return false;
92 
93     case CONNECTING:
94       return SendHandshakeRequest(data, len);
95 
96     case OPEN:
97       {
98         send_frame_handler_->AppendData(data, len);
99         // If current buffer is sending now, this data will be sent in
100         // SendPending() after current data was sent.
101         // Do not buffer sending data for now.  Since
102         // WebCore::SocketStreamHandle controls traffic to keep number of
103         // pending bytes less than max_pending_send_allowed, so when sending
104         // larger message than max_pending_send_allowed should not be buffered.
105         // If we don't call OnSentData, WebCore::SocketStreamHandle would stop
106         // sending more data when pending data reaches max_pending_send_allowed.
107         // TODO(ukai): Fix this to support compression for larger message.
108         int err = 0;
109         if (!send_frame_handler_->GetCurrentBuffer() &&
110             (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
111           DCHECK(!current_buffer_);
112           current_buffer_ = new DrainableIOBuffer(
113               send_frame_handler_->GetCurrentBuffer(),
114               send_frame_handler_->GetCurrentBufferSize());
115           return socket_->SendData(
116               current_buffer_->data(), current_buffer_->BytesRemaining());
117         }
118         return err >= 0;
119       }
120 
121     case CLOSING:
122     case CLOSED:
123       return false;
124   }
125   return false;
126 }
127 
Close()128 void WebSocketJob::Close() {
129   state_ = CLOSING;
130   if (current_buffer_) {
131     // Will close in SendPending.
132     return;
133   }
134   state_ = CLOSED;
135   socket_->Close();
136 }
137 
RestartWithAuth(const string16 & username,const string16 & password)138 void WebSocketJob::RestartWithAuth(
139     const string16& username,
140     const string16& password) {
141   state_ = CONNECTING;
142   socket_->RestartWithAuth(username, password);
143 }
144 
DetachDelegate()145 void WebSocketJob::DetachDelegate() {
146   state_ = CLOSED;
147   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
148   WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
149 
150   scoped_refptr<WebSocketJob> protect(this);
151 
152   delegate_ = NULL;
153   if (socket_)
154     socket_->DetachDelegate();
155   socket_ = NULL;
156   if (callback_) {
157     waiting_ = false;
158     callback_ = NULL;
159     Release();  // Balanced with OnStartOpenConnection().
160   }
161 }
162 
OnStartOpenConnection(SocketStream * socket,CompletionCallback * callback)163 int WebSocketJob::OnStartOpenConnection(
164     SocketStream* socket, CompletionCallback* callback) {
165   DCHECK(!callback_);
166   state_ = CONNECTING;
167   addresses_.Copy(socket->address_list().head(), true);
168   WebSocketThrottle::GetInstance()->PutInQueue(this);
169   if (!waiting_)
170     return OK;
171   callback_ = callback;
172   AddRef();  // Balanced when callback_ becomes NULL.
173   return ERR_IO_PENDING;
174 }
175 
OnConnected(SocketStream * socket,int max_pending_send_allowed)176 void WebSocketJob::OnConnected(
177     SocketStream* socket, int max_pending_send_allowed) {
178   if (state_ == CLOSED)
179     return;
180   DCHECK_EQ(CONNECTING, state_);
181   if (delegate_)
182     delegate_->OnConnected(socket, max_pending_send_allowed);
183 }
184 
OnSentData(SocketStream * socket,int amount_sent)185 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
186   DCHECK_NE(INITIALIZED, state_);
187   if (state_ == CLOSED)
188     return;
189   if (state_ == CONNECTING) {
190     OnSentHandshakeRequest(socket, amount_sent);
191     return;
192   }
193   if (delegate_) {
194     DCHECK(state_ == OPEN || state_ == CLOSING);
195     DCHECK_GT(amount_sent, 0);
196     DCHECK(current_buffer_);
197     current_buffer_->DidConsume(amount_sent);
198     if (current_buffer_->BytesRemaining() > 0)
199       return;
200 
201     // We need to report amount_sent of original buffer size, instead of
202     // amount sent to |socket|.
203     amount_sent = send_frame_handler_->GetOriginalBufferSize();
204     DCHECK_GT(amount_sent, 0);
205     current_buffer_ = NULL;
206     send_frame_handler_->ReleaseCurrentBuffer();
207     delegate_->OnSentData(socket, amount_sent);
208     MessageLoopForIO::current()->PostTask(
209         FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
210   }
211 }
212 
OnReceivedData(SocketStream * socket,const char * data,int len)213 void WebSocketJob::OnReceivedData(
214     SocketStream* socket, const char* data, int len) {
215   DCHECK_NE(INITIALIZED, state_);
216   if (state_ == CLOSED)
217     return;
218   if (state_ == CONNECTING) {
219     OnReceivedHandshakeResponse(socket, data, len);
220     return;
221   }
222   DCHECK(state_ == OPEN || state_ == CLOSING);
223   std::string received_data;
224   receive_frame_handler_->AppendData(data, len);
225   // Don't buffer receiving data for now.
226   // TODO(ukai): fix performance of WebSocketFrameHandler.
227   while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
228     received_data +=
229         std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
230                     receive_frame_handler_->GetCurrentBufferSize());
231     receive_frame_handler_->ReleaseCurrentBuffer();
232   }
233   if (delegate_ && !received_data.empty())
234       delegate_->OnReceivedData(
235           socket, received_data.data(), received_data.size());
236 }
237 
OnClose(SocketStream * socket)238 void WebSocketJob::OnClose(SocketStream* socket) {
239   state_ = CLOSED;
240   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
241   WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
242 
243   scoped_refptr<WebSocketJob> protect(this);
244 
245   SocketStream::Delegate* delegate = delegate_;
246   delegate_ = NULL;
247   socket_ = NULL;
248   if (callback_) {
249     waiting_ = false;
250     callback_ = NULL;
251     Release();  // Balanced with OnStartOpenConnection().
252   }
253   if (delegate)
254     delegate->OnClose(socket);
255 }
256 
OnAuthRequired(SocketStream * socket,AuthChallengeInfo * auth_info)257 void WebSocketJob::OnAuthRequired(
258     SocketStream* socket, AuthChallengeInfo* auth_info) {
259   if (delegate_)
260     delegate_->OnAuthRequired(socket, auth_info);
261 }
262 
OnError(const SocketStream * socket,int error)263 void WebSocketJob::OnError(const SocketStream* socket, int error) {
264   if (delegate_)
265     delegate_->OnError(socket, error);
266 }
267 
SendHandshakeRequest(const char * data,int len)268 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
269   DCHECK_EQ(state_, CONNECTING);
270   if (!handshake_request_->ParseRequest(data, len))
271     return false;
272 
273   // handshake message is completed.
274   AddCookieHeaderAndSend();
275   // Just buffered in |handshake_request_|.
276   return true;
277 }
278 
AddCookieHeaderAndSend()279 void WebSocketJob::AddCookieHeaderAndSend() {
280   int policy = OK;
281   if (socket_->context()->cookie_policy()) {
282     GURL url_for_cookies = GetURLForCookies();
283     policy = socket_->context()->cookie_policy()->CanGetCookies(
284         url_for_cookies,
285         url_for_cookies);
286   }
287   DCHECK_NE(ERR_IO_PENDING, policy);
288   OnCanGetCookiesCompleted(policy);
289 }
290 
OnCanGetCookiesCompleted(int policy)291 void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
292   if (socket_ && delegate_ && state_ == CONNECTING) {
293     handshake_request_->RemoveHeaders(
294         kCookieHeaders, arraysize(kCookieHeaders));
295     if (policy == OK) {
296       // Add cookies, including HttpOnly cookies.
297       if (socket_->context()->cookie_store()) {
298         CookieOptions cookie_options;
299         cookie_options.set_include_httponly();
300         std::string cookie =
301             socket_->context()->cookie_store()->GetCookiesWithOptions(
302                 GetURLForCookies(), cookie_options);
303         if (!cookie.empty())
304           handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
305       }
306     }
307 
308     const std::string& handshake_request = handshake_request_->GetRawRequest();
309     handshake_request_sent_ = 0;
310     socket_->net_log()->AddEvent(
311         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
312         make_scoped_refptr(
313             new NetLogWebSocketHandshakeParameter(handshake_request)));
314     socket_->SendData(handshake_request.data(),
315                       handshake_request.size());
316   }
317 }
318 
OnSentHandshakeRequest(SocketStream * socket,int amount_sent)319 void WebSocketJob::OnSentHandshakeRequest(
320     SocketStream* socket, int amount_sent) {
321   DCHECK_EQ(state_, CONNECTING);
322   handshake_request_sent_ += amount_sent;
323   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
324   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
325     // handshake request has been sent.
326     // notify original size of handshake request to delegate.
327     if (delegate_)
328       delegate_->OnSentData(
329           socket,
330           handshake_request_->original_length());
331     handshake_request_.reset();
332   }
333 }
334 
OnReceivedHandshakeResponse(SocketStream * socket,const char * data,int len)335 void WebSocketJob::OnReceivedHandshakeResponse(
336     SocketStream* socket, const char* data, int len) {
337   DCHECK_EQ(state_, CONNECTING);
338   if (handshake_response_->HasResponse()) {
339     // If we already has handshake response, received data should be frame
340     // data, not handshake message.
341     receive_frame_handler_->AppendData(data, len);
342     return;
343   }
344 
345   size_t response_length = handshake_response_->ParseRawResponse(data, len);
346   if (!handshake_response_->HasResponse()) {
347     // not yet. we need more data.
348     return;
349   }
350   // handshake message is completed.
351   socket_->net_log()->AddEvent(
352       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
353       make_scoped_refptr(new NetLogWebSocketHandshakeParameter(
354           handshake_response_->GetRawResponse())));
355   if (len - response_length > 0) {
356     // If we received extra data, it should be frame data.
357     receive_frame_handler_->AppendData(data + response_length,
358                                        len - response_length);
359   }
360   SaveCookiesAndNotifyHeaderComplete();
361 }
362 
SaveCookiesAndNotifyHeaderComplete()363 void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
364   // handshake message is completed.
365   DCHECK(handshake_response_->HasResponse());
366 
367   response_cookies_.clear();
368   response_cookies_save_index_ = 0;
369 
370   handshake_response_->GetHeaders(
371       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
372 
373   // Now, loop over the response cookies, and attempt to persist each.
374   SaveNextCookie();
375 }
376 
SaveNextCookie()377 void WebSocketJob::SaveNextCookie() {
378   if (response_cookies_save_index_ == response_cookies_.size()) {
379     response_cookies_.clear();
380     response_cookies_save_index_ = 0;
381 
382     // Remove cookie headers, with malformed headers preserved.
383     // Actual handshake should be done in WebKit.
384     handshake_response_->RemoveHeaders(
385         kSetCookieHeaders, arraysize(kSetCookieHeaders));
386     std::string received_data = handshake_response_->GetResponse();
387     // Don't buffer receiving data for now.
388     // TODO(ukai): fix performance of WebSocketFrameHandler.
389     while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
390       received_data +=
391           std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
392                       receive_frame_handler_->GetCurrentBufferSize());
393       receive_frame_handler_->ReleaseCurrentBuffer();
394     }
395 
396     state_ = OPEN;
397     if (delegate_)
398       delegate_->OnReceivedData(
399           socket_, received_data.data(), received_data.size());
400 
401     handshake_response_.reset();
402 
403     WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
404     WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
405     return;
406   }
407 
408   int policy = OK;
409   if (socket_->context()->cookie_policy()) {
410     GURL url_for_cookies = GetURLForCookies();
411     policy = socket_->context()->cookie_policy()->CanSetCookie(
412         url_for_cookies,
413         url_for_cookies,
414         response_cookies_[response_cookies_save_index_]);
415   }
416 
417   DCHECK_NE(ERR_IO_PENDING, policy);
418   OnCanSetCookieCompleted(policy);
419 }
420 
OnCanSetCookieCompleted(int policy)421 void WebSocketJob::OnCanSetCookieCompleted(int policy) {
422   if (socket_ && delegate_ && state_ == CONNECTING) {
423     if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
424         socket_->context()->cookie_store()) {
425       CookieOptions options;
426       options.set_include_httponly();
427       if (policy == OK_FOR_SESSION_ONLY)
428         options.set_force_session();
429       GURL url_for_cookies = GetURLForCookies();
430       socket_->context()->cookie_store()->SetCookieWithOptions(
431           url_for_cookies, response_cookies_[response_cookies_save_index_],
432           options);
433     }
434     response_cookies_save_index_++;
435     SaveNextCookie();
436   }
437 }
438 
GetURLForCookies() const439 GURL WebSocketJob::GetURLForCookies() const {
440   GURL url = socket_->url();
441   std::string scheme = socket_->is_secure() ? "https" : "http";
442   url_canon::Replacements<char> replacements;
443   replacements.SetScheme(scheme.c_str(),
444                          url_parse::Component(0, scheme.length()));
445   return url.ReplaceComponents(replacements);
446 }
447 
address_list() const448 const AddressList& WebSocketJob::address_list() const {
449   return addresses_;
450 }
451 
SetWaiting()452 void WebSocketJob::SetWaiting() {
453   waiting_ = true;
454 }
455 
IsWaiting() const456 bool WebSocketJob::IsWaiting() const {
457   return waiting_;
458 }
459 
Wakeup()460 void WebSocketJob::Wakeup() {
461   if (!waiting_)
462     return;
463   waiting_ = false;
464   DCHECK(callback_);
465   MessageLoopForIO::current()->PostTask(
466       FROM_HERE,
467       NewRunnableMethod(this,
468                         &WebSocketJob::DoCallback));
469 }
470 
DoCallback()471 void WebSocketJob::DoCallback() {
472   // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
473   if (callback_) {
474     net::CompletionCallback* callback = callback_;
475     callback_ = NULL;
476     callback->Run(net::OK);
477     Release();  // Balanced with OnStartOpenConnection().
478   }
479 }
480 
SendPending()481 void WebSocketJob::SendPending() {
482   if (current_buffer_)
483     return;
484   // Current buffer is done.  Try next buffer if any.
485   // Don't buffer sending data. See comment on case OPEN in SendData().
486   if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
487     // No more data to send.
488     if (state_ == CLOSING)
489       socket_->Close();
490     return;
491   }
492   current_buffer_ = new DrainableIOBuffer(
493       send_frame_handler_->GetCurrentBuffer(),
494       send_frame_handler_->GetCurrentBufferSize());
495   socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
496 }
497 
498 }  // namespace net
499