1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_channel.h"
6
7 #include <limits.h> // for INT_MAX
8
9 #include <algorithm>
10 #include <deque>
11
12 #include "base/basictypes.h" // for size_t
13 #include "base/big_endian.h"
14 #include "base/bind.h"
15 #include "base/compiler_specific.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/weak_ptr.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/metrics/histogram.h"
20 #include "base/numerics/safe_conversions.h"
21 #include "base/stl_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/time/time.h"
24 #include "net/base/io_buffer.h"
25 #include "net/base/net_log.h"
26 #include "net/http/http_request_headers.h"
27 #include "net/http/http_response_headers.h"
28 #include "net/http/http_util.h"
29 #include "net/websockets/websocket_errors.h"
30 #include "net/websockets/websocket_event_interface.h"
31 #include "net/websockets/websocket_frame.h"
32 #include "net/websockets/websocket_handshake_request_info.h"
33 #include "net/websockets/websocket_handshake_response_info.h"
34 #include "net/websockets/websocket_mux.h"
35 #include "net/websockets/websocket_stream.h"
36 #include "url/origin.h"
37
38 namespace net {
39
40 namespace {
41
42 using base::StreamingUtf8Validator;
43
44 const int kDefaultSendQuotaLowWaterMark = 1 << 16;
45 const int kDefaultSendQuotaHighWaterMark = 1 << 17;
46 const size_t kWebSocketCloseCodeLength = 2;
47 // This timeout is based on TCPMaximumSegmentLifetime * 2 from
48 // MainThreadWebSocketChannel.cpp in Blink.
49 const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
50
51 typedef WebSocketEventInterface::ChannelState ChannelState;
52 const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
53 const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
54
55 // Maximum close reason length = max control frame payload -
56 // status code length
57 // = 125 - 2
58 const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
59
60 // Check a close status code for strict compliance with RFC6455. This is only
61 // used for close codes received from a renderer that we are intending to send
62 // out over the network. See ParseClose() for the restrictions on incoming close
63 // codes. The |code| parameter is type int for convenience of implementation;
64 // the real type is uint16. Code 1005 is treated specially; it cannot be set
65 // explicitly by Javascript but the renderer uses it to indicate we should send
66 // a Close frame with no payload.
IsStrictlyValidCloseStatusCode(int code)67 bool IsStrictlyValidCloseStatusCode(int code) {
68 static const int kInvalidRanges[] = {
69 // [BAD, OK)
70 0, 1000, // 1000 is the first valid code
71 1006, 1007, // 1006 MUST NOT be set.
72 1014, 3000, // 1014 unassigned; 1015 up to 2999 are reserved.
73 5000, 65536, // Codes above 5000 are invalid.
74 };
75 const int* const kInvalidRangesEnd =
76 kInvalidRanges + arraysize(kInvalidRanges);
77
78 DCHECK_GE(code, 0);
79 DCHECK_LT(code, 65536);
80 const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
81 DCHECK_NE(kInvalidRangesEnd, upper);
82 DCHECK_GT(upper, kInvalidRanges);
83 DCHECK_GT(*upper, code);
84 DCHECK_LE(*(upper - 1), code);
85 return ((upper - kInvalidRanges) % 2) == 0;
86 }
87
88 // This function avoids a bunch of boilerplate code.
AllowUnused(ChannelState ALLOW_UNUSED unused)89 void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
90
91 // Sets |name| to the name of the frame type for the given |opcode|. Note that
92 // for all of Text, Binary and Continuation opcode, this method returns
93 // "Data frame".
GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,std::string * name)94 void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
95 std::string* name) {
96 switch (opcode) {
97 case WebSocketFrameHeader::kOpCodeText: // fall-thru
98 case WebSocketFrameHeader::kOpCodeBinary: // fall-thru
99 case WebSocketFrameHeader::kOpCodeContinuation:
100 *name = "Data frame";
101 break;
102
103 case WebSocketFrameHeader::kOpCodePing:
104 *name = "Ping";
105 break;
106
107 case WebSocketFrameHeader::kOpCodePong:
108 *name = "Pong";
109 break;
110
111 case WebSocketFrameHeader::kOpCodeClose:
112 *name = "Close";
113 break;
114
115 default:
116 *name = "Unknown frame type";
117 break;
118 }
119
120 return;
121 }
122
123 } // namespace
124
125 // A class to encapsulate a set of frames and information about the size of
126 // those frames.
127 class WebSocketChannel::SendBuffer {
128 public:
SendBuffer()129 SendBuffer() : total_bytes_(0) {}
130
131 // Add a WebSocketFrame to the buffer and increase total_bytes_.
132 void AddFrame(scoped_ptr<WebSocketFrame> chunk);
133
134 // Return a pointer to the frames_ for write purposes.
frames()135 ScopedVector<WebSocketFrame>* frames() { return &frames_; }
136
137 private:
138 // The frames_ that will be sent in the next call to WriteFrames().
139 ScopedVector<WebSocketFrame> frames_;
140
141 // The total size of the payload data in |frames_|. This will be used to
142 // measure the throughput of the link.
143 // TODO(ricea): Measure the throughput of the link.
144 size_t total_bytes_;
145 };
146
AddFrame(scoped_ptr<WebSocketFrame> frame)147 void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
148 total_bytes_ += frame->header.payload_length;
149 frames_.push_back(frame.release());
150 }
151
152 // Implementation of WebSocketStream::ConnectDelegate that simply forwards the
153 // calls on to the WebSocketChannel that created it.
154 class WebSocketChannel::ConnectDelegate
155 : public WebSocketStream::ConnectDelegate {
156 public:
ConnectDelegate(WebSocketChannel * creator)157 explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
158
OnSuccess(scoped_ptr<WebSocketStream> stream)159 virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
160 creator_->OnConnectSuccess(stream.Pass());
161 // |this| may have been deleted.
162 }
163
OnFailure(const std::string & message)164 virtual void OnFailure(const std::string& message) OVERRIDE {
165 creator_->OnConnectFailure(message);
166 // |this| has been deleted.
167 }
168
OnStartOpeningHandshake(scoped_ptr<WebSocketHandshakeRequestInfo> request)169 virtual void OnStartOpeningHandshake(
170 scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
171 creator_->OnStartOpeningHandshake(request.Pass());
172 }
173
OnFinishOpeningHandshake(scoped_ptr<WebSocketHandshakeResponseInfo> response)174 virtual void OnFinishOpeningHandshake(
175 scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
176 creator_->OnFinishOpeningHandshake(response.Pass());
177 }
178
OnSSLCertificateError(scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,const SSLInfo & ssl_info,bool fatal)179 virtual void OnSSLCertificateError(
180 scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
181 ssl_error_callbacks,
182 const SSLInfo& ssl_info,
183 bool fatal) OVERRIDE {
184 creator_->OnSSLCertificateError(
185 ssl_error_callbacks.Pass(), ssl_info, fatal);
186 }
187
188 private:
189 // A pointer to the WebSocketChannel that created this object. There is no
190 // danger of this pointer being stale, because deleting the WebSocketChannel
191 // cancels the connect process, deleting this object and preventing its
192 // callbacks from being called.
193 WebSocketChannel* const creator_;
194
195 DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
196 };
197
198 class WebSocketChannel::HandshakeNotificationSender
199 : public base::SupportsWeakPtr<HandshakeNotificationSender> {
200 public:
201 explicit HandshakeNotificationSender(WebSocketChannel* channel);
202 ~HandshakeNotificationSender();
203
204 static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
205
206 ChannelState SendImmediately(WebSocketEventInterface* event_interface);
207
handshake_request_info() const208 const WebSocketHandshakeRequestInfo* handshake_request_info() const {
209 return handshake_request_info_.get();
210 }
211
set_handshake_request_info(scoped_ptr<WebSocketHandshakeRequestInfo> request_info)212 void set_handshake_request_info(
213 scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
214 handshake_request_info_ = request_info.Pass();
215 }
216
handshake_response_info() const217 const WebSocketHandshakeResponseInfo* handshake_response_info() const {
218 return handshake_response_info_.get();
219 }
220
set_handshake_response_info(scoped_ptr<WebSocketHandshakeResponseInfo> response_info)221 void set_handshake_response_info(
222 scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
223 handshake_response_info_ = response_info.Pass();
224 }
225
226 private:
227 WebSocketChannel* owner_;
228 scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
229 scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
230 };
231
HandshakeNotificationSender(WebSocketChannel * channel)232 WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
233 WebSocketChannel* channel)
234 : owner_(channel) {}
235
~HandshakeNotificationSender()236 WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
237
Send(base::WeakPtr<HandshakeNotificationSender> sender)238 void WebSocketChannel::HandshakeNotificationSender::Send(
239 base::WeakPtr<HandshakeNotificationSender> sender) {
240 // Do nothing if |sender| is already destructed.
241 if (sender) {
242 WebSocketChannel* channel = sender->owner_;
243 AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
244 }
245 }
246
SendImmediately(WebSocketEventInterface * event_interface)247 ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
248 WebSocketEventInterface* event_interface) {
249
250 if (handshake_request_info_.get()) {
251 if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
252 handshake_request_info_.Pass()))
253 return CHANNEL_DELETED;
254 }
255
256 if (handshake_response_info_.get()) {
257 if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
258 handshake_response_info_.Pass()))
259 return CHANNEL_DELETED;
260
261 // TODO(yhirano): We can release |this| to save memory because
262 // there will be no more opening handshake notification.
263 }
264
265 return CHANNEL_ALIVE;
266 }
267
PendingReceivedFrame(bool final,WebSocketFrameHeader::OpCode opcode,const scoped_refptr<IOBuffer> & data,size_t offset,size_t size)268 WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
269 bool final,
270 WebSocketFrameHeader::OpCode opcode,
271 const scoped_refptr<IOBuffer>& data,
272 size_t offset,
273 size_t size)
274 : final_(final),
275 opcode_(opcode),
276 data_(data),
277 offset_(offset),
278 size_(size) {}
279
~PendingReceivedFrame()280 WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
281
ResetOpcode()282 void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
283 DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
284 opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
285 }
286
DidConsume(size_t bytes)287 void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
288 DCHECK_LE(offset_, size_);
289 DCHECK_LE(bytes, size_ - offset_);
290 offset_ += bytes;
291 }
292
WebSocketChannel(scoped_ptr<WebSocketEventInterface> event_interface,URLRequestContext * url_request_context)293 WebSocketChannel::WebSocketChannel(
294 scoped_ptr<WebSocketEventInterface> event_interface,
295 URLRequestContext* url_request_context)
296 : event_interface_(event_interface.Pass()),
297 url_request_context_(url_request_context),
298 send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
299 send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
300 current_send_quota_(0),
301 current_receive_quota_(0),
302 timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
303 received_close_code_(0),
304 state_(FRESHLY_CONSTRUCTED),
305 notification_sender_(new HandshakeNotificationSender(this)),
306 sending_text_message_(false),
307 receiving_text_message_(false),
308 expecting_to_handle_continuation_(false),
309 initial_frame_forwarded_(false) {}
310
~WebSocketChannel()311 WebSocketChannel::~WebSocketChannel() {
312 // The stream may hold a pointer to read_frames_, and so it needs to be
313 // destroyed first.
314 stream_.reset();
315 // The timer may have a callback pointing back to us, so stop it just in case
316 // someone decides to run the event loop from their destructor.
317 timer_.Stop();
318 }
319
SendAddChannelRequest(const GURL & socket_url,const std::vector<std::string> & requested_subprotocols,const url::Origin & origin)320 void WebSocketChannel::SendAddChannelRequest(
321 const GURL& socket_url,
322 const std::vector<std::string>& requested_subprotocols,
323 const url::Origin& origin) {
324 // Delegate to the tested version.
325 SendAddChannelRequestWithSuppliedCreator(
326 socket_url,
327 requested_subprotocols,
328 origin,
329 base::Bind(&WebSocketStream::CreateAndConnectStream));
330 }
331
SetState(State new_state)332 void WebSocketChannel::SetState(State new_state) {
333 DCHECK_NE(state_, new_state);
334
335 if (new_state == CONNECTED)
336 established_on_ = base::TimeTicks::Now();
337 if (state_ == CONNECTED && !established_on_.is_null()) {
338 UMA_HISTOGRAM_LONG_TIMES(
339 "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
340 }
341
342 state_ = new_state;
343 }
344
InClosingState() const345 bool WebSocketChannel::InClosingState() const {
346 // The state RECV_CLOSED is not supported here, because it is only used in one
347 // code path and should not leak into the code in general.
348 DCHECK_NE(RECV_CLOSED, state_)
349 << "InClosingState called with state_ == RECV_CLOSED";
350 return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
351 }
352
SendFrame(bool fin,WebSocketFrameHeader::OpCode op_code,const std::vector<char> & data)353 void WebSocketChannel::SendFrame(bool fin,
354 WebSocketFrameHeader::OpCode op_code,
355 const std::vector<char>& data) {
356 if (data.size() > INT_MAX) {
357 NOTREACHED() << "Frame size sanity check failed";
358 return;
359 }
360 if (stream_ == NULL) {
361 LOG(DFATAL) << "Got SendFrame without a connection established; "
362 << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
363 << " data.size()=" << data.size();
364 return;
365 }
366 if (InClosingState()) {
367 DVLOG(1) << "SendFrame called in state " << state_
368 << ". This may be a bug, or a harmless race.";
369 return;
370 }
371 if (state_ != CONNECTED) {
372 NOTREACHED() << "SendFrame() called in state " << state_;
373 return;
374 }
375 if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
376 // TODO(ricea): Kill renderer.
377 AllowUnused(
378 FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
379 // |this| has been deleted.
380 return;
381 }
382 if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
383 LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
384 << "; misbehaving renderer? fin=" << fin
385 << " data.size()=" << data.size();
386 return;
387 }
388 if (op_code == WebSocketFrameHeader::kOpCodeText ||
389 (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
390 sending_text_message_)) {
391 StreamingUtf8Validator::State state =
392 outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
393 if (state == StreamingUtf8Validator::INVALID ||
394 (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
395 // TODO(ricea): Kill renderer.
396 AllowUnused(
397 FailChannel("Browser sent a text frame containing invalid UTF-8",
398 kWebSocketErrorGoingAway,
399 ""));
400 // |this| has been deleted.
401 return;
402 }
403 sending_text_message_ = !fin;
404 DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
405 }
406 current_send_quota_ -= data.size();
407 // TODO(ricea): If current_send_quota_ has dropped below
408 // send_quota_low_water_mark_, it might be good to increase the "low
409 // water mark" and "high water mark", but only if the link to the WebSocket
410 // server is not saturated.
411 scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
412 std::copy(data.begin(), data.end(), buffer->data());
413 AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
414 // |this| may have been deleted.
415 }
416
SendFlowControl(int64 quota)417 void WebSocketChannel::SendFlowControl(int64 quota) {
418 DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
419 state_ == CLOSE_WAIT);
420 // TODO(ricea): Kill the renderer if it tries to send us a negative quota
421 // value or > INT_MAX.
422 DCHECK_GE(quota, 0);
423 DCHECK_LE(quota, INT_MAX);
424 if (!pending_received_frames_.empty()) {
425 DCHECK_EQ(0, current_receive_quota_);
426 }
427 while (!pending_received_frames_.empty() && quota > 0) {
428 PendingReceivedFrame& front = pending_received_frames_.front();
429 const size_t data_size = front.size() - front.offset();
430 const size_t bytes_to_send =
431 std::min(base::checked_cast<size_t>(quota), data_size);
432 const bool final = front.final() && data_size == bytes_to_send;
433 const char* data = front.data() ?
434 front.data()->data() + front.offset() : NULL;
435 DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
436 const std::vector<char> data_vector(data, data + bytes_to_send);
437 DVLOG(3) << "Sending frame previously split due to quota to the "
438 << "renderer: quota=" << quota << " data_size=" << data_size
439 << " bytes_to_send=" << bytes_to_send;
440 if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
441 CHANNEL_DELETED)
442 return;
443 if (bytes_to_send < data_size) {
444 front.DidConsume(bytes_to_send);
445 front.ResetOpcode();
446 return;
447 }
448 const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
449 DCHECK_GE(quota, signed_bytes_to_send);
450 quota -= signed_bytes_to_send;
451
452 pending_received_frames_.pop();
453 }
454 // If current_receive_quota_ == 0 then there is no pending ReadFrames()
455 // operation.
456 const bool start_read =
457 current_receive_quota_ == 0 && quota > 0 &&
458 (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
459 current_receive_quota_ += base::checked_cast<int>(quota);
460 if (start_read)
461 AllowUnused(ReadFrames());
462 // |this| may have been deleted.
463 }
464
StartClosingHandshake(uint16 code,const std::string & reason)465 void WebSocketChannel::StartClosingHandshake(uint16 code,
466 const std::string& reason) {
467 if (InClosingState()) {
468 DVLOG(1) << "StartClosingHandshake called in state " << state_
469 << ". This may be a bug, or a harmless race.";
470 return;
471 }
472 if (state_ == CONNECTING) {
473 // Abort the in-progress handshake and drop the connection immediately.
474 stream_request_.reset();
475 SetState(CLOSED);
476 AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
477 return;
478 }
479 if (state_ != CONNECTED) {
480 NOTREACHED() << "StartClosingHandshake() called in state " << state_;
481 return;
482 }
483 // Javascript actually only permits 1000 and 3000-4999, but the implementation
484 // itself may produce different codes. The length of |reason| is also checked
485 // by Javascript.
486 if (!IsStrictlyValidCloseStatusCode(code) ||
487 reason.size() > kMaximumCloseReasonLength) {
488 // "InternalServerError" is actually used for errors from any endpoint, per
489 // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
490 // reason it must be malfunctioning in some way, and based on that we
491 // interpret this as an internal error.
492 if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
493 DCHECK_EQ(CONNECTED, state_);
494 SetState(SEND_CLOSED);
495 }
496 return;
497 }
498 if (SendClose(
499 code,
500 StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
501 CHANNEL_DELETED)
502 return;
503 DCHECK_EQ(CONNECTED, state_);
504 SetState(SEND_CLOSED);
505 }
506
SendAddChannelRequestForTesting(const GURL & socket_url,const std::vector<std::string> & requested_subprotocols,const url::Origin & origin,const WebSocketStreamCreator & creator)507 void WebSocketChannel::SendAddChannelRequestForTesting(
508 const GURL& socket_url,
509 const std::vector<std::string>& requested_subprotocols,
510 const url::Origin& origin,
511 const WebSocketStreamCreator& creator) {
512 SendAddChannelRequestWithSuppliedCreator(
513 socket_url, requested_subprotocols, origin, creator);
514 }
515
SetClosingHandshakeTimeoutForTesting(base::TimeDelta delay)516 void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
517 base::TimeDelta delay) {
518 timeout_ = delay;
519 }
520
SendAddChannelRequestWithSuppliedCreator(const GURL & socket_url,const std::vector<std::string> & requested_subprotocols,const url::Origin & origin,const WebSocketStreamCreator & creator)521 void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
522 const GURL& socket_url,
523 const std::vector<std::string>& requested_subprotocols,
524 const url::Origin& origin,
525 const WebSocketStreamCreator& creator) {
526 DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
527 if (!socket_url.SchemeIsWSOrWSS()) {
528 // TODO(ricea): Kill the renderer (this error should have been caught by
529 // Javascript).
530 AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
531 // |this| is deleted here.
532 return;
533 }
534 socket_url_ = socket_url;
535 scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
536 new ConnectDelegate(this));
537 stream_request_ = creator.Run(socket_url_,
538 requested_subprotocols,
539 origin,
540 url_request_context_,
541 BoundNetLog(),
542 connect_delegate.Pass());
543 SetState(CONNECTING);
544 }
545
OnConnectSuccess(scoped_ptr<WebSocketStream> stream)546 void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
547 DCHECK(stream);
548 DCHECK_EQ(CONNECTING, state_);
549
550 stream_ = stream.Pass();
551
552 SetState(CONNECTED);
553
554 if (event_interface_->OnAddChannelResponse(
555 false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
556 CHANNEL_DELETED)
557 return;
558
559 // TODO(ricea): Get flow control information from the WebSocketStream once we
560 // have a multiplexing WebSocketStream.
561 current_send_quota_ = send_quota_high_water_mark_;
562 if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
563 CHANNEL_DELETED)
564 return;
565
566 // |stream_request_| is not used once the connection has succeeded.
567 stream_request_.reset();
568
569 AllowUnused(ReadFrames());
570 // |this| may have been deleted.
571 }
572
OnConnectFailure(const std::string & message)573 void WebSocketChannel::OnConnectFailure(const std::string& message) {
574 DCHECK_EQ(CONNECTING, state_);
575
576 // Copy the message before we delete its owner.
577 std::string message_copy = message;
578
579 SetState(CLOSED);
580 stream_request_.reset();
581
582 if (CHANNEL_DELETED ==
583 notification_sender_->SendImmediately(event_interface_.get())) {
584 // |this| has been deleted.
585 return;
586 }
587 AllowUnused(event_interface_->OnFailChannel(message_copy));
588 // |this| has been deleted.
589 }
590
OnSSLCertificateError(scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,const SSLInfo & ssl_info,bool fatal)591 void WebSocketChannel::OnSSLCertificateError(
592 scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
593 const SSLInfo& ssl_info,
594 bool fatal) {
595 AllowUnused(event_interface_->OnSSLCertificateError(
596 ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
597 }
598
OnStartOpeningHandshake(scoped_ptr<WebSocketHandshakeRequestInfo> request)599 void WebSocketChannel::OnStartOpeningHandshake(
600 scoped_ptr<WebSocketHandshakeRequestInfo> request) {
601 DCHECK(!notification_sender_->handshake_request_info());
602
603 // Because it is hard to handle an IPC error synchronously is difficult,
604 // we asynchronously notify the information.
605 notification_sender_->set_handshake_request_info(request.Pass());
606 ScheduleOpeningHandshakeNotification();
607 }
608
OnFinishOpeningHandshake(scoped_ptr<WebSocketHandshakeResponseInfo> response)609 void WebSocketChannel::OnFinishOpeningHandshake(
610 scoped_ptr<WebSocketHandshakeResponseInfo> response) {
611 DCHECK(!notification_sender_->handshake_response_info());
612
613 // Because it is hard to handle an IPC error synchronously is difficult,
614 // we asynchronously notify the information.
615 notification_sender_->set_handshake_response_info(response.Pass());
616 ScheduleOpeningHandshakeNotification();
617 }
618
ScheduleOpeningHandshakeNotification()619 void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
620 base::MessageLoop::current()->PostTask(
621 FROM_HERE,
622 base::Bind(HandshakeNotificationSender::Send,
623 notification_sender_->AsWeakPtr()));
624 }
625
WriteFrames()626 ChannelState WebSocketChannel::WriteFrames() {
627 int result = OK;
628 do {
629 // This use of base::Unretained is safe because this object owns the
630 // WebSocketStream and destroying it cancels all callbacks.
631 result = stream_->WriteFrames(
632 data_being_sent_->frames(),
633 base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
634 base::Unretained(this),
635 false));
636 if (result != ERR_IO_PENDING) {
637 if (OnWriteDone(true, result) == CHANNEL_DELETED)
638 return CHANNEL_DELETED;
639 // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
640 // guaranteed to be the same as before OnWriteDone() call.
641 }
642 } while (result == OK && data_being_sent_);
643 return CHANNEL_ALIVE;
644 }
645
OnWriteDone(bool synchronous,int result)646 ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
647 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
648 DCHECK_NE(CONNECTING, state_);
649 DCHECK_NE(ERR_IO_PENDING, result);
650 DCHECK(data_being_sent_);
651 switch (result) {
652 case OK:
653 if (data_to_send_next_) {
654 data_being_sent_ = data_to_send_next_.Pass();
655 if (!synchronous)
656 return WriteFrames();
657 } else {
658 data_being_sent_.reset();
659 if (current_send_quota_ < send_quota_low_water_mark_) {
660 // TODO(ricea): Increase low_water_mark and high_water_mark if
661 // throughput is high, reduce them if throughput is low. Low water
662 // mark needs to be >= the bandwidth delay product *of the IPC
663 // channel*. Because factors like context-switch time, thread wake-up
664 // time, and bus speed come into play it is complex and probably needs
665 // to be determined empirically.
666 DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
667 // TODO(ricea): Truncate quota by the quota specified by the remote
668 // server, if the protocol in use supports quota.
669 int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
670 current_send_quota_ += fresh_quota;
671 return event_interface_->OnFlowControl(fresh_quota);
672 }
673 }
674 return CHANNEL_ALIVE;
675
676 // If a recoverable error condition existed, it would go here.
677
678 default:
679 DCHECK_LT(result, 0)
680 << "WriteFrames() should only return OK or ERR_ codes";
681
682 stream_->Close();
683 SetState(CLOSED);
684 return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
685 }
686 }
687
ReadFrames()688 ChannelState WebSocketChannel::ReadFrames() {
689 int result = OK;
690 while (result == OK && current_receive_quota_ > 0) {
691 // This use of base::Unretained is safe because this object owns the
692 // WebSocketStream, and any pending reads will be cancelled when it is
693 // destroyed.
694 result = stream_->ReadFrames(
695 &read_frames_,
696 base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
697 base::Unretained(this),
698 false));
699 if (result != ERR_IO_PENDING) {
700 if (OnReadDone(true, result) == CHANNEL_DELETED)
701 return CHANNEL_DELETED;
702 }
703 DCHECK_NE(CLOSED, state_);
704 }
705 return CHANNEL_ALIVE;
706 }
707
OnReadDone(bool synchronous,int result)708 ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
709 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
710 DCHECK_NE(CONNECTING, state_);
711 DCHECK_NE(ERR_IO_PENDING, result);
712 switch (result) {
713 case OK:
714 // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
715 // with no data read, not an empty response.
716 DCHECK(!read_frames_.empty())
717 << "ReadFrames() returned OK, but nothing was read.";
718 for (size_t i = 0; i < read_frames_.size(); ++i) {
719 scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
720 read_frames_[i] = NULL;
721 if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
722 return CHANNEL_DELETED;
723 }
724 read_frames_.clear();
725 // There should always be a call to ReadFrames pending.
726 // TODO(ricea): Unless we are out of quota.
727 DCHECK_NE(CLOSED, state_);
728 if (!synchronous)
729 return ReadFrames();
730 return CHANNEL_ALIVE;
731
732 case ERR_WS_PROTOCOL_ERROR:
733 // This could be kWebSocketErrorProtocolError (specifically, non-minimal
734 // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
735 // extension-specific error.
736 return FailChannel("Invalid frame header",
737 kWebSocketErrorProtocolError,
738 "WebSocket Protocol Error");
739
740 default:
741 DCHECK_LT(result, 0)
742 << "ReadFrames() should only return OK or ERR_ codes";
743
744 stream_->Close();
745 SetState(CLOSED);
746
747 uint16 code = kWebSocketErrorAbnormalClosure;
748 std::string reason = "";
749 bool was_clean = false;
750 if (received_close_code_ != 0) {
751 code = received_close_code_;
752 reason = received_close_reason_;
753 was_clean = (result == ERR_CONNECTION_CLOSED);
754 }
755
756 return DoDropChannel(was_clean, code, reason);
757 }
758 }
759
HandleFrame(scoped_ptr<WebSocketFrame> frame)760 ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
761 if (frame->header.masked) {
762 // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
763 // masked frame."
764 return FailChannel(
765 "A server must not mask any frames that it sends to the "
766 "client.",
767 kWebSocketErrorProtocolError,
768 "Masked frame from server");
769 }
770 const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
771 DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
772 frame->header.final);
773 if (frame->header.reserved1 || frame->header.reserved2 ||
774 frame->header.reserved3) {
775 return FailChannel(base::StringPrintf(
776 "One or more reserved bits are on: reserved1 = %d, "
777 "reserved2 = %d, reserved3 = %d",
778 static_cast<int>(frame->header.reserved1),
779 static_cast<int>(frame->header.reserved2),
780 static_cast<int>(frame->header.reserved3)),
781 kWebSocketErrorProtocolError,
782 "Invalid reserved bit");
783 }
784
785 // Respond to the frame appropriately to its type.
786 return HandleFrameByState(
787 opcode, frame->header.final, frame->data, frame->header.payload_length);
788 }
789
HandleFrameByState(const WebSocketFrameHeader::OpCode opcode,bool final,const scoped_refptr<IOBuffer> & data_buffer,size_t size)790 ChannelState WebSocketChannel::HandleFrameByState(
791 const WebSocketFrameHeader::OpCode opcode,
792 bool final,
793 const scoped_refptr<IOBuffer>& data_buffer,
794 size_t size) {
795 DCHECK_NE(RECV_CLOSED, state_)
796 << "HandleFrame() does not support being called re-entrantly from within "
797 "SendClose()";
798 DCHECK_NE(CLOSED, state_);
799 if (state_ == CLOSE_WAIT) {
800 std::string frame_name;
801 GetFrameTypeForOpcode(opcode, &frame_name);
802
803 // FailChannel() won't send another Close frame.
804 return FailChannel(
805 frame_name + " received after close", kWebSocketErrorProtocolError, "");
806 }
807 switch (opcode) {
808 case WebSocketFrameHeader::kOpCodeText: // fall-thru
809 case WebSocketFrameHeader::kOpCodeBinary:
810 case WebSocketFrameHeader::kOpCodeContinuation:
811 return HandleDataFrame(opcode, final, data_buffer, size);
812
813 case WebSocketFrameHeader::kOpCodePing:
814 DVLOG(1) << "Got Ping of size " << size;
815 if (state_ == CONNECTED)
816 return SendFrameFromIOBuffer(
817 true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
818 DVLOG(3) << "Ignored ping in state " << state_;
819 return CHANNEL_ALIVE;
820
821 case WebSocketFrameHeader::kOpCodePong:
822 DVLOG(1) << "Got Pong of size " << size;
823 // There is no need to do anything with pong messages.
824 return CHANNEL_ALIVE;
825
826 case WebSocketFrameHeader::kOpCodeClose: {
827 // TODO(ricea): If there is a message which is queued for transmission to
828 // the renderer, then the renderer should not receive an
829 // OnClosingHandshake or OnDropChannel IPC until the queued message has
830 // been completedly transmitted.
831 uint16 code = kWebSocketNormalClosure;
832 std::string reason;
833 std::string message;
834 if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
835 return FailChannel(message, code, reason);
836 }
837 // TODO(ricea): Find a way to safely log the message from the close
838 // message (escape control codes and so on).
839 DVLOG(1) << "Got Close with code " << code;
840 switch (state_) {
841 case CONNECTED:
842 SetState(RECV_CLOSED);
843 if (SendClose(code, reason) == CHANNEL_DELETED)
844 return CHANNEL_DELETED;
845 DCHECK_EQ(RECV_CLOSED, state_);
846 SetState(CLOSE_WAIT);
847
848 if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
849 return CHANNEL_DELETED;
850 received_close_code_ = code;
851 received_close_reason_ = reason;
852 break;
853
854 case SEND_CLOSED:
855 SetState(CLOSE_WAIT);
856 // From RFC6455 section 7.1.5: "Each endpoint
857 // will see the status code sent by the other end as _The WebSocket
858 // Connection Close Code_."
859 received_close_code_ = code;
860 received_close_reason_ = reason;
861 break;
862
863 default:
864 LOG(DFATAL) << "Got Close in unexpected state " << state_;
865 break;
866 }
867 return CHANNEL_ALIVE;
868 }
869
870 default:
871 return FailChannel(
872 base::StringPrintf("Unrecognized frame opcode: %d", opcode),
873 kWebSocketErrorProtocolError,
874 "Unknown opcode");
875 }
876 }
877
HandleDataFrame(WebSocketFrameHeader::OpCode opcode,bool final,const scoped_refptr<IOBuffer> & data_buffer,size_t size)878 ChannelState WebSocketChannel::HandleDataFrame(
879 WebSocketFrameHeader::OpCode opcode,
880 bool final,
881 const scoped_refptr<IOBuffer>& data_buffer,
882 size_t size) {
883 if (state_ != CONNECTED) {
884 DVLOG(3) << "Ignored data packet received in state " << state_;
885 return CHANNEL_ALIVE;
886 }
887 DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
888 opcode == WebSocketFrameHeader::kOpCodeText ||
889 opcode == WebSocketFrameHeader::kOpCodeBinary);
890 const bool got_continuation =
891 (opcode == WebSocketFrameHeader::kOpCodeContinuation);
892 if (got_continuation != expecting_to_handle_continuation_) {
893 const std::string console_log = got_continuation
894 ? "Received unexpected continuation frame."
895 : "Received start of new message but previous message is unfinished.";
896 const std::string reason = got_continuation
897 ? "Unexpected continuation"
898 : "Previous data frame unfinished";
899 return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
900 }
901 expecting_to_handle_continuation_ = !final;
902 WebSocketFrameHeader::OpCode opcode_to_send = opcode;
903 if (!initial_frame_forwarded_ &&
904 opcode == WebSocketFrameHeader::kOpCodeContinuation) {
905 opcode_to_send = receiving_text_message_
906 ? WebSocketFrameHeader::kOpCodeText
907 : WebSocketFrameHeader::kOpCodeBinary;
908 }
909 if (opcode == WebSocketFrameHeader::kOpCodeText ||
910 (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
911 receiving_text_message_)) {
912 // This call is not redundant when size == 0 because it tells us what
913 // the current state is.
914 StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
915 size ? data_buffer->data() : NULL, size);
916 if (state == StreamingUtf8Validator::INVALID ||
917 (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
918 return FailChannel("Could not decode a text frame as UTF-8.",
919 kWebSocketErrorProtocolError,
920 "Invalid UTF-8 in text frame");
921 }
922 receiving_text_message_ = !final;
923 DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
924 }
925 if (size == 0U && !final)
926 return CHANNEL_ALIVE;
927
928 initial_frame_forwarded_ = !final;
929 if (size > base::checked_cast<size_t>(current_receive_quota_) ||
930 !pending_received_frames_.empty()) {
931 const bool no_quota = (current_receive_quota_ == 0);
932 DCHECK(no_quota || pending_received_frames_.empty());
933 DVLOG(3) << "Queueing frame to renderer due to quota. quota="
934 << current_receive_quota_ << " size=" << size;
935 WebSocketFrameHeader::OpCode opcode_to_queue =
936 no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
937 pending_received_frames_.push(PendingReceivedFrame(
938 final, opcode_to_queue, data_buffer, current_receive_quota_, size));
939 if (no_quota)
940 return CHANNEL_ALIVE;
941 size = current_receive_quota_;
942 final = false;
943 }
944
945 // TODO(ricea): Can this copy be eliminated?
946 const char* const data_begin = size ? data_buffer->data() : NULL;
947 const char* const data_end = data_begin + size;
948 const std::vector<char> data(data_begin, data_end);
949 current_receive_quota_ -= size;
950 DCHECK_GE(current_receive_quota_, 0);
951
952 // Sends the received frame to the renderer process.
953 return event_interface_->OnDataFrame(final, opcode_to_send, data);
954 }
955
SendFrameFromIOBuffer(bool fin,WebSocketFrameHeader::OpCode op_code,const scoped_refptr<IOBuffer> & buffer,size_t size)956 ChannelState WebSocketChannel::SendFrameFromIOBuffer(
957 bool fin,
958 WebSocketFrameHeader::OpCode op_code,
959 const scoped_refptr<IOBuffer>& buffer,
960 size_t size) {
961 DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
962 DCHECK(stream_);
963
964 scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
965 WebSocketFrameHeader& header = frame->header;
966 header.final = fin;
967 header.masked = true;
968 header.payload_length = size;
969 frame->data = buffer;
970
971 if (data_being_sent_) {
972 // Either the link to the WebSocket server is saturated, or several messages
973 // are being sent in a batch.
974 // TODO(ricea): Keep some statistics to work out the situation and adjust
975 // quota appropriately.
976 if (!data_to_send_next_)
977 data_to_send_next_.reset(new SendBuffer);
978 data_to_send_next_->AddFrame(frame.Pass());
979 return CHANNEL_ALIVE;
980 }
981
982 data_being_sent_.reset(new SendBuffer);
983 data_being_sent_->AddFrame(frame.Pass());
984 return WriteFrames();
985 }
986
FailChannel(const std::string & message,uint16 code,const std::string & reason)987 ChannelState WebSocketChannel::FailChannel(const std::string& message,
988 uint16 code,
989 const std::string& reason) {
990 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
991 DCHECK_NE(CONNECTING, state_);
992 DCHECK_NE(CLOSED, state_);
993
994 // TODO(ricea): Logging.
995 if (state_ == CONNECTED) {
996 if (SendClose(code, reason) == CHANNEL_DELETED)
997 return CHANNEL_DELETED;
998 }
999
1000 // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
1001 // should close the connection itself without waiting for the closing
1002 // handshake.
1003 stream_->Close();
1004 SetState(CLOSED);
1005 return event_interface_->OnFailChannel(message);
1006 }
1007
SendClose(uint16 code,const std::string & reason)1008 ChannelState WebSocketChannel::SendClose(uint16 code,
1009 const std::string& reason) {
1010 DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
1011 DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
1012 scoped_refptr<IOBuffer> body;
1013 size_t size = 0;
1014 if (code == kWebSocketErrorNoStatusReceived) {
1015 // Special case: translate kWebSocketErrorNoStatusReceived into a Close
1016 // frame with no payload.
1017 DCHECK(reason.empty());
1018 body = new IOBuffer(0);
1019 } else {
1020 const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
1021 body = new IOBuffer(payload_length);
1022 size = payload_length;
1023 base::WriteBigEndian(body->data(), code);
1024 COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
1025 they_should_both_be_two);
1026 std::copy(
1027 reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
1028 }
1029 // This use of base::Unretained() is safe because we stop the timer in the
1030 // destructor.
1031 timer_.Start(
1032 FROM_HERE,
1033 timeout_,
1034 base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
1035 if (SendFrameFromIOBuffer(
1036 true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
1037 CHANNEL_DELETED)
1038 return CHANNEL_DELETED;
1039 return CHANNEL_ALIVE;
1040 }
1041
ParseClose(const scoped_refptr<IOBuffer> & buffer,size_t size,uint16 * code,std::string * reason,std::string * message)1042 bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
1043 size_t size,
1044 uint16* code,
1045 std::string* reason,
1046 std::string* message) {
1047 reason->clear();
1048 if (size < kWebSocketCloseCodeLength) {
1049 if (size == 0U) {
1050 *code = kWebSocketErrorNoStatusReceived;
1051 return true;
1052 }
1053
1054 DVLOG(1) << "Close frame with payload size " << size << " received "
1055 << "(the first byte is " << std::hex
1056 << static_cast<int>(buffer->data()[0]) << ")";
1057 *code = kWebSocketErrorProtocolError;
1058 *message =
1059 "Received a broken close frame containing an invalid size body.";
1060 return false;
1061 }
1062
1063 const char* data = buffer->data();
1064 uint16 unchecked_code = 0;
1065 base::ReadBigEndian(data, &unchecked_code);
1066 COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
1067 they_should_both_be_two_bytes);
1068
1069 switch (unchecked_code) {
1070 case kWebSocketErrorNoStatusReceived:
1071 case kWebSocketErrorAbnormalClosure:
1072 case kWebSocketErrorTlsHandshake:
1073 *code = kWebSocketErrorProtocolError;
1074 *message =
1075 "Received a broken close frame containing a reserved status code.";
1076 return false;
1077
1078 default:
1079 *code = unchecked_code;
1080 break;
1081 }
1082
1083 std::string text(data + kWebSocketCloseCodeLength, data + size);
1084 if (StreamingUtf8Validator::Validate(text)) {
1085 reason->swap(text);
1086 return true;
1087 }
1088
1089 *code = kWebSocketErrorProtocolError;
1090 *reason = "Invalid UTF-8 in Close frame";
1091 *message = "Received a broken close frame containing invalid UTF-8.";
1092 return false;
1093 }
1094
DoDropChannel(bool was_clean,uint16 code,const std::string & reason)1095 ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
1096 uint16 code,
1097 const std::string& reason) {
1098 if (CHANNEL_DELETED ==
1099 notification_sender_->SendImmediately(event_interface_.get()))
1100 return CHANNEL_DELETED;
1101 ChannelState result =
1102 event_interface_->OnDropChannel(was_clean, code, reason);
1103 DCHECK_EQ(CHANNEL_DELETED, result);
1104 return result;
1105 }
1106
CloseTimeout()1107 void WebSocketChannel::CloseTimeout() {
1108 stream_->Close();
1109 SetState(CLOSED);
1110 AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
1111 // |this| has been deleted.
1112 }
1113
1114 } // namespace net
1115