1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <algorithm>
6 #include <limits>
7
8 #include "net/websockets/websocket.h"
9
10 #include "base/message_loop.h"
11 #include "net/base/host_resolver.h"
12 #include "net/websockets/websocket_handshake.h"
13 #include "net/websockets/websocket_handshake_draft75.h"
14
15 namespace net {
16
17 static const char kClosingFrame[2] = {'\xff', '\x00'};
18 static int64 kClosingHandshakeTimeout = 1000; // msec.
19
WebSocket(Request * request,WebSocketDelegate * delegate)20 WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate)
21 : ready_state_(INITIALIZED),
22 request_(request),
23 handshake_(NULL),
24 delegate_(delegate),
25 origin_loop_(MessageLoop::current()),
26 socket_stream_(NULL),
27 max_pending_send_allowed_(0),
28 current_read_buf_(NULL),
29 read_consumed_len_(0),
30 current_write_buf_(NULL),
31 server_closing_handshake_(false),
32 client_closing_handshake_(false),
33 closing_handshake_started_(false),
34 force_close_task_(NULL),
35 closing_handshake_timeout_(kClosingHandshakeTimeout) {
36 DCHECK(request_.get());
37 DCHECK(delegate_);
38 DCHECK(origin_loop_);
39 }
40
~WebSocket()41 WebSocket::~WebSocket() {
42 DCHECK(ready_state_ == INITIALIZED || !delegate_);
43 DCHECK(!socket_stream_);
44 DCHECK(!delegate_);
45 }
46
Connect()47 void WebSocket::Connect() {
48 DCHECK(ready_state_ == INITIALIZED);
49 DCHECK(request_.get());
50 DCHECK(delegate_);
51 DCHECK(!socket_stream_);
52 DCHECK(MessageLoop::current() == origin_loop_);
53
54 socket_stream_ = new SocketStream(request_->url(), this);
55 socket_stream_->set_context(request_->context());
56
57 if (request_->host_resolver())
58 socket_stream_->SetHostResolver(request_->host_resolver());
59 if (request_->client_socket_factory())
60 socket_stream_->SetClientSocketFactory(request_->client_socket_factory());
61
62 AddRef(); // Release in DoClose().
63 ready_state_ = CONNECTING;
64 socket_stream_->Connect();
65 }
66
Send(const std::string & msg)67 void WebSocket::Send(const std::string& msg) {
68 if (ready_state_ == CLOSING || ready_state_ == CLOSED) {
69 return;
70 }
71 if (client_closing_handshake_) {
72 // We must not send any data after we start the WebSocket closing handshake.
73 return;
74 }
75 DCHECK(ready_state_ == OPEN);
76 DCHECK(MessageLoop::current() == origin_loop_);
77
78 IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2);
79 char* p = buf->data();
80 *p = '\0';
81 memcpy(p + 1, msg.data(), msg.size());
82 *(p + 1 + msg.size()) = '\xff';
83 pending_write_bufs_.push_back(make_scoped_refptr(buf));
84 SendPending();
85 }
86
Close()87 void WebSocket::Close() {
88 DCHECK(MessageLoop::current() == origin_loop_);
89
90 // If connection has not yet started, do nothing.
91 if (ready_state_ == INITIALIZED) {
92 DCHECK(!socket_stream_);
93 ready_state_ = CLOSED;
94 return;
95 }
96
97 // If the readyState attribute is in the CLOSING or CLOSED state, do nothing
98 if (ready_state_ == CLOSING || ready_state_ == CLOSED)
99 return;
100
101 if (request_->version() == DRAFT75) {
102 DCHECK(socket_stream_);
103 socket_stream_->Close();
104 return;
105 }
106
107 // If the WebSocket connection is not yet established, fail the WebSocket
108 // connection and set the readyState attribute's value to CLOSING.
109 if (ready_state_ == CONNECTING) {
110 ready_state_ = CLOSING;
111 origin_loop_->PostTask(
112 FROM_HERE,
113 NewRunnableMethod(this, &WebSocket::FailConnection));
114 }
115
116 // If the WebSocket closing handshake has not yet been started, start
117 // the WebSocket closing handshake and set the readyState attribute's value
118 // to CLOSING.
119 if (!closing_handshake_started_) {
120 ready_state_ = CLOSING;
121 origin_loop_->PostTask(
122 FROM_HERE,
123 NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
124 }
125
126 // Otherwise, set the readyState attribute's value to CLOSING.
127 ready_state_ = CLOSING;
128 }
129
DetachDelegate()130 void WebSocket::DetachDelegate() {
131 if (!delegate_)
132 return;
133 delegate_ = NULL;
134 if (ready_state_ == INITIALIZED) {
135 DCHECK(!socket_stream_);
136 ready_state_ = CLOSED;
137 return;
138 }
139 if (ready_state_ != CLOSED) {
140 DCHECK(socket_stream_);
141 socket_stream_->Close();
142 }
143 }
144
OnConnected(SocketStream * socket_stream,int max_pending_send_allowed)145 void WebSocket::OnConnected(SocketStream* socket_stream,
146 int max_pending_send_allowed) {
147 DCHECK(socket_stream == socket_stream_);
148 max_pending_send_allowed_ = max_pending_send_allowed;
149
150 // Use |max_pending_send_allowed| as hint for initial size of read buffer.
151 current_read_buf_ = new GrowableIOBuffer();
152 current_read_buf_->SetCapacity(max_pending_send_allowed_);
153 read_consumed_len_ = 0;
154
155 DCHECK(!current_write_buf_);
156 DCHECK(!handshake_.get());
157 switch (request_->version()) {
158 case DEFAULT_VERSION:
159 handshake_.reset(new WebSocketHandshake(
160 request_->url(), request_->origin(), request_->location(),
161 request_->protocol()));
162 break;
163 case DRAFT75:
164 handshake_.reset(new WebSocketHandshakeDraft75(
165 request_->url(), request_->origin(), request_->location(),
166 request_->protocol()));
167 break;
168 default:
169 NOTREACHED() << "Unexpected protocol version:" << request_->version();
170 }
171
172 const std::string msg = handshake_->CreateClientHandshakeMessage();
173 IOBufferWithSize* buf = new IOBufferWithSize(msg.size());
174 memcpy(buf->data(), msg.data(), msg.size());
175 pending_write_bufs_.push_back(make_scoped_refptr(buf));
176 origin_loop_->PostTask(FROM_HERE,
177 NewRunnableMethod(this, &WebSocket::SendPending));
178 }
179
OnSentData(SocketStream * socket_stream,int amount_sent)180 void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) {
181 DCHECK(socket_stream == socket_stream_);
182 DCHECK(current_write_buf_);
183 current_write_buf_->DidConsume(amount_sent);
184 DCHECK_GE(current_write_buf_->BytesRemaining(), 0);
185 if (current_write_buf_->BytesRemaining() == 0) {
186 current_write_buf_ = NULL;
187 pending_write_bufs_.pop_front();
188 }
189 origin_loop_->PostTask(FROM_HERE,
190 NewRunnableMethod(this, &WebSocket::SendPending));
191 }
192
OnReceivedData(SocketStream * socket_stream,const char * data,int len)193 void WebSocket::OnReceivedData(SocketStream* socket_stream,
194 const char* data, int len) {
195 DCHECK(socket_stream == socket_stream_);
196 AddToReadBuffer(data, len);
197 origin_loop_->PostTask(FROM_HERE,
198 NewRunnableMethod(this, &WebSocket::DoReceivedData));
199 }
200
OnClose(SocketStream * socket_stream)201 void WebSocket::OnClose(SocketStream* socket_stream) {
202 origin_loop_->PostTask(FROM_HERE,
203 NewRunnableMethod(this, &WebSocket::DoClose));
204 }
205
OnError(const SocketStream * socket_stream,int error)206 void WebSocket::OnError(const SocketStream* socket_stream, int error) {
207 origin_loop_->PostTask(
208 FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error));
209 }
210
SendPending()211 void WebSocket::SendPending() {
212 DCHECK(MessageLoop::current() == origin_loop_);
213 if (!socket_stream_) {
214 DCHECK_EQ(CLOSED, ready_state_);
215 return;
216 }
217 if (!current_write_buf_) {
218 if (pending_write_bufs_.empty()) {
219 if (client_closing_handshake_) {
220 // Already sent 0xFF and 0x00 bytes.
221 // *The WebSocket closing handshake has started.*
222 closing_handshake_started_ = true;
223 if (server_closing_handshake_) {
224 // 4.2 3-8-3 If the WebSocket connection is not already closed,
225 // then close the WebSocket connection.
226 // *The WebSocket closing handshake has finished*
227 socket_stream_->Close();
228 } else {
229 // 5. Wait a user-agent-determined length of time, or until the
230 // WebSocket connection is closed.
231 force_close_task_ =
232 NewRunnableMethod(this, &WebSocket::DoForceCloseConnection);
233 origin_loop_->PostDelayedTask(
234 FROM_HERE, force_close_task_, closing_handshake_timeout_);
235 }
236 }
237 return;
238 }
239 current_write_buf_ = new DrainableIOBuffer(
240 pending_write_bufs_.front(), pending_write_bufs_.front()->size());
241 }
242 DCHECK_GT(current_write_buf_->BytesRemaining(), 0);
243 bool sent = socket_stream_->SendData(
244 current_write_buf_->data(),
245 std::min(current_write_buf_->BytesRemaining(),
246 max_pending_send_allowed_));
247 DCHECK(sent);
248 }
249
DoReceivedData()250 void WebSocket::DoReceivedData() {
251 DCHECK(MessageLoop::current() == origin_loop_);
252 scoped_refptr<WebSocket> protect(this);
253 switch (ready_state_) {
254 case CONNECTING:
255 {
256 DCHECK(handshake_.get());
257 DCHECK(current_read_buf_);
258 const char* data =
259 current_read_buf_->StartOfBuffer() + read_consumed_len_;
260 size_t len = current_read_buf_->offset() - read_consumed_len_;
261 int eoh = handshake_->ReadServerHandshake(data, len);
262 if (eoh < 0) {
263 // Not enough data, Retry when more data is available.
264 return;
265 }
266 SkipReadBuffer(eoh);
267 }
268 if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) {
269 // Handshake failed.
270 socket_stream_->Close();
271 return;
272 }
273 ready_state_ = OPEN;
274 if (delegate_)
275 delegate_->OnOpen(this);
276 if (current_read_buf_->offset() == read_consumed_len_) {
277 // No remaining data after handshake message.
278 break;
279 }
280 // FALL THROUGH
281 case OPEN:
282 case CLOSING: // need to process closing-frame from server.
283 ProcessFrameData();
284 break;
285
286 case CLOSED:
287 // Closed just after DoReceivedData is queued on |origin_loop_|.
288 break;
289 default:
290 NOTREACHED();
291 break;
292 }
293 }
294
ProcessFrameData()295 void WebSocket::ProcessFrameData() {
296 DCHECK(current_read_buf_);
297 if (server_closing_handshake_) {
298 // Any data on the connection after the 0xFF frame is discarded.
299 return;
300 }
301 scoped_refptr<WebSocket> protect(this);
302 const char* start_frame =
303 current_read_buf_->StartOfBuffer() + read_consumed_len_;
304 const char* next_frame = start_frame;
305 const char* p = next_frame;
306 const char* end =
307 current_read_buf_->StartOfBuffer() + current_read_buf_->offset();
308 while (p < end) {
309 // Let /error/ be false.
310 bool error = false;
311
312 // Handle the /frame type/ byte as follows.
313 unsigned char frame_byte = static_cast<unsigned char>(*p++);
314 if ((frame_byte & 0x80) == 0x80) {
315 int length = 0;
316 while (p < end) {
317 if (length > std::numeric_limits<int>::max() / 128) {
318 // frame length overflow.
319 socket_stream_->Close();
320 return;
321 }
322 unsigned char c = static_cast<unsigned char>(*p);
323 length = length * 128 + (c & 0x7f);
324 ++p;
325 if ((c & 0x80) != 0x80)
326 break;
327 }
328 // Checks if the frame body hasn't been completely received yet.
329 // It also checks the case the frame length bytes haven't been completely
330 // received yet, because p == end and length > 0 in such case.
331 if (p + length < end) {
332 p += length;
333 next_frame = p;
334 if (request_->version() != DRAFT75 &&
335 frame_byte == 0xFF && length == 0) {
336 // 4.2 Data framing 3. Handle the /frame type/ byte.
337 // 8. If the /frame type/ is 0xFF and the /length/ was 0, then
338 // run the following substeps:
339 // 1. If the WebSocket closing handshake has not yet started, then
340 // start the WebSocket closing handshake.
341 server_closing_handshake_ = true;
342 if (!closing_handshake_started_) {
343 origin_loop_->PostTask(
344 FROM_HERE,
345 NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
346 } else {
347 // If the WebSocket closing handshake has been started and
348 // the WebSocket connection is not already closed, then close
349 // the WebSocket connection.
350 socket_stream_->Close();
351 }
352 return;
353 }
354 // 4.2 3-8 Otherwise, let /error/ be true.
355 error = true;
356 } else {
357 // Not enough data in buffer.
358 break;
359 }
360 } else {
361 const char* msg_start = p;
362 while (p < end && *p != '\xff')
363 ++p;
364 if (p < end && *p == '\xff') {
365 if (frame_byte == 0x00) {
366 if (delegate_) {
367 delegate_->OnMessage(this, std::string(msg_start, p - msg_start));
368 }
369 } else {
370 // Otherwise, discard the data and let /error/ to be true.
371 error = true;
372 }
373 ++p;
374 next_frame = p;
375 }
376 }
377 // If /error/ is true, then *a WebSocket error has been detected.*
378 if (error && delegate_)
379 delegate_->OnError(this);
380 }
381 SkipReadBuffer(next_frame - start_frame);
382 }
383
AddToReadBuffer(const char * data,int len)384 void WebSocket::AddToReadBuffer(const char* data, int len) {
385 DCHECK(current_read_buf_);
386 // Check if |current_read_buf_| has enough space to store |len| of |data|.
387 if (len >= current_read_buf_->RemainingCapacity()) {
388 current_read_buf_->SetCapacity(
389 current_read_buf_->offset() + len);
390 }
391
392 DCHECK(current_read_buf_->RemainingCapacity() >= len);
393 memcpy(current_read_buf_->data(), data, len);
394 current_read_buf_->set_offset(current_read_buf_->offset() + len);
395 }
396
SkipReadBuffer(int len)397 void WebSocket::SkipReadBuffer(int len) {
398 if (len == 0)
399 return;
400 DCHECK_GT(len, 0);
401 read_consumed_len_ += len;
402 int remaining = current_read_buf_->offset() - read_consumed_len_;
403 DCHECK_GE(remaining, 0);
404 if (remaining < read_consumed_len_ &&
405 current_read_buf_->RemainingCapacity() < read_consumed_len_) {
406 // Pre compaction:
407 // 0 v-read_consumed_len_ v-offset v- capacity
408 // |..processed..| .. remaining .. | .. RemainingCapacity |
409 //
410 memmove(current_read_buf_->StartOfBuffer(),
411 current_read_buf_->StartOfBuffer() + read_consumed_len_,
412 remaining);
413 read_consumed_len_ = 0;
414 current_read_buf_->set_offset(remaining);
415 // Post compaction:
416 // 0read_consumed_len_ v- offset v- capacity
417 // |.. remaining .. | .. RemainingCapacity ... |
418 //
419 }
420 }
421
StartClosingHandshake()422 void WebSocket::StartClosingHandshake() {
423 // 4.2 *start the WebSocket closing handshake*.
424 if (closing_handshake_started_ || client_closing_handshake_) {
425 // 1. If the WebSocket closing handshake has started, then abort these
426 // steps.
427 return;
428 }
429 // 2.,3. Send a 0xFF and 0x00 byte to the server.
430 client_closing_handshake_ = true;
431 IOBufferWithSize* buf = new IOBufferWithSize(2);
432 memcpy(buf->data(), kClosingFrame, 2);
433 pending_write_bufs_.push_back(make_scoped_refptr(buf));
434 SendPending();
435 }
436
DoForceCloseConnection()437 void WebSocket::DoForceCloseConnection() {
438 // 4.2 *start the WebSocket closing handshake*
439 // 6. If the WebSocket connection is not already closed, then close the
440 // WebSocket connection. (If this happens, then the closing handshake
441 // doesn't finish.)
442 DCHECK(MessageLoop::current() == origin_loop_);
443 force_close_task_ = NULL;
444 FailConnection();
445 }
446
FailConnection()447 void WebSocket::FailConnection() {
448 DCHECK(MessageLoop::current() == origin_loop_);
449 // 6.1 Client-initiated closure.
450 // *fail the WebSocket connection*.
451 // the user agent must close the WebSocket connection, and may report the
452 // problem to the user.
453 if (!socket_stream_)
454 return;
455 socket_stream_->Close();
456 }
457
DoClose()458 void WebSocket::DoClose() {
459 DCHECK(MessageLoop::current() == origin_loop_);
460 if (force_close_task_) {
461 // WebSocket connection is closed while waiting a user-agent-determined
462 // length of time after *The WebSocket closing handshake has started*.
463 force_close_task_->Cancel();
464 force_close_task_ = NULL;
465 }
466 WebSocketDelegate* delegate = delegate_;
467 delegate_ = NULL;
468 ready_state_ = CLOSED;
469 if (!socket_stream_)
470 return;
471 socket_stream_ = NULL;
472 if (delegate)
473 delegate->OnClose(this,
474 server_closing_handshake_ && closing_handshake_started_);
475 Release();
476 }
477
DoSocketError(int error)478 void WebSocket::DoSocketError(int error) {
479 DCHECK(MessageLoop::current() == origin_loop_);
480 if (delegate_)
481 delegate_->OnSocketError(this, error);
482 }
483
484 } // namespace net
485