1from unittest import mock 2from test import support 3from test.support import socket_helper 4from test.support import warnings_helper 5from test.test_httpservers import NoLogRequestHandler 6from unittest import TestCase 7from wsgiref.util import setup_testing_defaults 8from wsgiref.headers import Headers 9from wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler 10from wsgiref import util 11from wsgiref.validate import validator 12from wsgiref.simple_server import WSGIServer, WSGIRequestHandler 13from wsgiref.simple_server import make_server 14from http.client import HTTPConnection 15from io import StringIO, BytesIO, BufferedReader 16from socketserver import BaseServer 17from platform import python_implementation 18 19import os 20import re 21import signal 22import sys 23import threading 24import unittest 25 26 27class MockServer(WSGIServer): 28 """Non-socket HTTP server""" 29 30 def __init__(self, server_address, RequestHandlerClass): 31 BaseServer.__init__(self, server_address, RequestHandlerClass) 32 self.server_bind() 33 34 def server_bind(self): 35 host, port = self.server_address 36 self.server_name = host 37 self.server_port = port 38 self.setup_environ() 39 40 41class MockHandler(WSGIRequestHandler): 42 """Non-socket HTTP handler""" 43 def setup(self): 44 self.connection = self.request 45 self.rfile, self.wfile = self.connection 46 47 def finish(self): 48 pass 49 50 51def hello_app(environ,start_response): 52 start_response("200 OK", [ 53 ('Content-Type','text/plain'), 54 ('Date','Mon, 05 Jun 2006 18:49:54 GMT') 55 ]) 56 return [b"Hello, world!"] 57 58 59def header_app(environ, start_response): 60 start_response("200 OK", [ 61 ('Content-Type', 'text/plain'), 62 ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT') 63 ]) 64 return [';'.join([ 65 environ['HTTP_X_TEST_HEADER'], environ['QUERY_STRING'], 66 environ['PATH_INFO'] 67 ]).encode('iso-8859-1')] 68 69 70def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): 71 server = make_server("", 80, app, MockServer, MockHandler) 72 inp = BufferedReader(BytesIO(data)) 73 out = BytesIO() 74 olderr = sys.stderr 75 err = sys.stderr = StringIO() 76 77 try: 78 server.finish_request((inp, out), ("127.0.0.1",8888)) 79 finally: 80 sys.stderr = olderr 81 82 return out.getvalue(), err.getvalue() 83 84def compare_generic_iter(make_it,match): 85 """Utility to compare a generic 2.1/2.2+ iterator with an iterable 86 87 If running under Python 2.2+, this tests the iterator using iter()/next(), 88 as well as __getitem__. 'make_it' must be a function returning a fresh 89 iterator to be tested (since this may test the iterator twice).""" 90 91 it = make_it() 92 n = 0 93 for item in match: 94 if not it[n]==item: raise AssertionError 95 n+=1 96 try: 97 it[n] 98 except IndexError: 99 pass 100 else: 101 raise AssertionError("Too many items from __getitem__",it) 102 103 try: 104 iter, StopIteration 105 except NameError: 106 pass 107 else: 108 # Only test iter mode under 2.2+ 109 it = make_it() 110 if not iter(it) is it: raise AssertionError 111 for item in match: 112 if not next(it) == item: raise AssertionError 113 try: 114 next(it) 115 except StopIteration: 116 pass 117 else: 118 raise AssertionError("Too many items from .__next__()", it) 119 120 121class IntegrationTests(TestCase): 122 123 def check_hello(self, out, has_length=True): 124 pyver = (python_implementation() + "/" + 125 sys.version.split()[0]) 126 self.assertEqual(out, 127 ("HTTP/1.0 200 OK\r\n" 128 "Server: WSGIServer/0.2 " + pyver +"\r\n" 129 "Content-Type: text/plain\r\n" 130 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" + 131 (has_length and "Content-Length: 13\r\n" or "") + 132 "\r\n" 133 "Hello, world!").encode("iso-8859-1") 134 ) 135 136 def test_plain_hello(self): 137 out, err = run_amock() 138 self.check_hello(out) 139 140 def test_environ(self): 141 request = ( 142 b"GET /p%61th/?query=test HTTP/1.0\n" 143 b"X-Test-Header: Python test \n" 144 b"X-Test-Header: Python test 2\n" 145 b"Content-Length: 0\n\n" 146 ) 147 out, err = run_amock(header_app, request) 148 self.assertEqual( 149 out.splitlines()[-1], 150 b"Python test,Python test 2;query=test;/path/" 151 ) 152 153 def test_request_length(self): 154 out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n") 155 self.assertEqual(out.splitlines()[0], 156 b"HTTP/1.0 414 Request-URI Too Long") 157 158 def test_validated_hello(self): 159 out, err = run_amock(validator(hello_app)) 160 # the middleware doesn't support len(), so content-length isn't there 161 self.check_hello(out, has_length=False) 162 163 def test_simple_validation_error(self): 164 def bad_app(environ,start_response): 165 start_response("200 OK", ('Content-Type','text/plain')) 166 return ["Hello, world!"] 167 out, err = run_amock(validator(bad_app)) 168 self.assertTrue(out.endswith( 169 b"A server error occurred. Please contact the administrator." 170 )) 171 self.assertEqual( 172 err.splitlines()[-2], 173 "AssertionError: Headers (('Content-Type', 'text/plain')) must" 174 " be of type list: <class 'tuple'>" 175 ) 176 177 def test_status_validation_errors(self): 178 def create_bad_app(status): 179 def bad_app(environ, start_response): 180 start_response(status, [("Content-Type", "text/plain; charset=utf-8")]) 181 return [b"Hello, world!"] 182 return bad_app 183 184 tests = [ 185 ('200', 'AssertionError: Status must be at least 4 characters'), 186 ('20X OK', 'AssertionError: Status message must begin w/3-digit code'), 187 ('200OK', 'AssertionError: Status message must have a space after code'), 188 ] 189 190 for status, exc_message in tests: 191 with self.subTest(status=status): 192 out, err = run_amock(create_bad_app(status)) 193 self.assertTrue(out.endswith( 194 b"A server error occurred. Please contact the administrator." 195 )) 196 self.assertEqual(err.splitlines()[-2], exc_message) 197 198 def test_wsgi_input(self): 199 def bad_app(e,s): 200 e["wsgi.input"].read() 201 s("200 OK", [("Content-Type", "text/plain; charset=utf-8")]) 202 return [b"data"] 203 out, err = run_amock(validator(bad_app)) 204 self.assertTrue(out.endswith( 205 b"A server error occurred. Please contact the administrator." 206 )) 207 self.assertEqual( 208 err.splitlines()[-2], "AssertionError" 209 ) 210 211 def test_bytes_validation(self): 212 def app(e, s): 213 s("200 OK", [ 214 ("Content-Type", "text/plain; charset=utf-8"), 215 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"), 216 ]) 217 return [b"data"] 218 out, err = run_amock(validator(app)) 219 self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n')) 220 ver = sys.version.split()[0].encode('ascii') 221 py = python_implementation().encode('ascii') 222 pyver = py + b"/" + ver 223 self.assertEqual( 224 b"HTTP/1.0 200 OK\r\n" 225 b"Server: WSGIServer/0.2 "+ pyver + b"\r\n" 226 b"Content-Type: text/plain; charset=utf-8\r\n" 227 b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n" 228 b"\r\n" 229 b"data", 230 out) 231 232 def test_cp1252_url(self): 233 def app(e, s): 234 s("200 OK", [ 235 ("Content-Type", "text/plain"), 236 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"), 237 ]) 238 # PEP3333 says environ variables are decoded as latin1. 239 # Encode as latin1 to get original bytes 240 return [e["PATH_INFO"].encode("latin1")] 241 242 out, err = run_amock( 243 validator(app), data=b"GET /\x80%80 HTTP/1.0") 244 self.assertEqual( 245 [ 246 b"HTTP/1.0 200 OK", 247 mock.ANY, 248 b"Content-Type: text/plain", 249 b"Date: Wed, 24 Dec 2008 13:29:32 GMT", 250 b"", 251 b"/\x80\x80", 252 ], 253 out.splitlines()) 254 255 def test_interrupted_write(self): 256 # BaseHandler._write() and _flush() have to write all data, even if 257 # it takes multiple send() calls. Test this by interrupting a send() 258 # call with a Unix signal. 259 pthread_kill = support.get_attribute(signal, "pthread_kill") 260 261 def app(environ, start_response): 262 start_response("200 OK", []) 263 return [b'\0' * support.SOCK_MAX_SIZE] 264 265 class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler): 266 pass 267 268 server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler) 269 self.addCleanup(server.server_close) 270 interrupted = threading.Event() 271 272 def signal_handler(signum, frame): 273 interrupted.set() 274 275 original = signal.signal(signal.SIGUSR1, signal_handler) 276 self.addCleanup(signal.signal, signal.SIGUSR1, original) 277 received = None 278 main_thread = threading.get_ident() 279 280 def run_client(): 281 http = HTTPConnection(*server.server_address) 282 http.request("GET", "/") 283 with http.getresponse() as response: 284 response.read(100) 285 # The main thread should now be blocking in a send() system 286 # call. But in theory, it could get interrupted by other 287 # signals, and then retried. So keep sending the signal in a 288 # loop, in case an earlier signal happens to be delivered at 289 # an inconvenient moment. 290 while True: 291 pthread_kill(main_thread, signal.SIGUSR1) 292 if interrupted.wait(timeout=float(1)): 293 break 294 nonlocal received 295 received = len(response.read()) 296 http.close() 297 298 background = threading.Thread(target=run_client) 299 background.start() 300 server.handle_request() 301 background.join() 302 self.assertEqual(received, support.SOCK_MAX_SIZE - 100) 303 304 305class UtilityTests(TestCase): 306 307 def checkShift(self,sn_in,pi_in,part,sn_out,pi_out): 308 env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in} 309 util.setup_testing_defaults(env) 310 self.assertEqual(util.shift_path_info(env),part) 311 self.assertEqual(env['PATH_INFO'],pi_out) 312 self.assertEqual(env['SCRIPT_NAME'],sn_out) 313 return env 314 315 def checkDefault(self, key, value, alt=None): 316 # Check defaulting when empty 317 env = {} 318 util.setup_testing_defaults(env) 319 if isinstance(value, StringIO): 320 self.assertIsInstance(env[key], StringIO) 321 elif isinstance(value,BytesIO): 322 self.assertIsInstance(env[key],BytesIO) 323 else: 324 self.assertEqual(env[key], value) 325 326 # Check existing value 327 env = {key:alt} 328 util.setup_testing_defaults(env) 329 self.assertIs(env[key], alt) 330 331 def checkCrossDefault(self,key,value,**kw): 332 util.setup_testing_defaults(kw) 333 self.assertEqual(kw[key],value) 334 335 def checkAppURI(self,uri,**kw): 336 util.setup_testing_defaults(kw) 337 self.assertEqual(util.application_uri(kw),uri) 338 339 def checkReqURI(self,uri,query=1,**kw): 340 util.setup_testing_defaults(kw) 341 self.assertEqual(util.request_uri(kw,query),uri) 342 343 @warnings_helper.ignore_warnings(category=DeprecationWarning) 344 def checkFW(self,text,size,match): 345 346 def make_it(text=text,size=size): 347 return util.FileWrapper(StringIO(text),size) 348 349 compare_generic_iter(make_it,match) 350 351 it = make_it() 352 self.assertFalse(it.filelike.closed) 353 354 for item in it: 355 pass 356 357 self.assertFalse(it.filelike.closed) 358 359 it.close() 360 self.assertTrue(it.filelike.closed) 361 362 def test_filewrapper_getitem_deprecation(self): 363 wrapper = util.FileWrapper(StringIO('foobar'), 3) 364 with self.assertWarnsRegex(DeprecationWarning, 365 r'Use iterator protocol instead'): 366 # This should have returned 'bar'. 367 self.assertEqual(wrapper[1], 'foo') 368 369 def testSimpleShifts(self): 370 self.checkShift('','/', '', '/', '') 371 self.checkShift('','/x', 'x', '/x', '') 372 self.checkShift('/','', None, '/', '') 373 self.checkShift('/a','/x/y', 'x', '/a/x', '/y') 374 self.checkShift('/a','/x/', 'x', '/a/x', '/') 375 376 def testNormalizedShifts(self): 377 self.checkShift('/a/b', '/../y', '..', '/a', '/y') 378 self.checkShift('', '/../y', '..', '', '/y') 379 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '') 380 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/') 381 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '') 382 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/') 383 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/') 384 self.checkShift('/a/b', '///', '', '/a/b/', '') 385 self.checkShift('/a/b', '/.//', '', '/a/b/', '') 386 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/') 387 self.checkShift('/a/b', '/.', None, '/a/b', '') 388 389 def testDefaults(self): 390 for key, value in [ 391 ('SERVER_NAME','127.0.0.1'), 392 ('SERVER_PORT', '80'), 393 ('SERVER_PROTOCOL','HTTP/1.0'), 394 ('HTTP_HOST','127.0.0.1'), 395 ('REQUEST_METHOD','GET'), 396 ('SCRIPT_NAME',''), 397 ('PATH_INFO','/'), 398 ('wsgi.version', (1,0)), 399 ('wsgi.run_once', 0), 400 ('wsgi.multithread', 0), 401 ('wsgi.multiprocess', 0), 402 ('wsgi.input', BytesIO()), 403 ('wsgi.errors', StringIO()), 404 ('wsgi.url_scheme','http'), 405 ]: 406 self.checkDefault(key,value) 407 408 def testCrossDefaults(self): 409 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar") 410 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on") 411 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1") 412 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes") 413 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo") 414 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo") 415 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on") 416 417 def testGuessScheme(self): 418 self.assertEqual(util.guess_scheme({}), "http") 419 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http") 420 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https") 421 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https") 422 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https") 423 424 def testAppURIs(self): 425 self.checkAppURI("http://127.0.0.1/") 426 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") 427 self.checkAppURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m") 428 self.checkAppURI("http://spam.example.com:2071/", 429 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071") 430 self.checkAppURI("http://spam.example.com/", 431 SERVER_NAME="spam.example.com") 432 self.checkAppURI("http://127.0.0.1/", 433 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com") 434 self.checkAppURI("https://127.0.0.1/", HTTPS="on") 435 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000", 436 HTTP_HOST=None) 437 438 def testReqURIs(self): 439 self.checkReqURI("http://127.0.0.1/") 440 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") 441 self.checkReqURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m") 442 self.checkReqURI("http://127.0.0.1/spammity/spam", 443 SCRIPT_NAME="/spammity", PATH_INFO="/spam") 444 self.checkReqURI("http://127.0.0.1/spammity/sp%E4m", 445 SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m") 446 self.checkReqURI("http://127.0.0.1/spammity/spam;ham", 447 SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham") 448 self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678", 449 SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678") 450 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni", 451 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") 452 self.checkReqURI("http://127.0.0.1/spammity/spam?s%E4y=ni", 453 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="s%E4y=ni") 454 self.checkReqURI("http://127.0.0.1/spammity/spam", 0, 455 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") 456 457 def testFileWrapper(self): 458 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10]) 459 460 def testHopByHop(self): 461 for hop in ( 462 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization " 463 "TE Trailers Transfer-Encoding Upgrade" 464 ).split(): 465 for alt in hop, hop.title(), hop.upper(), hop.lower(): 466 self.assertTrue(util.is_hop_by_hop(alt)) 467 468 # Not comprehensive, just a few random header names 469 for hop in ( 470 "Accept Cache-Control Date Pragma Trailer Via Warning" 471 ).split(): 472 for alt in hop, hop.title(), hop.upper(), hop.lower(): 473 self.assertFalse(util.is_hop_by_hop(alt)) 474 475class HeaderTests(TestCase): 476 477 def testMappingInterface(self): 478 test = [('x','y')] 479 self.assertEqual(len(Headers()), 0) 480 self.assertEqual(len(Headers([])),0) 481 self.assertEqual(len(Headers(test[:])),1) 482 self.assertEqual(Headers(test[:]).keys(), ['x']) 483 self.assertEqual(Headers(test[:]).values(), ['y']) 484 self.assertEqual(Headers(test[:]).items(), test) 485 self.assertIsNot(Headers(test).items(), test) # must be copy! 486 487 h = Headers() 488 del h['foo'] # should not raise an error 489 490 h['Foo'] = 'bar' 491 for m in h.__contains__, h.get, h.get_all, h.__getitem__: 492 self.assertTrue(m('foo')) 493 self.assertTrue(m('Foo')) 494 self.assertTrue(m('FOO')) 495 self.assertFalse(m('bar')) 496 497 self.assertEqual(h['foo'],'bar') 498 h['foo'] = 'baz' 499 self.assertEqual(h['FOO'],'baz') 500 self.assertEqual(h.get_all('foo'),['baz']) 501 502 self.assertEqual(h.get("foo","whee"), "baz") 503 self.assertEqual(h.get("zoo","whee"), "whee") 504 self.assertEqual(h.setdefault("foo","whee"), "baz") 505 self.assertEqual(h.setdefault("zoo","whee"), "whee") 506 self.assertEqual(h["foo"],"baz") 507 self.assertEqual(h["zoo"],"whee") 508 509 def testRequireList(self): 510 self.assertRaises(TypeError, Headers, "foo") 511 512 def testExtras(self): 513 h = Headers() 514 self.assertEqual(str(h),'\r\n') 515 516 h.add_header('foo','bar',baz="spam") 517 self.assertEqual(h['foo'], 'bar; baz="spam"') 518 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n') 519 520 h.add_header('Foo','bar',cheese=None) 521 self.assertEqual(h.get_all('foo'), 522 ['bar; baz="spam"', 'bar; cheese']) 523 524 self.assertEqual(str(h), 525 'foo: bar; baz="spam"\r\n' 526 'Foo: bar; cheese\r\n' 527 '\r\n' 528 ) 529 530class ErrorHandler(BaseCGIHandler): 531 """Simple handler subclass for testing BaseHandler""" 532 533 # BaseHandler records the OS environment at import time, but envvars 534 # might have been changed later by other tests, which trips up 535 # HandlerTests.testEnviron(). 536 os_environ = dict(os.environ.items()) 537 538 def __init__(self,**kw): 539 setup_testing_defaults(kw) 540 BaseCGIHandler.__init__( 541 self, BytesIO(), BytesIO(), StringIO(), kw, 542 multithread=True, multiprocess=True 543 ) 544 545class TestHandler(ErrorHandler): 546 """Simple handler subclass for testing BaseHandler, w/error passthru""" 547 548 def handle_error(self): 549 raise # for testing, we want to see what's happening 550 551 552class HandlerTests(TestCase): 553 # testEnviron() can produce long error message 554 maxDiff = 80 * 50 555 556 def testEnviron(self): 557 os_environ = { 558 # very basic environment 559 'HOME': '/my/home', 560 'PATH': '/my/path', 561 'LANG': 'fr_FR.UTF-8', 562 563 # set some WSGI variables 564 'SCRIPT_NAME': 'test_script_name', 565 'SERVER_NAME': 'test_server_name', 566 } 567 568 with support.swap_attr(TestHandler, 'os_environ', os_environ): 569 # override X and HOME variables 570 handler = TestHandler(X="Y", HOME="/override/home") 571 handler.setup_environ() 572 573 # Check that wsgi_xxx attributes are copied to wsgi.xxx variables 574 # of handler.environ 575 for attr in ('version', 'multithread', 'multiprocess', 'run_once', 576 'file_wrapper'): 577 self.assertEqual(getattr(handler, 'wsgi_' + attr), 578 handler.environ['wsgi.' + attr]) 579 580 # Test handler.environ as a dict 581 expected = {} 582 setup_testing_defaults(expected) 583 # Handler inherits os_environ variables which are not overridden 584 # by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env) 585 for key, value in os_environ.items(): 586 if key not in expected: 587 expected[key] = value 588 expected.update({ 589 # X doesn't exist in os_environ 590 "X": "Y", 591 # HOME is overridden by TestHandler 592 'HOME': "/override/home", 593 594 # overridden by setup_testing_defaults() 595 "SCRIPT_NAME": "", 596 "SERVER_NAME": "127.0.0.1", 597 598 # set by BaseHandler.setup_environ() 599 'wsgi.input': handler.get_stdin(), 600 'wsgi.errors': handler.get_stderr(), 601 'wsgi.version': (1, 0), 602 'wsgi.run_once': False, 603 'wsgi.url_scheme': 'http', 604 'wsgi.multithread': True, 605 'wsgi.multiprocess': True, 606 'wsgi.file_wrapper': util.FileWrapper, 607 }) 608 self.assertDictEqual(handler.environ, expected) 609 610 def testCGIEnviron(self): 611 h = BaseCGIHandler(None,None,None,{}) 612 h.setup_environ() 613 for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors': 614 self.assertIn(key, h.environ) 615 616 def testScheme(self): 617 h=TestHandler(HTTPS="on"); h.setup_environ() 618 self.assertEqual(h.environ['wsgi.url_scheme'],'https') 619 h=TestHandler(); h.setup_environ() 620 self.assertEqual(h.environ['wsgi.url_scheme'],'http') 621 622 def testAbstractMethods(self): 623 h = BaseHandler() 624 for name in [ 625 '_flush','get_stdin','get_stderr','add_cgi_vars' 626 ]: 627 self.assertRaises(NotImplementedError, getattr(h,name)) 628 self.assertRaises(NotImplementedError, h._write, "test") 629 630 def testContentLength(self): 631 # Demo one reason iteration is better than write()... ;) 632 633 def trivial_app1(e,s): 634 s('200 OK',[]) 635 return [e['wsgi.url_scheme'].encode('iso-8859-1')] 636 637 def trivial_app2(e,s): 638 s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1')) 639 return [] 640 641 def trivial_app3(e,s): 642 s('200 OK',[]) 643 return ['\u0442\u0435\u0441\u0442'.encode("utf-8")] 644 645 def trivial_app4(e,s): 646 # Simulate a response to a HEAD request 647 s('200 OK',[('Content-Length', '12345')]) 648 return [] 649 650 h = TestHandler() 651 h.run(trivial_app1) 652 self.assertEqual(h.stdout.getvalue(), 653 ("Status: 200 OK\r\n" 654 "Content-Length: 4\r\n" 655 "\r\n" 656 "http").encode("iso-8859-1")) 657 658 h = TestHandler() 659 h.run(trivial_app2) 660 self.assertEqual(h.stdout.getvalue(), 661 ("Status: 200 OK\r\n" 662 "\r\n" 663 "http").encode("iso-8859-1")) 664 665 h = TestHandler() 666 h.run(trivial_app3) 667 self.assertEqual(h.stdout.getvalue(), 668 b'Status: 200 OK\r\n' 669 b'Content-Length: 8\r\n' 670 b'\r\n' 671 b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82') 672 673 h = TestHandler() 674 h.run(trivial_app4) 675 self.assertEqual(h.stdout.getvalue(), 676 b'Status: 200 OK\r\n' 677 b'Content-Length: 12345\r\n' 678 b'\r\n') 679 680 def testBasicErrorOutput(self): 681 682 def non_error_app(e,s): 683 s('200 OK',[]) 684 return [] 685 686 def error_app(e,s): 687 raise AssertionError("This should be caught by handler") 688 689 h = ErrorHandler() 690 h.run(non_error_app) 691 self.assertEqual(h.stdout.getvalue(), 692 ("Status: 200 OK\r\n" 693 "Content-Length: 0\r\n" 694 "\r\n").encode("iso-8859-1")) 695 self.assertEqual(h.stderr.getvalue(),"") 696 697 h = ErrorHandler() 698 h.run(error_app) 699 self.assertEqual(h.stdout.getvalue(), 700 ("Status: %s\r\n" 701 "Content-Type: text/plain\r\n" 702 "Content-Length: %d\r\n" 703 "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1') 704 + h.error_body) 705 706 self.assertIn("AssertionError", h.stderr.getvalue()) 707 708 def testErrorAfterOutput(self): 709 MSG = b"Some output has been sent" 710 def error_app(e,s): 711 s("200 OK",[])(MSG) 712 raise AssertionError("This should be caught by handler") 713 714 h = ErrorHandler() 715 h.run(error_app) 716 self.assertEqual(h.stdout.getvalue(), 717 ("Status: 200 OK\r\n" 718 "\r\n".encode("iso-8859-1")+MSG)) 719 self.assertIn("AssertionError", h.stderr.getvalue()) 720 721 def testHeaderFormats(self): 722 723 def non_error_app(e,s): 724 s('200 OK',[]) 725 return [] 726 727 stdpat = ( 728 r"HTTP/%s 200 OK\r\n" 729 r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n" 730 r"%s" r"Content-Length: 0\r\n" r"\r\n" 731 ) 732 shortpat = ( 733 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n" 734 ).encode("iso-8859-1") 735 736 for ssw in "FooBar/1.0", None: 737 sw = ssw and "Server: %s\r\n" % ssw or "" 738 739 for version in "1.0", "1.1": 740 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1": 741 742 h = TestHandler(SERVER_PROTOCOL=proto) 743 h.origin_server = False 744 h.http_version = version 745 h.server_software = ssw 746 h.run(non_error_app) 747 self.assertEqual(shortpat,h.stdout.getvalue()) 748 749 h = TestHandler(SERVER_PROTOCOL=proto) 750 h.origin_server = True 751 h.http_version = version 752 h.server_software = ssw 753 h.run(non_error_app) 754 if proto=="HTTP/0.9": 755 self.assertEqual(h.stdout.getvalue(),b"") 756 else: 757 self.assertTrue( 758 re.match((stdpat%(version,sw)).encode("iso-8859-1"), 759 h.stdout.getvalue()), 760 ((stdpat%(version,sw)).encode("iso-8859-1"), 761 h.stdout.getvalue()) 762 ) 763 764 def testBytesData(self): 765 def app(e, s): 766 s("200 OK", [ 767 ("Content-Type", "text/plain; charset=utf-8"), 768 ]) 769 return [b"data"] 770 771 h = TestHandler() 772 h.run(app) 773 self.assertEqual(b"Status: 200 OK\r\n" 774 b"Content-Type: text/plain; charset=utf-8\r\n" 775 b"Content-Length: 4\r\n" 776 b"\r\n" 777 b"data", 778 h.stdout.getvalue()) 779 780 def testCloseOnError(self): 781 side_effects = {'close_called': False} 782 MSG = b"Some output has been sent" 783 def error_app(e,s): 784 s("200 OK",[])(MSG) 785 class CrashyIterable(object): 786 def __iter__(self): 787 while True: 788 yield b'blah' 789 raise AssertionError("This should be caught by handler") 790 def close(self): 791 side_effects['close_called'] = True 792 return CrashyIterable() 793 794 h = ErrorHandler() 795 h.run(error_app) 796 self.assertEqual(side_effects['close_called'], True) 797 798 def testPartialWrite(self): 799 written = bytearray() 800 801 class PartialWriter: 802 def write(self, b): 803 partial = b[:7] 804 written.extend(partial) 805 return len(partial) 806 807 def flush(self): 808 pass 809 810 environ = {"SERVER_PROTOCOL": "HTTP/1.0"} 811 h = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ) 812 msg = "should not do partial writes" 813 with self.assertWarnsRegex(DeprecationWarning, msg): 814 h.run(hello_app) 815 self.assertEqual(b"HTTP/1.0 200 OK\r\n" 816 b"Content-Type: text/plain\r\n" 817 b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" 818 b"Content-Length: 13\r\n" 819 b"\r\n" 820 b"Hello, world!", 821 written) 822 823 def testClientConnectionTerminations(self): 824 environ = {"SERVER_PROTOCOL": "HTTP/1.0"} 825 for exception in ( 826 ConnectionAbortedError, 827 BrokenPipeError, 828 ConnectionResetError, 829 ): 830 with self.subTest(exception=exception): 831 class AbortingWriter: 832 def write(self, b): 833 raise exception 834 835 stderr = StringIO() 836 h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ) 837 h.run(hello_app) 838 839 self.assertFalse(stderr.getvalue()) 840 841 def testDontResetInternalStateOnException(self): 842 class CustomException(ValueError): 843 pass 844 845 # We are raising CustomException here to trigger an exception 846 # during the execution of SimpleHandler.finish_response(), so 847 # we can easily test that the internal state of the handler is 848 # preserved in case of an exception. 849 class AbortingWriter: 850 def write(self, b): 851 raise CustomException 852 853 stderr = StringIO() 854 environ = {"SERVER_PROTOCOL": "HTTP/1.0"} 855 h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ) 856 h.run(hello_app) 857 858 self.assertIn("CustomException", stderr.getvalue()) 859 860 # Test that the internal state of the handler is preserved. 861 self.assertIsNotNone(h.result) 862 self.assertIsNotNone(h.headers) 863 self.assertIsNotNone(h.status) 864 self.assertIsNotNone(h.environ) 865 866 867if __name__ == "__main__": 868 unittest.main() 869