#!/usr/bin/env python
#
# Copyright 2011, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tests for handshake module."""


import unittest

import set_sys_path  # Update sys.path to locate mod_pywebsocket module.

from mod_pywebsocket.handshake import draft75 as handshake
from test import mock


# Request fixtures are (local port, request URI, request headers) triples
# consumed by _create_request(). Response fixtures are the exact bytes the
# tests expect to find written to the mock connection.

_GOOD_REQUEST = (
    80,
    '/demo',
    {
        'Upgrade': 'WebSocket',
        'Connection': 'Upgrade',
        'Host': 'example.com',
        'Origin': 'http://example.com',
        'WebSocket-Protocol': 'sample',
    })

_GOOD_RESPONSE_DEFAULT_PORT = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
    'WebSocket-Origin: http://example.com\r\n'
    'WebSocket-Location: ws://example.com/demo\r\n'
    'WebSocket-Protocol: sample\r\n'
    '\r\n')

_GOOD_RESPONSE_SECURE = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
    'WebSocket-Origin: http://example.com\r\n'
    'WebSocket-Location: wss://example.com/demo\r\n'
    'WebSocket-Protocol: sample\r\n'
    '\r\n')

_GOOD_REQUEST_NONDEFAULT_PORT = (
    8081,
    '/demo',
    {
        'Upgrade': 'WebSocket',
        'Connection': 'Upgrade',
        'Host': 'example.com:8081',
        'Origin': 'http://example.com',
        'WebSocket-Protocol': 'sample',
    })

_GOOD_RESPONSE_NONDEFAULT_PORT = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
    'WebSocket-Origin: http://example.com\r\n'
    'WebSocket-Location: ws://example.com:8081/demo\r\n'
    'WebSocket-Protocol: sample\r\n'
    '\r\n')

_GOOD_RESPONSE_SECURE_NONDEF = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
    'WebSocket-Origin: http://example.com\r\n'
    'WebSocket-Location: wss://example.com:8081/demo\r\n'
    'WebSocket-Protocol: sample\r\n'
    '\r\n')

_GOOD_REQUEST_NO_PROTOCOL = (
    80,
    '/demo',
    {
        'Upgrade': 'WebSocket',
        'Connection': 'Upgrade',
        'Host': 'example.com',
        'Origin': 'http://example.com',
    })

_GOOD_RESPONSE_NO_PROTOCOL = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
    'WebSocket-Origin: http://example.com\r\n'
    'WebSocket-Location: ws://example.com/demo\r\n'
    '\r\n')

_GOOD_REQUEST_WITH_OPTIONAL_HEADERS = (
    80,
    '/demo',
    {
        'Upgrade': 'WebSocket',
        'Connection': 'Upgrade',
        'Host': 'example.com',
        'Origin': 'http://example.com',
        'WebSocket-Protocol': 'sample',
        'AKey': 'AValue',
        'EmptyValue': '',
    })

_BAD_REQUESTS = (
    (  # HTTP request
        80,
        '/demo',
        {
            'Host': 'www.google.com',
            'User-Agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5;'
                          ' en-US; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3'
                          ' GTB6 GTBA',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,'
                      '*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Accept-Encoding': 'gzip,deflate',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'Keep-Alive': '300',
            'Connection': 'keep-alive',
        }),
    (  # Missing Upgrade
        80,
        '/demo',
        {
            'Connection': 'Upgrade',
            'Host': 'example.com',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': 'sample',
        }),
    (  # Wrong Upgrade
        80,
        '/demo',
        {
            'Upgrade': 'NonWebSocket',
            'Connection': 'Upgrade',
            'Host': 'example.com',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': 'sample',
        }),
    (  # Empty WebSocket-Protocol
        80,
        '/demo',
        {
            'Upgrade': 'WebSocket',
            'Connection': 'Upgrade',
            'Host': 'example.com',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': '',
        }),
    (  # Wrong port number format
        80,
        '/demo',
        {
            'Upgrade': 'WebSocket',
            'Connection': 'Upgrade',
            'Host': 'example.com:0x50',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': 'sample',
        }),
    (  # Header/connection port mismatch
        8080,
        '/demo',
        {
            'Upgrade': 'WebSocket',
            'Connection': 'Upgrade',
            'Host': 'example.com',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': 'sample',
        }),
    (  # Illegal WebSocket-Protocol
        80,
        '/demo',
        {
            'Upgrade': 'WebSocket',
            'Connection': 'Upgrade',
            'Host': 'example.com',
            'Origin': 'http://example.com',
            'WebSocket-Protocol': 'illegal\x09protocol',
        }))

# Each entry below is a sequence of raw request lines handed to the strict
# handshaker through connection.get_memorized_lines(). Every element is
# exactly one line; keep the trailing commas so that adjacent literals are
# not silently concatenated into a single element.

_STRICTLY_GOOD_REQUESTS = (
    (
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # WebSocket-Protocol
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'WebSocket-Protocol: sample\r\n',
        '\r\n',
    ),
    (  # WebSocket-Protocol and Cookie
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'WebSocket-Protocol: sample\r\n',
        'Cookie: xyz\r\n',
        '\r\n',
    ),
    (  # Cookie
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'Cookie: abc/xyz\r\n',
        'Cookie2: $Version=1\r\n',
        'Cookie: abc\r\n',
        '\r\n',
    ),
    (
        'GET / HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
)

_NOT_STRICTLY_GOOD_REQUESTS = (
    (  # Extra space after GET (the doubled space is the point of this case)
        'GET  /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # Resource name doesn't start with '/'
        'GET demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # No space after :
        'GET /demo HTTP/1.1\r\n',
        'Upgrade:WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # Lower case Upgrade header
        'GET /demo HTTP/1.1\r\n',
        'upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # Connection comes before Upgrade
        'GET /demo HTTP/1.1\r\n',
        'Connection: Upgrade\r\n',
        'Upgrade: WebSocket\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # Origin comes before Host
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Origin: http://example.com\r\n',
        'Host: example.com\r\n',
        '\r\n',
    ),
    (  # Host continued to the next line
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example\r\n',
        ' .com\r\n',
        'Origin: http://example.com\r\n',
        '\r\n',
    ),
    (  # Cookie comes before WebSocket-Protocol
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'Cookie: xyz\r\n',
        'WebSocket-Protocol: sample\r\n',
        '\r\n',
    ),
    (  # Unknown header
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'Content-Type: text/html\r\n',
        '\r\n',
    ),
    (  # Cookie with continuation lines
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'Cookie: xyz\r\n',
        ' abc\r\n',
        ' defg\r\n',
        '\r\n',
    ),
    (  # Wrong-case cookie
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'cookie: abc/xyz\r\n',
        '\r\n',
    ),
    (  # Cookie, no space after colon
        'GET /demo HTTP/1.1\r\n',
        'Upgrade: WebSocket\r\n',
        'Connection: Upgrade\r\n',
        'Host: example.com\r\n',
        'Origin: http://example.com\r\n',
        'Cookie:abc/xyz\r\n',
        '\r\n',
    ),
)


def _create_request(request_def):
    """Builds a mock request from a (port, uri, headers) fixture triple.

    The mock connection's local address is set to ('0.0.0.0', port).
    """

    port, uri, headers = request_def
    conn = mock.MockConn('')
    conn.local_addr = ('0.0.0.0', port)
    return mock.MockRequest(
        uri=uri,
        headers_in=headers,
        connection=conn)


def _create_get_memorized_lines(lines):
    """Creates a zero-argument function that returns the given lines."""

    def get_memorized_lines():
        return lines
    return get_memorized_lines


def _create_requests_with_lines(request_lines_set):
    """Creates one mock request per entry of request_lines_set.

    Every request is built from _GOOD_REQUEST; only the value returned by
    its connection's get_memorized_lines() differs between them.
    """

    requests = []
    for lines in request_lines_set:
        request = _create_request(_GOOD_REQUEST)
        request.connection.get_memorized_lines = _create_get_memorized_lines(
            lines)
        requests.append(request)
    return requests


class HandshakerTest(unittest.TestCase):
    """A unittest for draft75 module."""

    def test_good_request_default_port(self):
        """Checks the response and ws_* attributes for a port 80 request."""

        request = _create_request(_GOOD_REQUEST)
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual(_GOOD_RESPONSE_DEFAULT_PORT,
                         request.connection.written_data())
        self.assertEqual('/demo', request.ws_resource)
        self.assertEqual('http://example.com', request.ws_origin)
        self.assertEqual('ws://example.com/demo', request.ws_location)
        self.assertEqual('sample', request.ws_protocol)

    def test_good_request_secure_default_port(self):
        """Checks that an HTTPS request on port 443 gets a wss:// location."""

        request = _create_request(_GOOD_REQUEST)
        request.connection.local_addr = ('0.0.0.0', 443)
        request.is_https_ = True
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual(_GOOD_RESPONSE_SECURE,
                         request.connection.written_data())
        self.assertEqual('sample', request.ws_protocol)

    def test_good_request_nondefault_port(self):
        """Checks that a non-default port appears in WebSocket-Location."""

        request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT)
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual(_GOOD_RESPONSE_NONDEFAULT_PORT,
                         request.connection.written_data())
        self.assertEqual('sample', request.ws_protocol)

    def test_good_request_secure_non_default_port(self):
        """Checks the wss:// location for HTTPS on a non-default port."""

        request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT)
        request.is_https_ = True
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual(_GOOD_RESPONSE_SECURE_NONDEF,
                         request.connection.written_data())
        self.assertEqual('sample', request.ws_protocol)

    def test_good_request_default_no_protocol(self):
        """Checks a request without WebSocket-Protocol: ws_protocol is None."""

        request = _create_request(_GOOD_REQUEST_NO_PROTOCOL)
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual(_GOOD_RESPONSE_NO_PROTOCOL,
                         request.connection.written_data())
        self.assertEqual(None, request.ws_protocol)

    def test_good_request_optional_headers(self):
        """Checks that extra request headers are left intact in headers_in."""

        request = _create_request(_GOOD_REQUEST_WITH_OPTIONAL_HEADERS)
        handshaker = handshake.Handshaker(request,
                                          mock.MockDispatcher())
        handshaker.do_handshake()
        self.assertEqual('AValue',
                         request.headers_in['AKey'])
        self.assertEqual('',
                         request.headers_in['EmptyValue'])

    def test_bad_requests(self):
        """Checks that every entry of _BAD_REQUESTS fails the handshake."""

        for request in map(_create_request, _BAD_REQUESTS):
            handshaker = handshake.Handshaker(request,
                                              mock.MockDispatcher())
            self.assertRaises(
                handshake.HandshakeException, handshaker.do_handshake)

    def test_strictly_good_requests(self):
        """Checks that strict mode accepts every _STRICTLY_GOOD_REQUESTS entry.

        The test passes as long as do_handshake() does not raise.
        """

        for request in _create_requests_with_lines(_STRICTLY_GOOD_REQUESTS):
            strict_handshaker = handshake.Handshaker(request,
                                                     mock.MockDispatcher(),
                                                     True)
            strict_handshaker.do_handshake()

    def test_not_strictly_good_requests(self):
        """Checks that strict mode rejects _NOT_STRICTLY_GOOD_REQUESTS."""

        for request in _create_requests_with_lines(
                _NOT_STRICTLY_GOOD_REQUESTS):
            strict_handshaker = handshake.Handshaker(request,
                                                     mock.MockDispatcher(),
                                                     True)
            self.assertRaises(handshake.HandshakeException,
                              strict_handshaker.do_handshake)


if __name__ == '__main__':
    unittest.main()


# vi:sts=4 sw=4 et