#!/usr/bin/env python
#
# Copyright 2011, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tests for dispatch module."""


import os
import unittest

import set_sys_path  # Update sys.path to locate mod_pywebsocket module.

from mod_pywebsocket import dispatch
from mod_pywebsocket import handshake
from test import mock


_TEST_HANDLERS_DIR = os.path.join(
    os.path.split(__file__)[0], 'testdata', 'handlers')

_TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')


class DispatcherTest(unittest.TestCase):
    """A unittest for dispatch module.

    The tests run against the handler files stored under
    testdata/handlers (next to this file) and its 'sub' directory.
    """

    def test_normalize_path(self):
        """_normalize_path yields an absolute, forward-slash path."""
        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
                         dispatch._normalize_path('/a/b'))
        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
                         dispatch._normalize_path('\\a\\b'))
        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
                         dispatch._normalize_path('/a/c/../b'))
        self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
                         dispatch._normalize_path('abc'))

    def test_converter(self):
        """The converter maps *_wsh.py files under the root to resources.

        Files that do not end with _wsh.py, or that are not located under
        the converter's root, are expected to map to None.
        """
        converter = dispatch._create_path_to_resource_converter('/a/b')
        # Python built by MSC inserts a drive name like 'C:\' via realpath().
        # Converter Generator expands provided path using realpath() and uses
        # the path including a drive name to verify the prefix.
        os_root = os.path.realpath('/')
        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
        self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py'))
        self.assertEqual(None, converter(os_root + 'a/b/h.py'))
        self.assertEqual(None, converter('a/b/h_wsh.py'))

        # Relative root directory.
        converter = dispatch._create_path_to_resource_converter('a/b')
        self.assertEqual('/h', converter(dispatch._normalize_path(
            'a/b/h_wsh.py')))

        # Redundant trailing separators on the root directory.
        converter = dispatch._create_path_to_resource_converter('/a/b///')
        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
        self.assertEqual('/h', converter(dispatch._normalize_path(
            '/a/b/../b/h_wsh.py')))

        # Root directory containing '..' components.
        converter = dispatch._create_path_to_resource_converter(
            '/a/../a/b/../b/')
        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))

        # Root directory written with backslashes.
        converter = dispatch._create_path_to_resource_converter(r'\a\b')
        self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py'))
        self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py'))

    def test_enumerate_handler_file_paths(self):
        """All eight handler files in the test data tree are enumerated."""
        paths = sorted(
            dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR))
        self.assertEqual(8, len(paths))
        expected_paths = [
            os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'sub',
                         'exception_in_transfer_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'sub',
                         'wrong_handshake_sig_wsh.py'),
            os.path.join(_TEST_HANDLERS_DIR, 'sub',
                         'wrong_transfer_sig_wsh.py'),
        ]
        # Compare the whole lists at once so that a mismatch is reported
        # with both sequences instead of only the first differing pair.
        self.assertEqual(expected_paths, paths)

    def test_source_handler_file(self):
        """Invalid handler sources raise; a valid source yields a result."""
        self.assertRaises(
            dispatch.DispatchException, dispatch._source_handler_file, '')
        self.assertRaises(
            dispatch.DispatchException, dispatch._source_handler_file, 'def')
        self.assertRaises(
            dispatch.DispatchException, dispatch._source_handler_file, '1/0')
        self.assertTrue(dispatch._source_handler_file(
            'def web_socket_do_extra_handshake(request):pass\n'
            'def web_socket_transfer_data(request):pass\n'))

    def test_source_warnings(self):
        """Each broken handler file produces the expected warning text."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        warnings = dispatcher.source_warnings()
        warnings.sort()
        expected_warnings = [
            (os.path.realpath(os.path.join(
                _TEST_HANDLERS_DIR, 'blank_wsh.py')) +
             ': web_socket_do_extra_handshake is not defined.'),
            (os.path.realpath(os.path.join(
                _TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py')) +
             ': web_socket_do_extra_handshake is not callable.'),
            (os.path.realpath(os.path.join(
                _TEST_HANDLERS_DIR, 'sub', 'wrong_handshake_sig_wsh.py')) +
             ': web_socket_do_extra_handshake is not defined.'),
            (os.path.realpath(os.path.join(
                _TEST_HANDLERS_DIR, 'sub', 'wrong_transfer_sig_wsh.py')) +
             ': web_socket_transfer_data is not defined.'),
        ]
        self.assertEqual(4, len(warnings))
        self.assertEqual(expected_warnings, warnings)

    def test_do_extra_handshake(self):
        """/origin_check accepts one origin and rejects another with 403."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        request = mock.MockRequest()
        request.ws_resource = '/origin_check'
        request.ws_origin = 'http://example.com'
        dispatcher.do_extra_handshake(request)  # Must not raise exception.

        request.ws_origin = 'http://bad.example.com'
        try:
            dispatcher.do_extra_handshake(request)
        except handshake.HandshakeException as e:
            self.assertEqual(403, e.status)
        except Exception as e:
            self.fail('Unexpected exception: %r' % e)
        else:
            # Kept out of the try body so that this failure is not caught
            # by the broad handler above and reported as "unexpected".
            self.fail('Could not catch HandshakeException with 403 status')

    def test_abort_extra_handshake(self):
        """/abort_by_user raises AbortedByUserException in the handshake."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        request = mock.MockRequest()
        request.ws_resource = '/abort_by_user'
        self.assertRaises(handshake.AbortedByUserException,
                          dispatcher.do_extra_handshake, request)

    def test_transfer_data(self):
        """transfer_data reaches the right handler for each resource.

        Resources carrying an empty or non-empty query string are expected
        to be served by the same handler as the bare path.
        """
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)

        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
        request.ws_resource = '/origin_check'
        request.ws_protocol = 'p1'
        dispatcher.transfer_data(request)
        self.assertEqual('origin_check_wsh.py is called for /origin_check, p1'
                         '\xff\x00',
                         request.connection.written_data())

        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
        request.ws_resource = '/sub/plain'
        request.ws_protocol = None
        dispatcher.transfer_data(request)
        self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None'
                         '\xff\x00',
                         request.connection.written_data())

        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
        request.ws_resource = '/sub/plain?'
        request.ws_protocol = None
        dispatcher.transfer_data(request)
        self.assertEqual('sub/plain_wsh.py is called for /sub/plain?, None'
                         '\xff\x00',
                         request.connection.written_data())

        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
        request.ws_resource = '/sub/plain?q=v'
        request.ws_protocol = None
        dispatcher.transfer_data(request)
        self.assertEqual('sub/plain_wsh.py is called for /sub/plain?q=v, None'
                         '\xff\x00',
                         request.connection.written_data())

    def test_transfer_data_no_handler(self):
        """Resources without a usable handler raise 'No handler' errors."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        for resource in ['/blank', '/sub/non_callable',
                         '/sub/no_wsh_at_the_end', '/does/not/exist']:
            request = mock.MockRequest(connection=mock.MockConn(''))
            request.ws_resource = resource
            request.ws_protocol = 'p2'
            try:
                dispatcher.transfer_data(request)
            except dispatch.DispatchException as e:
                self.assertTrue('No handler' in str(e),
                                'Unexpected message for %s: %s' %
                                (resource, e))
            except Exception as e:
                self.fail('Unexpected exception for %s: %r' % (resource, e))
            else:
                # Outside the try body so the broad handler above cannot
                # swallow this failure and hide which resource caused it.
                self.fail('DispatchException was not raised for %s' %
                          resource)

    def test_transfer_data_handler_exception(self):
        """An exception raised inside a handler propagates to the caller."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        request = mock.MockRequest(connection=mock.MockConn(''))
        request.ws_resource = '/sub/exception_in_transfer'
        request.ws_protocol = 'p3'
        try:
            dispatcher.transfer_data(request)
        except Exception as e:
            self.assertTrue('Intentional' in str(e),
                            'Unexpected exception: %s' % e)
        else:
            # Previously self.fail() sat inside the try body, so it was
            # caught by the handler above and reported as
            # "Unexpected exception: None".
            self.fail('transfer_data did not raise any exception')

    def test_abort_transfer_data(self):
        """/abort_by_user raises AbortedByUserException in transfer_data."""
        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        request = mock.MockRequest()
        request.ws_resource = '/abort_by_user'
        self.assertRaises(handshake.AbortedByUserException,
                          dispatcher.transfer_data, request)

    def test_scan_dir(self):
        """Scanning the whole root registers four handler suites."""
        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        # NOTE(review): only three of the four entries are checked by name;
        # the fourth is presumably '/abort_by_user' -- confirm.
        self.assertEqual(4, len(disp._handler_suite_map))
        self.assertTrue('/origin_check' in disp._handler_suite_map)
        self.assertTrue(
            '/sub/exception_in_transfer' in disp._handler_suite_map)
        self.assertTrue('/sub/plain' in disp._handler_suite_map)

    def test_scan_sub_dir(self):
        """Scanning only 'sub' keeps resource names relative to the root."""
        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
        self.assertEqual(2, len(disp._handler_suite_map))
        self.assertFalse('/origin_check' in disp._handler_suite_map)
        self.assertTrue(
            '/sub/exception_in_transfer' in disp._handler_suite_map)
        self.assertTrue('/sub/plain' in disp._handler_suite_map)

    def test_scan_sub_dir_as_root(self):
        """Using 'sub' as the root drops the '/sub' resource prefix."""
        disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
                                   _TEST_HANDLERS_SUB_DIR)
        self.assertEqual(2, len(disp._handler_suite_map))
        self.assertFalse('/origin_check' in disp._handler_suite_map)
        self.assertFalse(
            '/sub/exception_in_transfer' in disp._handler_suite_map)
        self.assertFalse('/sub/plain' in disp._handler_suite_map)
        self.assertTrue('/exception_in_transfer' in disp._handler_suite_map)
        self.assertTrue('/plain' in disp._handler_suite_map)

    def test_scan_dir_must_under_root(self):
        """A scan directory outside the root directory is rejected."""
        dispatch.Dispatcher('a/b', 'a/b/c')  # OK
        dispatch.Dispatcher('a/b///', 'a/b')  # OK
        self.assertRaises(dispatch.DispatchException,
                          dispatch.Dispatcher, 'a/b/c', 'a/b')

    def test_resource_path_alias(self):
        """An alias adds a map entry; aliasing a missing resource raises."""
        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
        disp.add_resource_path_alias('/', '/origin_check')
        self.assertEqual(5, len(disp._handler_suite_map))
        self.assertTrue('/origin_check' in disp._handler_suite_map)
        self.assertTrue(
            '/sub/exception_in_transfer' in disp._handler_suite_map)
        self.assertTrue('/sub/plain' in disp._handler_suite_map)
        self.assertTrue('/' in disp._handler_suite_map)
        self.assertRaises(dispatch.DispatchException,
                          disp.add_resource_path_alias, '/alias', '/not-exist')


if __name__ == '__main__':
    unittest.main()


# vi:sts=4 sw=4 et