• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from unittest import mock
2from test import support
3from test.support import socket_helper
4from test.support import warnings_helper
5from test.test_httpservers import NoLogRequestHandler
6from unittest import TestCase
7from wsgiref.util import setup_testing_defaults
8from wsgiref.headers import Headers
9from wsgiref.handlers import BaseHandler, BaseCGIHandler, SimpleHandler
10from wsgiref import util
11from wsgiref.validate import validator
12from wsgiref.simple_server import WSGIServer, WSGIRequestHandler
13from wsgiref.simple_server import make_server
14from http.client import HTTPConnection
15from io import StringIO, BytesIO, BufferedReader
16from socketserver import BaseServer
17from platform import python_implementation
18
19import os
20import re
21import signal
22import sys
23import threading
24import unittest
25
26
class MockServer(WSGIServer):
    """WSGI server for tests that is set up without binding a socket."""

    def __init__(self, server_address, RequestHandlerClass):
        # Initialise only the BaseServer layer, then run the socket-free
        # server_bind() defined below.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the (host, port) pair and build the base environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
39
40
class MockHandler(WSGIRequestHandler):
    """Request handler wired to a pair of streams instead of a socket."""

    def setup(self):
        # The "request" handed to this handler is an (rfile, wfile) pair.
        self.connection = self.request
        reader, writer = self.connection
        self.rfile = reader
        self.wfile = writer

    def finish(self):
        """Do nothing when the request is finished."""
        return None
49
50
def hello_app(environ, start_response):
    """Minimal WSGI app: fixed headers and a constant bytes body."""
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return [b"Hello, world!"]
57
58
def header_app(environ, start_response):
    """WSGI app echoing the test header, query string and path info.

    The three environ values are joined with ';' and returned as a single
    ISO-8859-1 encoded chunk.
    """
    start_response("200 OK", [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ])
    echoed = (
        environ['HTTP_X_TEST_HEADER'],
        environ['QUERY_STRING'],
        environ['PATH_INFO'],
    )
    return [';'.join(echoed).encode('iso-8859-1')]
68
69
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
    """Push one raw request through the mock server.

    'app' is the WSGI application to serve and 'data' the raw request
    bytes.  Returns a tuple (response bytes, text written to sys.stderr
    while the request was being handled).
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    request_stream = BufferedReader(BytesIO(data))
    response_stream = BytesIO()
    captured = StringIO()

    # Temporarily replace sys.stderr so that handler output is captured.
    saved_stderr = sys.stderr
    sys.stderr = captured
    try:
        server.finish_request(
            (request_stream, response_stream), ("127.0.0.1", 8888))
    finally:
        sys.stderr = saved_stderr

    return response_stream.getvalue(), captured.getvalue()
83
def compare_generic_iter(make_it, match):
    """Check that an iterator yields exactly the items of an iterable.

    The iterator is exercised twice: first through ``__getitem__`` with
    successive integer indexes, then through the ``iter()``/``next()``
    protocol.  'make_it' must therefore be a function returning a fresh
    iterator on each call.  'match' is iterated once per pass, so it must
    be re-iterable (e.g. a list).

    Raises AssertionError if an item differs from 'match', if the iterator
    yields more items than 'match', or if ``iter(it)`` is not the iterator
    itself.
    """
    # Pass 1: sequence-style access.
    it = make_it()
    n = 0
    for item in match:
        if not it[n] == item:
            raise AssertionError("Item %d from __getitem__ differs" % n, it)
        n += 1
    try:
        it[n]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol.
    it = make_it()
    if iter(it) is not it:
        raise AssertionError("iter() did not return the iterator itself", it)
    for item in match:
        if not next(it) == item:
            raise AssertionError("Item from .__next__() differs", it)
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
119
120
class IntegrationTests(TestCase):
    """End-to-end tests that drive WSGI apps through a server."""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is the exact raw response for hello_app.

        'has_length' tells whether a Content-Length header is expected in
        the response.
        """
        pyver = python_implementation() + "/" + sys.version.split()[0]
        length_header = "Content-Length: 13\r\n" if has_length else ""
        self.assertEqual(out,
            ("HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.2 " + pyver + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            length_header +
            "\r\n"
            "Hello, world!").encode("iso-8859-1")
        )

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_environ(self):
        # A repeated request header is expected to be comma-joined, and
        # the percent-escape in the path to be decoded.
        request = (
            b"GET /p%61th/?query=test HTTP/1.0\n"
            b"X-Test-Header: Python test \n"
            b"X-Test-Header: Python test 2\n"
            b"Content-Length: 0\n\n"
        )
        out, err = run_amock(header_app, request)
        self.assertEqual(
            out.splitlines()[-1],
            b"Python test,Python test 2;query=test;/path/"
        )

    def test_request_length(self):
        # An over-long request URI must be answered with a 414 status.
        out, err = run_amock(data=b"GET " + (b"x" * 65537) + b" HTTP/1.0\n\n")
        self.assertEqual(out.splitlines()[0],
                         b"HTTP/1.0 414 Request-URI Too Long")

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        # Headers passed as a tuple instead of a list must be rejected.
        def bad_app(environ, start_response):
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]
        out, err = run_amock(validator(bad_app))
        self.assertTrue(out.endswith(
            b"A server error occurred.  Please contact the administrator."
        ))
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <class 'tuple'>"
        )

    def test_status_validation_errors(self):
        def create_bad_app(status):
            def bad_app(environ, start_response):
                start_response(status, [("Content-Type", "text/plain; charset=utf-8")])
                return [b"Hello, world!"]
            return bad_app

        tests = [
            ('200', 'AssertionError: Status must be at least 4 characters'),
            ('20X OK', 'AssertionError: Status message must begin w/3-digit code'),
            ('200OK', 'AssertionError: Status message must have a space after code'),
        ]

        for status, exc_message in tests:
            with self.subTest(status=status):
                out, err = run_amock(create_bad_app(status))
                self.assertTrue(out.endswith(
                    b"A server error occurred.  Please contact the administrator."
                ))
                self.assertEqual(err.splitlines()[-2], exc_message)

    def test_wsgi_input(self):
        # Calling wsgi.input.read() without a size argument must be
        # rejected by the validator.
        def bad_app(e, s):
            e["wsgi.input"].read()
            s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
            return [b"data"]
        out, err = run_amock(validator(bad_app))
        self.assertTrue(out.endswith(
            b"A server error occurred.  Please contact the administrator."
        ))
        self.assertEqual(
            err.splitlines()[-2], "AssertionError"
        )

    def test_bytes_validation(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain; charset=utf-8"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
                ])
            return [b"data"]
        out, err = run_amock(validator(app))
        self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
        ver = sys.version.split()[0].encode('ascii')
        py = python_implementation().encode('ascii')
        pyver = py + b"/" + ver
        self.assertEqual(
                b"HTTP/1.0 200 OK\r\n"
                b"Server: WSGIServer/0.2 " + pyver + b"\r\n"
                b"Content-Type: text/plain; charset=utf-8\r\n"
                b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
                b"\r\n"
                b"data",
                out)

    def test_cp1252_url(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain"),
                ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
                ])
            # PEP3333 says environ variables are decoded as latin1.
            # Encode as latin1 to get original bytes
            return [e["PATH_INFO"].encode("latin1")]

        out, err = run_amock(
            validator(app), data=b"GET /\x80%80 HTTP/1.0")
        self.assertEqual(
            [
                b"HTTP/1.0 200 OK",
                mock.ANY,
                b"Content-Type: text/plain",
                b"Date: Wed, 24 Dec 2008 13:29:32 GMT",
                b"",
                b"/\x80\x80",
            ],
            out.splitlines())

    def test_interrupted_write(self):
        # BaseHandler._write() and _flush() have to write all data, even if
        # it takes multiple send() calls.  Test this by interrupting a send()
        # call with a Unix signal.
        pthread_kill = support.get_attribute(signal, "pthread_kill")

        def app(environ, start_response):
            start_response("200 OK", [])
            return [b'\0' * support.SOCK_MAX_SIZE]

        class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler):
            pass

        server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler)
        self.addCleanup(server.server_close)
        interrupted = threading.Event()

        def signal_handler(signum, frame):
            interrupted.set()

        original = signal.signal(signal.SIGUSR1, signal_handler)
        self.addCleanup(signal.signal, signal.SIGUSR1, original)
        received = None
        main_thread = threading.get_ident()

        def run_client():
            nonlocal received
            http = HTTPConnection(*server.server_address)
            http.request("GET", "/")
            with http.getresponse() as response:
                response.read(100)
                # The main thread should now be blocking in a send() system
                # call.  But in theory, it could get interrupted by other
                # signals, and then retried.  So keep sending the signal in a
                # loop, in case an earlier signal happens to be delivered at
                # an inconvenient moment.
                while True:
                    pthread_kill(main_thread, signal.SIGUSR1)
                    if interrupted.wait(timeout=1.0):
                        break
                received = len(response.read())
            http.close()

        background = threading.Thread(target=run_client)
        background.start()
        server.handle_request()
        background.join()
        self.assertEqual(received, support.SOCK_MAX_SIZE - 100)
303
304
class UtilityTests(TestCase):
    """Tests for the helper functions and classes in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Shift one path segment and verify the result.

        Builds an environ from 'sn_in' (SCRIPT_NAME) and 'pi_in'
        (PATH_INFO), calls util.shift_path_info() and checks that it
        returns 'part' and leaves 'sn_out'/'pi_out' behind.  Returns the
        environ.
        """
        env = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(env)
        shifted = util.shift_path_info(env)
        self.assertEqual(shifted, part)
        self.assertEqual(env['PATH_INFO'], pi_out)
        self.assertEqual(env['SCRIPT_NAME'], sn_out)
        return env

    def checkDefault(self, key, value, alt=None):
        """Verify the default for 'key' and that a preset value is kept."""
        # Check defaulting when empty
        defaults = {}
        util.setup_testing_defaults(defaults)
        actual = defaults[key]
        for stream_type in (StringIO, BytesIO):
            # Stream defaults are compared by type, not by value.
            if isinstance(value, stream_type):
                self.assertIsInstance(actual, stream_type)
                break
        else:
            self.assertEqual(actual, value)

        # Check existing value
        preset = {key: alt}
        util.setup_testing_defaults(preset)
        self.assertIs(preset[key], alt)

    def checkCrossDefault(self, key, value, **kw):
        """Verify the default for 'key' given the preset variables 'kw'."""
        util.setup_testing_defaults(kw)
        actual = kw[key]
        self.assertEqual(actual, value)

    def checkAppURI(self, uri, **kw):
        """Verify util.application_uri() for the environ built from 'kw'."""
        util.setup_testing_defaults(kw)
        actual = util.application_uri(kw)
        self.assertEqual(actual, uri)

    def checkReqURI(self, uri, query=1, **kw):
        """Verify util.request_uri() for the environ built from 'kw'."""
        util.setup_testing_defaults(kw)
        actual = util.request_uri(kw, query)
        self.assertEqual(actual, uri)

    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def checkFW(self, text, size, match):
        """Verify that FileWrapper yields 'match' and closes its file."""

        def make_it(text=text, size=size):
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(make_it, match)

        wrapper = make_it()
        self.assertFalse(wrapper.filelike.closed)

        # Exhausting the wrapper must not close the underlying file...
        for _ in wrapper:
            pass
        self.assertFalse(wrapper.filelike.closed)

        # ...only an explicit close() does.
        wrapper.close()
        self.assertTrue(wrapper.filelike.closed)

    def test_filewrapper_getitem_deprecation(self):
        wrapper = util.FileWrapper(StringIO('foobar'), 3)
        with self.assertWarnsRegex(DeprecationWarning,
                                   r'Use iterator protocol instead'):
            # This should have returned 'bar'.
            self.assertEqual(wrapper[1], 'foo')

    def testSimpleShifts(self):
        for case in [
            ('', '/', '', '/', ''),
            ('', '/x', 'x', '/x', ''),
            ('/', '', None, '/', ''),
            ('/a', '/x/y', 'x', '/a/x', '/y'),
            ('/a', '/x/', 'x', '/a/x', '/'),
        ]:
            self.checkShift(*case)

    def testNormalizedShifts(self):
        for case in [
            ('/a/b', '/../y', '..', '/a', '/y'),
            ('', '/../y', '..', '', '/y'),
            ('/a/b', '//y', 'y', '/a/b/y', ''),
            ('/a/b', '//y/', 'y', '/a/b/y', '/'),
            ('/a/b', '/./y', 'y', '/a/b/y', ''),
            ('/a/b', '/./y/', 'y', '/a/b/y', '/'),
            ('/a/b', '///./..//y/.//', '..', '/a', '/y/'),
            ('/a/b', '///', '', '/a/b/', ''),
            ('/a/b', '/.//', '', '/a/b/', ''),
            ('/a/b', '/x//', 'x', '/a/b/x', '/'),
            ('/a/b', '/.', None, '/a/b', ''),
        ]:
            self.checkShift(*case)

    def testDefaults(self):
        expected_defaults = {
            'SERVER_NAME': '127.0.0.1',
            'SERVER_PORT': '80',
            'SERVER_PROTOCOL': 'HTTP/1.0',
            'HTTP_HOST': '127.0.0.1',
            'REQUEST_METHOD': 'GET',
            'SCRIPT_NAME': '',
            'PATH_INFO': '/',
            'wsgi.version': (1, 0),
            'wsgi.run_once': 0,
            'wsgi.multithread': 0,
            'wsgi.multiprocess': 0,
            'wsgi.input': BytesIO(),
            'wsgi.errors': StringIO(),
            'wsgi.url_scheme': 'http',
        }
        for key, value in expected_defaults.items():
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        cases = [
            ('HTTP_HOST', "foo.bar", {'SERVER_NAME': "foo.bar"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "on"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "1"}),
            ('wsgi.url_scheme', "https", {'HTTPS': "yes"}),
            ('wsgi.url_scheme', "http", {'HTTPS': "foo"}),
            ('SERVER_PORT', "80", {'HTTPS': "foo"}),
            ('SERVER_PORT', "443", {'HTTPS': "on"}),
        ]
        for key, value, preset in cases:
            self.checkCrossDefault(key, value, **preset)

    def testGuessScheme(self):
        for environ, scheme in [
            ({}, "http"),
            ({'HTTPS': "foo"}, "http"),
            ({'HTTPS': "on"}, "https"),
            ({'HTTPS': "yes"}, "https"),
            ({'HTTPS': "1"}, "https"),
        ]:
            self.assertEqual(util.guess_scheme(environ), scheme)

    def testAppURIs(self):
        cases = [
            ("http://127.0.0.1/", {}),
            ("http://127.0.0.1/spam", {'SCRIPT_NAME': "/spam"}),
            ("http://127.0.0.1/sp%E4m", {'SCRIPT_NAME': "/sp\xe4m"}),
            ("http://spam.example.com:2071/",
                {'HTTP_HOST': "spam.example.com:2071",
                 'SERVER_PORT': "2071"}),
            ("http://spam.example.com/",
                {'SERVER_NAME': "spam.example.com"}),
            ("http://127.0.0.1/",
                {'HTTP_HOST': "127.0.0.1",
                 'SERVER_NAME': "spam.example.com"}),
            ("https://127.0.0.1/", {'HTTPS': "on"}),
            ("http://127.0.0.1:8000/",
                {'SERVER_PORT': "8000", 'HTTP_HOST': None}),
        ]
        for uri, preset in cases:
            self.checkAppURI(uri, **preset)

    def testReqURIs(self):
        # Each case is (positional arguments, preset environ variables).
        cases = [
            (("http://127.0.0.1/",), {}),
            (("http://127.0.0.1/spam",), {'SCRIPT_NAME': "/spam"}),
            (("http://127.0.0.1/sp%E4m",), {'SCRIPT_NAME': "/sp\xe4m"}),
            (("http://127.0.0.1/spammity/spam",),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam"}),
            (("http://127.0.0.1/spammity/sp%E4m",),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/sp\xe4m"}),
            (("http://127.0.0.1/spammity/spam;ham",),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam;ham"}),
            (("http://127.0.0.1/spammity/spam;cookie=1234,5678",),
                {'SCRIPT_NAME': "/spammity",
                 'PATH_INFO': "/spam;cookie=1234,5678"}),
            (("http://127.0.0.1/spammity/spam?say=ni",),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
                 'QUERY_STRING': "say=ni"}),
            (("http://127.0.0.1/spammity/spam?s%E4y=ni",),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
                 'QUERY_STRING': "s%E4y=ni"}),
            (("http://127.0.0.1/spammity/spam", 0),
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
                 'QUERY_STRING': "say=ni"}),
        ]
        for args, preset in cases:
            self.checkReqURI(*args, **preset)

    def testFileWrapper(self):
        chunks = ["xyz" * 40, "xyz" * 10]
        self.checkFW("xyz" * 50, 120, chunks)

    def testHopByHop(self):
        def spellings(name):
            return name, name.title(), name.upper(), name.lower()

        hop_by_hop = (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split()
        for header in hop_by_hop:
            for alt in spellings(header):
                self.assertTrue(util.is_hop_by_hop(alt))

        # Not comprehensive, just a few random header names
        end_to_end = (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split()
        for header in end_to_end:
            for alt in spellings(header):
                self.assertFalse(util.is_hop_by_hop(alt))
474
class HeaderTests(TestCase):
    """Tests for the wsgiref.headers.Headers class."""

    def testMappingInterface(self):
        pairs = [('x', 'y')]
        for empty in (Headers(), Headers([])):
            self.assertEqual(len(empty), 0)
        self.assertEqual(len(Headers(pairs[:])), 1)
        self.assertEqual(Headers(pairs[:]).keys(), ['x'])
        self.assertEqual(Headers(pairs[:]).values(), ['y'])
        self.assertEqual(Headers(pairs[:]).items(), pairs)
        self.assertIsNot(Headers(pairs).items(), pairs)  # must be copy!

        h = Headers()
        del h['foo']   # should not raise an error

        h['Foo'] = 'bar'
        # Every lookup method must treat the header name case-insensitively.
        for lookup in h.__contains__, h.get, h.get_all, h.__getitem__:
            for name in ('foo', 'Foo', 'FOO'):
                self.assertTrue(lookup(name))
            self.assertFalse(lookup('bar'))

        self.assertEqual(h['foo'], 'bar')
        h['foo'] = 'baz'
        self.assertEqual(h['FOO'], 'baz')
        self.assertEqual(h.get_all('foo'), ['baz'])

        self.assertEqual(h.get("foo", "whee"), "baz")
        self.assertEqual(h.get("zoo", "whee"), "whee")
        self.assertEqual(h.setdefault("foo", "whee"), "baz")
        self.assertEqual(h.setdefault("zoo", "whee"), "whee")
        for name, stored in (("foo", "baz"), ("zoo", "whee")):
            self.assertEqual(h[name], stored)

    def testRequireList(self):
        with self.assertRaises(TypeError):
            Headers("foo")

    def testExtras(self):
        h = Headers()
        self.assertEqual(str(h), '\r\n')

        h.add_header('foo', 'bar', baz="spam")
        self.assertEqual(h['foo'], 'bar; baz="spam"')
        self.assertEqual(str(h), 'foo: bar; baz="spam"\r\n\r\n')

        h.add_header('Foo', 'bar', cheese=None)
        all_foo = h.get_all('foo')
        self.assertEqual(all_foo, ['bar; baz="spam"', 'bar; cheese'])

        rendered = str(h)
        self.assertEqual(rendered,
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
529
class ErrorHandler(BaseCGIHandler):
    """BaseCGIHandler on in-memory streams, used to test BaseHandler"""

    # BaseHandler records the OS environment at import time, but envvars
    # might have been changed later by other tests, which trips up
    # HandlerTests.testEnviron().  Take a fresh copy here instead.
    os_environ = dict(os.environ)

    def __init__(self, **kw):
        # The keyword arguments become the CGI environ, completed with
        # the wsgiref testing defaults.
        setup_testing_defaults(kw)
        stdin, stdout, stderr = BytesIO(), BytesIO(), StringIO()
        super().__init__(
            stdin, stdout, stderr, kw,
            multithread=True, multiprocess=True
        )
544
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # Bare re-raise: let the exception that is currently being handled
        # propagate to the caller instead of dealing with it here.
        raise   # for testing, we want to see what's happening
550
551
class HandlerTests(TestCase):
    """Tests for the wsgiref.handlers classes, run on in-memory streams."""

    # testEnviron() can produce long error message
    maxDiff = 80 * 50

    def testEnviron(self):
        os_environ = {
            # very basic environment
            'HOME': '/my/home',
            'PATH': '/my/path',
            'LANG': 'fr_FR.UTF-8',

            # set some WSGI variables
            'SCRIPT_NAME': 'test_script_name',
            'SERVER_NAME': 'test_server_name',
        }

        with support.swap_attr(TestHandler, 'os_environ', os_environ):
            # override X and HOME variables
            handler = TestHandler(X="Y", HOME="/override/home")
            handler.setup_environ()

        # Check that wsgi_xxx attributes are copied to wsgi.xxx variables
        # of handler.environ
        for attr in ('version', 'multithread', 'multiprocess', 'run_once',
                     'file_wrapper'):
            self.assertEqual(getattr(handler, 'wsgi_' + attr),
                             handler.environ['wsgi.' + attr])

        # Test handler.environ as a dict
        expected = {}
        setup_testing_defaults(expected)
        # Handler inherits os_environ variables which are not overridden
        # by SimpleHandler.add_cgi_vars() (SimpleHandler.base_env)
        for key, value in os_environ.items():
            if key not in expected:
                expected[key] = value
        expected.update({
            # X doesn't exist in os_environ
            "X": "Y",
            # HOME is overridden by TestHandler
            'HOME': "/override/home",

            # overridden by setup_testing_defaults()
            "SCRIPT_NAME": "",
            "SERVER_NAME": "127.0.0.1",

            # set by BaseHandler.setup_environ()
            'wsgi.input': handler.get_stdin(),
            'wsgi.errors': handler.get_stderr(),
            'wsgi.version': (1, 0),
            'wsgi.run_once': False,
            'wsgi.url_scheme': 'http',
            'wsgi.multithread': True,
            'wsgi.multiprocess': True,
            'wsgi.file_wrapper': util.FileWrapper,
        })
        self.assertDictEqual(handler.environ, expected)

    def testCGIEnviron(self):
        # Even with no streams and an empty environ, setup_environ() must
        # provide these wsgi.* keys.
        h = BaseCGIHandler(None, None, None, {})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertIn(key, h.environ)

    def testScheme(self):
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'https')

        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush', 'get_stdin', 'get_stderr', 'add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h, name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()...  ;)

        def trivial_app1(e, s):
            s('200 OK', [])
            return [e['wsgi.url_scheme'].encode('iso-8859-1')]

        def trivial_app2(e, s):
            s('200 OK', [])(e['wsgi.url_scheme'].encode('iso-8859-1'))
            return []

        def trivial_app3(e, s):
            s('200 OK', [])
            return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]

        def trivial_app4(e, s):
            # Simulate a response to a HEAD request
            s('200 OK', [('Content-Length', '12345')])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http").encode("iso-8859-1"))

        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
            "\r\n"
            "http").encode("iso-8859-1"))

        h = TestHandler()
        h.run(trivial_app3)
        self.assertEqual(h.stdout.getvalue(),
            b'Status: 200 OK\r\n'
            b'Content-Length: 8\r\n'
            b'\r\n'
            b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')

        h = TestHandler()
        h.run(trivial_app4)
        self.assertEqual(h.stdout.getvalue(),
            b'Status: 200 OK\r\n'
            b'Content-Length: 12345\r\n'
            b'\r\n')

    def testBasicErrorOutput(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def error_app(e, s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n").encode("iso-8859-1"))
        self.assertEqual(h.stderr.getvalue(), "")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            ("Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n" % (h.error_status, len(h.error_body))).encode('iso-8859-1')
            + h.error_body)

        self.assertIn("AssertionError", h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = b"Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        # Output already sent must be left as-is: no error page appended.
        self.assertEqual(h.stdout.getvalue(),
            b"Status: 200 OK\r\n"
            b"\r\n" + MSG)
        self.assertIn("AssertionError", h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        ).encode("iso-8859-1")

        for ssw in "FooBar/1.0", None:
            sw = "Server: %s\r\n" % ssw if ssw else ""

            for version in "1.0", "1.1":
                # Escape the interpolated text so that it is matched
                # literally (e.g. the "." of the version number).
                pattern = (
                    stdpat % (re.escape(version), re.escape(sw))
                ).encode("iso-8859-1")

                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat, h.stdout.getvalue())

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    if proto == "HTTP/0.9":
                        self.assertEqual(h.stdout.getvalue(), b"")
                    else:
                        self.assertTrue(
                            re.match(pattern, h.stdout.getvalue()),
                            (pattern, h.stdout.getvalue())
                        )

    def testBytesData(self):
        def app(e, s):
            s("200 OK", [
                ("Content-Type", "text/plain; charset=utf-8"),
                ])
            return [b"data"]

        h = TestHandler()
        h.run(app)
        self.assertEqual(b"Status: 200 OK\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Content-Length: 4\r\n"
            b"\r\n"
            b"data",
            h.stdout.getvalue())

    def testCloseOnError(self):
        # The result iterable's close() must be called even when the
        # iteration itself raises.
        side_effects = {'close_called': False}
        MSG = b"Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)

            class CrashyIterable:
                def __iter__(self):
                    while True:
                        yield b'blah'
                        raise AssertionError("This should be caught by handler")

                def close(self):
                    side_effects['close_called'] = True
            return CrashyIterable()

        h = ErrorHandler()
        h.run(error_app)
        self.assertTrue(side_effects['close_called'])

    def testPartialWrite(self):
        written = bytearray()

        class PartialWriter:
            def write(self, b):
                # Accept at most 7 bytes per call.
                partial = b[:7]
                written.extend(partial)
                return len(partial)

            def flush(self):
                pass

        environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
        h = SimpleHandler(BytesIO(), PartialWriter(), sys.stderr, environ)
        msg = "should not do partial writes"
        with self.assertWarnsRegex(DeprecationWarning, msg):
            h.run(hello_app)
        self.assertEqual(b"HTTP/1.0 200 OK\r\n"
            b"Content-Type: text/plain\r\n"
            b"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n"
            b"Content-Length: 13\r\n"
            b"\r\n"
            b"Hello, world!",
            written)

    def testClientConnectionTerminations(self):
        # Connection errors raised while writing must not be logged to
        # the handler's stderr.
        environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
        for exception in (
            ConnectionAbortedError,
            BrokenPipeError,
            ConnectionResetError,
        ):
            with self.subTest(exception=exception):
                class AbortingWriter:
                    def write(self, b):
                        raise exception

                stderr = StringIO()
                h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ)
                h.run(hello_app)

                self.assertFalse(stderr.getvalue())

    def testDontResetInternalStateOnException(self):
        class CustomException(ValueError):
            pass

        # We are raising CustomException here to trigger an exception
        # during the execution of SimpleHandler.finish_response(), so
        # we can easily test that the internal state of the handler is
        # preserved in case of an exception.
        class AbortingWriter:
            def write(self, b):
                raise CustomException

        stderr = StringIO()
        environ = {"SERVER_PROTOCOL": "HTTP/1.0"}
        h = SimpleHandler(BytesIO(), AbortingWriter(), stderr, environ)
        h.run(hello_app)

        self.assertIn("CustomException", stderr.getvalue())

        # Test that the internal state of the handler is preserved.
        self.assertIsNotNone(h.result)
        self.assertIsNotNone(h.headers)
        self.assertIsNotNone(h.status)
        self.assertIsNotNone(h.environ)
865
866
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
869