• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <algorithm>
6 #include <limits>
7 
8 #include "net/websockets/websocket.h"
9 
10 #include "base/message_loop.h"
11 #include "net/base/host_resolver.h"
12 #include "net/websockets/websocket_handshake.h"
13 #include "net/websockets/websocket_handshake_draft75.h"
14 
15 namespace net {
16 
17 static const char kClosingFrame[2] = {'\xff', '\x00'};
18 static int64 kClosingHandshakeTimeout = 1000;  // msec.
19 
WebSocket(Request * request,WebSocketDelegate * delegate)20 WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate)
21     : ready_state_(INITIALIZED),
22       request_(request),
23       handshake_(NULL),
24       delegate_(delegate),
25       origin_loop_(MessageLoop::current()),
26       socket_stream_(NULL),
27       max_pending_send_allowed_(0),
28       current_read_buf_(NULL),
29       read_consumed_len_(0),
30       current_write_buf_(NULL),
31       server_closing_handshake_(false),
32       client_closing_handshake_(false),
33       closing_handshake_started_(false),
34       force_close_task_(NULL),
35       closing_handshake_timeout_(kClosingHandshakeTimeout) {
36   DCHECK(request_.get());
37   DCHECK(delegate_);
38   DCHECK(origin_loop_);
39 }
40 
~WebSocket()41 WebSocket::~WebSocket() {
42   DCHECK(ready_state_ == INITIALIZED || !delegate_);
43   DCHECK(!socket_stream_);
44   DCHECK(!delegate_);
45 }
46 
Connect()47 void WebSocket::Connect() {
48   DCHECK(ready_state_ == INITIALIZED);
49   DCHECK(request_.get());
50   DCHECK(delegate_);
51   DCHECK(!socket_stream_);
52   DCHECK(MessageLoop::current() == origin_loop_);
53 
54   socket_stream_ = new SocketStream(request_->url(), this);
55   socket_stream_->set_context(request_->context());
56 
57   if (request_->host_resolver())
58     socket_stream_->SetHostResolver(request_->host_resolver());
59   if (request_->client_socket_factory())
60     socket_stream_->SetClientSocketFactory(request_->client_socket_factory());
61 
62   AddRef();  // Release in DoClose().
63   ready_state_ = CONNECTING;
64   socket_stream_->Connect();
65 }
66 
Send(const std::string & msg)67 void WebSocket::Send(const std::string& msg) {
68   if (ready_state_ == CLOSING || ready_state_ == CLOSED) {
69     return;
70   }
71   if (client_closing_handshake_) {
72     // We must not send any data after we start the WebSocket closing handshake.
73     return;
74   }
75   DCHECK(ready_state_ == OPEN);
76   DCHECK(MessageLoop::current() == origin_loop_);
77 
78   IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2);
79   char* p = buf->data();
80   *p = '\0';
81   memcpy(p + 1, msg.data(), msg.size());
82   *(p + 1 + msg.size()) = '\xff';
83   pending_write_bufs_.push_back(make_scoped_refptr(buf));
84   SendPending();
85 }
86 
Close()87 void WebSocket::Close() {
88   DCHECK(MessageLoop::current() == origin_loop_);
89 
90   // If connection has not yet started, do nothing.
91   if (ready_state_ == INITIALIZED) {
92     DCHECK(!socket_stream_);
93     ready_state_ = CLOSED;
94     return;
95   }
96 
97   // If the readyState attribute is in the CLOSING or CLOSED state, do nothing
98   if (ready_state_ == CLOSING || ready_state_ == CLOSED)
99     return;
100 
101   if (request_->version() == DRAFT75) {
102     DCHECK(socket_stream_);
103     socket_stream_->Close();
104     return;
105   }
106 
107   // If the WebSocket connection is not yet established, fail the WebSocket
108   // connection and set the readyState attribute's value to CLOSING.
109   if (ready_state_ == CONNECTING) {
110     ready_state_ = CLOSING;
111     origin_loop_->PostTask(
112         FROM_HERE,
113         NewRunnableMethod(this, &WebSocket::FailConnection));
114   }
115 
116   // If the WebSocket closing handshake has not yet been started, start
117   // the WebSocket closing handshake and set the readyState attribute's value
118   // to CLOSING.
119   if (!closing_handshake_started_) {
120     ready_state_ = CLOSING;
121     origin_loop_->PostTask(
122         FROM_HERE,
123         NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
124   }
125 
126   // Otherwise, set the readyState attribute's value to CLOSING.
127   ready_state_ = CLOSING;
128 }
129 
DetachDelegate()130 void WebSocket::DetachDelegate() {
131   if (!delegate_)
132     return;
133   delegate_ = NULL;
134   if (ready_state_ == INITIALIZED) {
135     DCHECK(!socket_stream_);
136     ready_state_ = CLOSED;
137     return;
138   }
139   if (ready_state_ != CLOSED) {
140     DCHECK(socket_stream_);
141     socket_stream_->Close();
142   }
143 }
144 
OnConnected(SocketStream * socket_stream,int max_pending_send_allowed)145 void WebSocket::OnConnected(SocketStream* socket_stream,
146                             int max_pending_send_allowed) {
147   DCHECK(socket_stream == socket_stream_);
148   max_pending_send_allowed_ = max_pending_send_allowed;
149 
150   // Use |max_pending_send_allowed| as hint for initial size of read buffer.
151   current_read_buf_ = new GrowableIOBuffer();
152   current_read_buf_->SetCapacity(max_pending_send_allowed_);
153   read_consumed_len_ = 0;
154 
155   DCHECK(!current_write_buf_);
156   DCHECK(!handshake_.get());
157   switch (request_->version()) {
158     case DEFAULT_VERSION:
159       handshake_.reset(new WebSocketHandshake(
160           request_->url(), request_->origin(), request_->location(),
161           request_->protocol()));
162       break;
163     case DRAFT75:
164       handshake_.reset(new WebSocketHandshakeDraft75(
165           request_->url(), request_->origin(), request_->location(),
166           request_->protocol()));
167       break;
168     default:
169       NOTREACHED() << "Unexpected protocol version:" << request_->version();
170   }
171 
172   const std::string msg = handshake_->CreateClientHandshakeMessage();
173   IOBufferWithSize* buf = new IOBufferWithSize(msg.size());
174   memcpy(buf->data(), msg.data(), msg.size());
175   pending_write_bufs_.push_back(make_scoped_refptr(buf));
176   origin_loop_->PostTask(FROM_HERE,
177                          NewRunnableMethod(this, &WebSocket::SendPending));
178 }
179 
OnSentData(SocketStream * socket_stream,int amount_sent)180 void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) {
181   DCHECK(socket_stream == socket_stream_);
182   DCHECK(current_write_buf_);
183   current_write_buf_->DidConsume(amount_sent);
184   DCHECK_GE(current_write_buf_->BytesRemaining(), 0);
185   if (current_write_buf_->BytesRemaining() == 0) {
186     current_write_buf_ = NULL;
187     pending_write_bufs_.pop_front();
188   }
189   origin_loop_->PostTask(FROM_HERE,
190                          NewRunnableMethod(this, &WebSocket::SendPending));
191 }
192 
OnReceivedData(SocketStream * socket_stream,const char * data,int len)193 void WebSocket::OnReceivedData(SocketStream* socket_stream,
194                                const char* data, int len) {
195   DCHECK(socket_stream == socket_stream_);
196   AddToReadBuffer(data, len);
197   origin_loop_->PostTask(FROM_HERE,
198                          NewRunnableMethod(this, &WebSocket::DoReceivedData));
199 }
200 
OnClose(SocketStream * socket_stream)201 void WebSocket::OnClose(SocketStream* socket_stream) {
202   origin_loop_->PostTask(FROM_HERE,
203                          NewRunnableMethod(this, &WebSocket::DoClose));
204 }
205 
OnError(const SocketStream * socket_stream,int error)206 void WebSocket::OnError(const SocketStream* socket_stream, int error) {
207   origin_loop_->PostTask(
208       FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error));
209 }
210 
SendPending()211 void WebSocket::SendPending() {
212   DCHECK(MessageLoop::current() == origin_loop_);
213   if (!socket_stream_) {
214     DCHECK_EQ(CLOSED, ready_state_);
215     return;
216   }
217   if (!current_write_buf_) {
218     if (pending_write_bufs_.empty()) {
219       if (client_closing_handshake_) {
220         // Already sent 0xFF and 0x00 bytes.
221         // *The WebSocket closing handshake has started.*
222         closing_handshake_started_ = true;
223         if (server_closing_handshake_) {
224           // 4.2 3-8-3 If the WebSocket connection is not already closed,
225           // then close the WebSocket connection.
226           // *The WebSocket closing handshake has finished*
227           socket_stream_->Close();
228         } else {
229           // 5. Wait a user-agent-determined length of time, or until the
230           // WebSocket connection is closed.
231           force_close_task_ =
232               NewRunnableMethod(this, &WebSocket::DoForceCloseConnection);
233           origin_loop_->PostDelayedTask(
234               FROM_HERE, force_close_task_, closing_handshake_timeout_);
235         }
236       }
237       return;
238     }
239     current_write_buf_ = new DrainableIOBuffer(
240         pending_write_bufs_.front(), pending_write_bufs_.front()->size());
241   }
242   DCHECK_GT(current_write_buf_->BytesRemaining(), 0);
243   bool sent = socket_stream_->SendData(
244       current_write_buf_->data(),
245       std::min(current_write_buf_->BytesRemaining(),
246                max_pending_send_allowed_));
247   DCHECK(sent);
248 }
249 
DoReceivedData()250 void WebSocket::DoReceivedData() {
251   DCHECK(MessageLoop::current() == origin_loop_);
252   scoped_refptr<WebSocket> protect(this);
253   switch (ready_state_) {
254     case CONNECTING:
255       {
256         DCHECK(handshake_.get());
257         DCHECK(current_read_buf_);
258         const char* data =
259             current_read_buf_->StartOfBuffer() + read_consumed_len_;
260         size_t len = current_read_buf_->offset() - read_consumed_len_;
261         int eoh = handshake_->ReadServerHandshake(data, len);
262         if (eoh < 0) {
263           // Not enough data,  Retry when more data is available.
264           return;
265         }
266         SkipReadBuffer(eoh);
267       }
268       if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) {
269         // Handshake failed.
270         socket_stream_->Close();
271         return;
272       }
273       ready_state_ = OPEN;
274       if (delegate_)
275         delegate_->OnOpen(this);
276       if (current_read_buf_->offset() == read_consumed_len_) {
277         // No remaining data after handshake message.
278         break;
279       }
280       // FALL THROUGH
281     case OPEN:
282     case CLOSING:  // need to process closing-frame from server.
283       ProcessFrameData();
284       break;
285 
286     case CLOSED:
287       // Closed just after DoReceivedData is queued on |origin_loop_|.
288       break;
289     default:
290       NOTREACHED();
291       break;
292   }
293 }
294 
ProcessFrameData()295 void WebSocket::ProcessFrameData() {
296   DCHECK(current_read_buf_);
297   if (server_closing_handshake_) {
298     // Any data on the connection after the 0xFF frame is discarded.
299     return;
300   }
301   scoped_refptr<WebSocket> protect(this);
302   const char* start_frame =
303       current_read_buf_->StartOfBuffer() + read_consumed_len_;
304   const char* next_frame = start_frame;
305   const char* p = next_frame;
306   const char* end =
307       current_read_buf_->StartOfBuffer() + current_read_buf_->offset();
308   while (p < end) {
309     // Let /error/ be false.
310     bool error = false;
311 
312     // Handle the /frame type/ byte as follows.
313     unsigned char frame_byte = static_cast<unsigned char>(*p++);
314     if ((frame_byte & 0x80) == 0x80) {
315       int length = 0;
316       while (p < end) {
317         if (length > std::numeric_limits<int>::max() / 128) {
318           // frame length overflow.
319           socket_stream_->Close();
320           return;
321         }
322         unsigned char c = static_cast<unsigned char>(*p);
323         length = length * 128 + (c & 0x7f);
324         ++p;
325         if ((c & 0x80) != 0x80)
326           break;
327       }
328       // Checks if the frame body hasn't been completely received yet.
329       // It also checks the case the frame length bytes haven't been completely
330       // received yet, because p == end and length > 0 in such case.
331       if (p + length < end) {
332         p += length;
333         next_frame = p;
334         if (request_->version() != DRAFT75 &&
335             frame_byte == 0xFF && length == 0) {
336           // 4.2 Data framing 3. Handle the /frame type/ byte.
337           // 8. If the /frame type/ is 0xFF and the /length/ was 0, then
338           // run the following substeps:
339           // 1. If the WebSocket closing handshake has not yet started, then
340           // start the WebSocket closing handshake.
341           server_closing_handshake_ = true;
342           if (!closing_handshake_started_) {
343             origin_loop_->PostTask(
344                 FROM_HERE,
345                 NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
346           } else {
347             // If the WebSocket closing handshake has been started and
348             // the WebSocket connection is not already closed, then close
349             // the WebSocket connection.
350             socket_stream_->Close();
351           }
352           return;
353         }
354         // 4.2 3-8 Otherwise, let /error/ be true.
355         error = true;
356       } else {
357         // Not enough data in buffer.
358         break;
359       }
360     } else {
361       const char* msg_start = p;
362       while (p < end && *p != '\xff')
363         ++p;
364       if (p < end && *p == '\xff') {
365         if (frame_byte == 0x00) {
366           if (delegate_) {
367             delegate_->OnMessage(this, std::string(msg_start, p - msg_start));
368           }
369         } else {
370           // Otherwise, discard the data and let /error/ to be true.
371           error = true;
372         }
373         ++p;
374         next_frame = p;
375       }
376     }
377     // If /error/ is true, then *a WebSocket error has been detected.*
378     if (error && delegate_)
379       delegate_->OnError(this);
380   }
381   SkipReadBuffer(next_frame - start_frame);
382 }
383 
AddToReadBuffer(const char * data,int len)384 void WebSocket::AddToReadBuffer(const char* data, int len) {
385   DCHECK(current_read_buf_);
386   // Check if |current_read_buf_| has enough space to store |len| of |data|.
387   if (len >= current_read_buf_->RemainingCapacity()) {
388     current_read_buf_->SetCapacity(
389         current_read_buf_->offset() + len);
390   }
391 
392   DCHECK(current_read_buf_->RemainingCapacity() >= len);
393   memcpy(current_read_buf_->data(), data, len);
394   current_read_buf_->set_offset(current_read_buf_->offset() + len);
395 }
396 
SkipReadBuffer(int len)397 void WebSocket::SkipReadBuffer(int len) {
398   if (len == 0)
399     return;
400   DCHECK_GT(len, 0);
401   read_consumed_len_ += len;
402   int remaining = current_read_buf_->offset() - read_consumed_len_;
403   DCHECK_GE(remaining, 0);
404   if (remaining < read_consumed_len_ &&
405       current_read_buf_->RemainingCapacity() < read_consumed_len_) {
406     // Pre compaction:
407     // 0             v-read_consumed_len_  v-offset               v- capacity
408     // |..processed..| .. remaining ..     | .. RemainingCapacity |
409     //
410     memmove(current_read_buf_->StartOfBuffer(),
411             current_read_buf_->StartOfBuffer() + read_consumed_len_,
412             remaining);
413     read_consumed_len_ = 0;
414     current_read_buf_->set_offset(remaining);
415     // Post compaction:
416     // 0read_consumed_len_  v- offset                             v- capacity
417     // |.. remaining ..     | ..  RemainingCapacity  ...          |
418     //
419   }
420 }
421 
StartClosingHandshake()422 void WebSocket::StartClosingHandshake() {
423   // 4.2 *start the WebSocket closing handshake*.
424   if (closing_handshake_started_ || client_closing_handshake_) {
425     // 1. If the WebSocket closing handshake has started, then abort these
426     // steps.
427     return;
428   }
429   // 2.,3. Send a 0xFF and 0x00 byte to the server.
430   client_closing_handshake_ = true;
431   IOBufferWithSize* buf = new IOBufferWithSize(2);
432   memcpy(buf->data(), kClosingFrame, 2);
433   pending_write_bufs_.push_back(make_scoped_refptr(buf));
434   SendPending();
435 }
436 
DoForceCloseConnection()437 void WebSocket::DoForceCloseConnection() {
438   // 4.2 *start the WebSocket closing handshake*
439   // 6. If the WebSocket connection is not already closed, then close the
440   // WebSocket connection.  (If this happens, then the closing handshake
441   // doesn't finish.)
442   DCHECK(MessageLoop::current() == origin_loop_);
443   force_close_task_ = NULL;
444   FailConnection();
445 }
446 
FailConnection()447 void WebSocket::FailConnection() {
448   DCHECK(MessageLoop::current() == origin_loop_);
449   // 6.1 Client-initiated closure.
450   // *fail the WebSocket connection*.
451   // the user agent must close the WebSocket connection, and may report the
452   // problem to the user.
453   if (!socket_stream_)
454     return;
455   socket_stream_->Close();
456 }
457 
DoClose()458 void WebSocket::DoClose() {
459   DCHECK(MessageLoop::current() == origin_loop_);
460   if (force_close_task_) {
461     // WebSocket connection is closed while waiting a user-agent-determined
462     // length of time after *The WebSocket closing handshake has started*.
463     force_close_task_->Cancel();
464     force_close_task_ = NULL;
465   }
466   WebSocketDelegate* delegate = delegate_;
467   delegate_ = NULL;
468   ready_state_ = CLOSED;
469   if (!socket_stream_)
470     return;
471   socket_stream_ = NULL;
472   if (delegate)
473     delegate->OnClose(this,
474                       server_closing_handshake_ && closing_handshake_started_);
475   Release();
476 }
477 
DoSocketError(int error)478 void WebSocket::DoSocketError(int error) {
479   DCHECK(MessageLoop::current() == origin_loop_);
480   if (delegate_)
481     delegate_->OnSocketError(this, error);
482 }
483 
484 }  // namespace net
485