• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client, HTTPStatus
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13from unittest import mock
14TestCase = unittest.TestCase
15
16from test import support
17from test.support import os_helper
18from test.support import socket_helper
19from test.support import warnings_helper
20
21
22here = os.path.dirname(__file__)
23# Self-signed cert file for 'localhost'
24CERT_localhost = os.path.join(here, 'keycert.pem')
25# Self-signed cert file for 'fakehostname'
26CERT_fakehostname = os.path.join(here, 'keycert2.pem')
27# Self-signed cert file for self-signed.pythontest.net
28CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
29
30# constants for testing chunked encoding
31chunked_start = (
32    'HTTP/1.1 200 OK\r\n'
33    'Transfer-Encoding: chunked\r\n\r\n'
34    'a\r\n'
35    'hello worl\r\n'
36    '3\r\n'
37    'd! \r\n'
38    '8\r\n'
39    'and now \r\n'
40    '22\r\n'
41    'for something completely different\r\n'
42)
43chunked_expected = b'hello world! and now for something completely different'
44chunk_extension = ";foo=bar"
45last_chunk = "0\r\n"
46last_chunk_extended = "0" + chunk_extension + "\r\n"
47trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
48chunked_end = "\r\n"
49
50HOST = socket_helper.HOST
51
52class FakeSocket:
53    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
54        if isinstance(text, str):
55            text = text.encode("ascii")
56        self.text = text
57        self.fileclass = fileclass
58        self.data = b''
59        self.sendall_calls = 0
60        self.file_closed = False
61        self.host = host
62        self.port = port
63
64    def sendall(self, data):
65        self.sendall_calls += 1
66        self.data += data
67
68    def makefile(self, mode, bufsize=None):
69        if mode != 'r' and mode != 'rb':
70            raise client.UnimplementedFileMode()
71        # keep the file around so we can check how much was read from it
72        self.file = self.fileclass(self.text)
73        self.file.close = self.file_close #nerf close ()
74        return self.file
75
76    def file_close(self):
77        self.file_closed = True
78
79    def close(self):
80        pass
81
82    def setsockopt(self, level, optname, value):
83        pass
84
85class EPipeSocket(FakeSocket):
86
87    def __init__(self, text, pipe_trigger):
88        # When sendall() is called with pipe_trigger, raise EPIPE.
89        FakeSocket.__init__(self, text)
90        self.pipe_trigger = pipe_trigger
91
92    def sendall(self, data):
93        if self.pipe_trigger in data:
94            raise OSError(errno.EPIPE, "gotcha")
95        self.data += data
96
97    def close(self):
98        pass
99
100class NoEOFBytesIO(io.BytesIO):
101    """Like BytesIO, but raises AssertionError on EOF.
102
103    This is used below to test that http.client doesn't try to read
104    more from the underlying file than it should.
105    """
106    def read(self, n=-1):
107        data = io.BytesIO.read(self, n)
108        if data == b'':
109            raise AssertionError('caller tried to read past EOF')
110        return data
111
112    def readline(self, length=None):
113        data = io.BytesIO.readline(self, length)
114        if data == b'':
115            raise AssertionError('caller tried to read past EOF')
116        return data
117
118class FakeSocketHTTPConnection(client.HTTPConnection):
119    """HTTPConnection subclass using FakeSocket; counts connect() calls"""
120
121    def __init__(self, *args):
122        self.connections = 0
123        super().__init__('example.com')
124        self.fake_socket_args = args
125        self._create_connection = self.create_connection
126
127    def connect(self):
128        """Count the number of times connect() is invoked"""
129        self.connections += 1
130        return super().connect()
131
132    def create_connection(self, *pos, **kw):
133        return FakeSocket(*self.fake_socket_args)
134
135class HeaderTests(TestCase):
136    def test_auto_headers(self):
137        # Some headers are added automatically, but should not be added by
138        # .request() if they are explicitly set.
139
140        class HeaderCountingBuffer(list):
141            def __init__(self):
142                self.count = {}
143            def append(self, item):
144                kv = item.split(b':')
145                if len(kv) > 1:
146                    # item is a 'Key: Value' header string
147                    lcKey = kv[0].decode('ascii').lower()
148                    self.count.setdefault(lcKey, 0)
149                    self.count[lcKey] += 1
150                list.append(self, item)
151
152        for explicit_header in True, False:
153            for header in 'Content-length', 'Host', 'Accept-encoding':
154                conn = client.HTTPConnection('example.com')
155                conn.sock = FakeSocket('blahblahblah')
156                conn._buffer = HeaderCountingBuffer()
157
158                body = 'spamspamspam'
159                headers = {}
160                if explicit_header:
161                    headers[header] = str(len(body))
162                conn.request('POST', '/', body, headers)
163                self.assertEqual(conn._buffer.count[header.lower()], 1)
164
165    def test_content_length_0(self):
166
167        class ContentLengthChecker(list):
168            def __init__(self):
169                list.__init__(self)
170                self.content_length = None
171            def append(self, item):
172                kv = item.split(b':', 1)
173                if len(kv) > 1 and kv[0].lower() == b'content-length':
174                    self.content_length = kv[1].strip()
175                list.append(self, item)
176
177        # Here, we're testing that methods expecting a body get a
178        # content-length set to zero if the body is empty (either None or '')
179        bodies = (None, '')
180        methods_with_body = ('PUT', 'POST', 'PATCH')
181        for method, body in itertools.product(methods_with_body, bodies):
182            conn = client.HTTPConnection('example.com')
183            conn.sock = FakeSocket(None)
184            conn._buffer = ContentLengthChecker()
185            conn.request(method, '/', body)
186            self.assertEqual(
187                conn._buffer.content_length, b'0',
188                'Header Content-Length incorrect on {}'.format(method)
189            )
190
191        # For these methods, we make sure that content-length is not set when
192        # the body is None because it might cause unexpected behaviour on the
193        # server.
194        methods_without_body = (
195             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
196        )
197        for method in methods_without_body:
198            conn = client.HTTPConnection('example.com')
199            conn.sock = FakeSocket(None)
200            conn._buffer = ContentLengthChecker()
201            conn.request(method, '/', None)
202            self.assertEqual(
203                conn._buffer.content_length, None,
204                'Header Content-Length set for empty body on {}'.format(method)
205            )
206
207        # If the body is set to '', that's considered to be "present but
208        # empty" rather than "missing", so content length would be set, even
209        # for methods that don't expect a body.
210        for method in methods_without_body:
211            conn = client.HTTPConnection('example.com')
212            conn.sock = FakeSocket(None)
213            conn._buffer = ContentLengthChecker()
214            conn.request(method, '/', '')
215            self.assertEqual(
216                conn._buffer.content_length, b'0',
217                'Header Content-Length incorrect on {}'.format(method)
218            )
219
220        # If the body is set, make sure Content-Length is set.
221        for method in itertools.chain(methods_without_body, methods_with_body):
222            conn = client.HTTPConnection('example.com')
223            conn.sock = FakeSocket(None)
224            conn._buffer = ContentLengthChecker()
225            conn.request(method, '/', ' ')
226            self.assertEqual(
227                conn._buffer.content_length, b'1',
228                'Header Content-Length incorrect on {}'.format(method)
229            )
230
231    def test_putheader(self):
232        conn = client.HTTPConnection('example.com')
233        conn.sock = FakeSocket(None)
234        conn.putrequest('GET','/')
235        conn.putheader('Content-length', 42)
236        self.assertIn(b'Content-length: 42', conn._buffer)
237
238        conn.putheader('Foo', ' bar ')
239        self.assertIn(b'Foo:  bar ', conn._buffer)
240        conn.putheader('Bar', '\tbaz\t')
241        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
242        conn.putheader('Authorization', 'Bearer mytoken')
243        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
244        conn.putheader('IterHeader', 'IterA', 'IterB')
245        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
246        conn.putheader('LatinHeader', b'\xFF')
247        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
248        conn.putheader('Utf8Header', b'\xc3\x80')
249        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
250        conn.putheader('C1-Control', b'next\x85line')
251        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
252        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
253        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
254        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
255        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
256        conn.putheader('Key Space', 'value')
257        self.assertIn(b'Key Space: value', conn._buffer)
258        conn.putheader('KeySpace ', 'value')
259        self.assertIn(b'KeySpace : value', conn._buffer)
260        conn.putheader(b'Nonbreak\xa0Space', 'value')
261        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
262        conn.putheader(b'\xa0NonbreakSpace', 'value')
263        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
264
265    def test_ipv6host_header(self):
266        # Default host header on IPv6 transaction should be wrapped by [] if
267        # it is an IPv6 address
268        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
269                   b'Accept-Encoding: identity\r\n\r\n'
270        conn = client.HTTPConnection('[2001::]:81')
271        sock = FakeSocket('')
272        conn.sock = sock
273        conn.request('GET', '/foo')
274        self.assertTrue(sock.data.startswith(expected))
275
276        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
277                   b'Accept-Encoding: identity\r\n\r\n'
278        conn = client.HTTPConnection('[2001:102A::]')
279        sock = FakeSocket('')
280        conn.sock = sock
281        conn.request('GET', '/foo')
282        self.assertTrue(sock.data.startswith(expected))
283
284    def test_malformed_headers_coped_with(self):
285        # Issue 19996
286        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
287        sock = FakeSocket(body)
288        resp = client.HTTPResponse(sock)
289        resp.begin()
290
291        self.assertEqual(resp.getheader('First'), 'val')
292        self.assertEqual(resp.getheader('Second'), 'val')
293
294    def test_parse_all_octets(self):
295        # Ensure no valid header field octet breaks the parser
296        body = (
297            b'HTTP/1.1 200 OK\r\n'
298            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
299            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
300            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
301            b'obs-fold: text\r\n'
302            b' folded with space\r\n'
303            b'\tfolded with tab\r\n'
304            b'Content-Length: 0\r\n'
305            b'\r\n'
306        )
307        sock = FakeSocket(body)
308        resp = client.HTTPResponse(sock)
309        resp.begin()
310        self.assertEqual(resp.getheader('Content-Length'), '0')
311        self.assertEqual(resp.msg['Content-Length'], '0')
312        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
313        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
314        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
315        self.assertEqual(resp.getheader('VCHAR'), vchar)
316        self.assertEqual(resp.msg['VCHAR'], vchar)
317        self.assertIsNotNone(resp.getheader('obs-text'))
318        self.assertIn('obs-text', resp.msg)
319        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
320            self.assertTrue(folded.startswith('text'))
321            self.assertIn(' folded with space', folded)
322            self.assertTrue(folded.endswith('folded with tab'))
323
324    def test_invalid_headers(self):
325        conn = client.HTTPConnection('example.com')
326        conn.sock = FakeSocket('')
327        conn.putrequest('GET', '/')
328
329        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
330        # longer allowed in header names
331        cases = (
332            (b'Invalid\r\nName', b'ValidValue'),
333            (b'Invalid\rName', b'ValidValue'),
334            (b'Invalid\nName', b'ValidValue'),
335            (b'\r\nInvalidName', b'ValidValue'),
336            (b'\rInvalidName', b'ValidValue'),
337            (b'\nInvalidName', b'ValidValue'),
338            (b' InvalidName', b'ValidValue'),
339            (b'\tInvalidName', b'ValidValue'),
340            (b'Invalid:Name', b'ValidValue'),
341            (b':InvalidName', b'ValidValue'),
342            (b'ValidName', b'Invalid\r\nValue'),
343            (b'ValidName', b'Invalid\rValue'),
344            (b'ValidName', b'Invalid\nValue'),
345            (b'ValidName', b'InvalidValue\r\n'),
346            (b'ValidName', b'InvalidValue\r'),
347            (b'ValidName', b'InvalidValue\n'),
348        )
349        for name, value in cases:
350            with self.subTest((name, value)):
351                with self.assertRaisesRegex(ValueError, 'Invalid header'):
352                    conn.putheader(name, value)
353
354    def test_headers_debuglevel(self):
355        body = (
356            b'HTTP/1.1 200 OK\r\n'
357            b'First: val\r\n'
358            b'Second: val1\r\n'
359            b'Second: val2\r\n'
360        )
361        sock = FakeSocket(body)
362        resp = client.HTTPResponse(sock, debuglevel=1)
363        with support.captured_stdout() as output:
364            resp.begin()
365        lines = output.getvalue().splitlines()
366        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
367        self.assertEqual(lines[1], "header: First: val")
368        self.assertEqual(lines[2], "header: Second: val1")
369        self.assertEqual(lines[3], "header: Second: val2")
370
371
372class HttpMethodTests(TestCase):
373    def test_invalid_method_names(self):
374        methods = (
375            'GET\r',
376            'POST\n',
377            'PUT\n\r',
378            'POST\nValue',
379            'POST\nHOST:abc',
380            'GET\nrHost:abc\n',
381            'POST\rRemainder:\r',
382            'GET\rHOST:\n',
383            '\nPUT'
384        )
385
386        for method in methods:
387            with self.assertRaisesRegex(
388                    ValueError, "method can't contain control characters"):
389                conn = client.HTTPConnection('example.com')
390                conn.sock = FakeSocket(None)
391                conn.request(method=method, url="/")
392
393
394class TransferEncodingTest(TestCase):
395    expected_body = b"It's just a flesh wound"
396
397    def test_endheaders_chunked(self):
398        conn = client.HTTPConnection('example.com')
399        conn.sock = FakeSocket(b'')
400        conn.putrequest('POST', '/')
401        conn.endheaders(self._make_body(), encode_chunked=True)
402
403        _, _, body = self._parse_request(conn.sock.data)
404        body = self._parse_chunked(body)
405        self.assertEqual(body, self.expected_body)
406
407    def test_explicit_headers(self):
408        # explicit chunked
409        conn = client.HTTPConnection('example.com')
410        conn.sock = FakeSocket(b'')
411        # this shouldn't actually be automatically chunk-encoded because the
412        # calling code has explicitly stated that it's taking care of it
413        conn.request(
414            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
415
416        _, headers, body = self._parse_request(conn.sock.data)
417        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
418        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
419        self.assertEqual(body, self.expected_body)
420
421        # explicit chunked, string body
422        conn = client.HTTPConnection('example.com')
423        conn.sock = FakeSocket(b'')
424        conn.request(
425            'POST', '/', self.expected_body.decode('latin-1'),
426            {'Transfer-Encoding': 'chunked'})
427
428        _, headers, body = self._parse_request(conn.sock.data)
429        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
430        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
431        self.assertEqual(body, self.expected_body)
432
433        # User-specified TE, but request() does the chunk encoding
434        conn = client.HTTPConnection('example.com')
435        conn.sock = FakeSocket(b'')
436        conn.request('POST', '/',
437            headers={'Transfer-Encoding': 'gzip, chunked'},
438            encode_chunked=True,
439            body=self._make_body())
440        _, headers, body = self._parse_request(conn.sock.data)
441        self.assertNotIn('content-length', [k.lower() for k in headers])
442        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
443        self.assertEqual(self._parse_chunked(body), self.expected_body)
444
445    def test_request(self):
446        for empty_lines in (False, True,):
447            conn = client.HTTPConnection('example.com')
448            conn.sock = FakeSocket(b'')
449            conn.request(
450                'POST', '/', self._make_body(empty_lines=empty_lines))
451
452            _, headers, body = self._parse_request(conn.sock.data)
453            body = self._parse_chunked(body)
454            self.assertEqual(body, self.expected_body)
455            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
456
457            # Content-Length and Transfer-Encoding SHOULD not be sent in the
458            # same request
459            self.assertNotIn('content-length', [k.lower() for k in headers])
460
461    def test_empty_body(self):
462        # Zero-length iterable should be treated like any other iterable
463        conn = client.HTTPConnection('example.com')
464        conn.sock = FakeSocket(b'')
465        conn.request('POST', '/', ())
466        _, headers, body = self._parse_request(conn.sock.data)
467        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
468        self.assertNotIn('content-length', [k.lower() for k in headers])
469        self.assertEqual(body, b"0\r\n\r\n")
470
471    def _make_body(self, empty_lines=False):
472        lines = self.expected_body.split(b' ')
473        for idx, line in enumerate(lines):
474            # for testing handling empty lines
475            if empty_lines and idx % 2:
476                yield b''
477            if idx < len(lines) - 1:
478                yield line + b' '
479            else:
480                yield line
481
482    def _parse_request(self, data):
483        lines = data.split(b'\r\n')
484        request = lines[0]
485        headers = {}
486        n = 1
487        while n < len(lines) and len(lines[n]) > 0:
488            key, val = lines[n].split(b':')
489            key = key.decode('latin-1').strip()
490            headers[key] = val.decode('latin-1').strip()
491            n += 1
492
493        return request, headers, b'\r\n'.join(lines[n + 1:])
494
495    def _parse_chunked(self, data):
496        body = []
497        trailers = {}
498        n = 0
499        lines = data.split(b'\r\n')
500        # parse body
501        while True:
502            size, chunk = lines[n:n+2]
503            size = int(size, 16)
504
505            if size == 0:
506                n += 1
507                break
508
509            self.assertEqual(size, len(chunk))
510            body.append(chunk)
511
512            n += 2
513            # we /should/ hit the end chunk, but check against the size of
514            # lines so we're not stuck in an infinite loop should we get
515            # malformed data
516            if n > len(lines):
517                break
518
519        return b''.join(body)
520
521
522class BasicTest(TestCase):
523    def test_dir_with_added_behavior_on_status(self):
524        # see issue40084
525        self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
526
527    def test_status_lines(self):
528        # Test HTTP status lines
529
530        body = "HTTP/1.1 200 Ok\r\n\r\nText"
531        sock = FakeSocket(body)
532        resp = client.HTTPResponse(sock)
533        resp.begin()
534        self.assertEqual(resp.read(0), b'')  # Issue #20007
535        self.assertFalse(resp.isclosed())
536        self.assertFalse(resp.closed)
537        self.assertEqual(resp.read(), b"Text")
538        self.assertTrue(resp.isclosed())
539        self.assertFalse(resp.closed)
540        resp.close()
541        self.assertTrue(resp.closed)
542
543        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
544        sock = FakeSocket(body)
545        resp = client.HTTPResponse(sock)
546        self.assertRaises(client.BadStatusLine, resp.begin)
547
548    def test_bad_status_repr(self):
549        exc = client.BadStatusLine('')
550        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
551
552    def test_partial_reads(self):
553        # if we have Content-Length, HTTPResponse knows when to close itself,
554        # the same behaviour as when we read the whole thing with read()
555        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
556        sock = FakeSocket(body)
557        resp = client.HTTPResponse(sock)
558        resp.begin()
559        self.assertEqual(resp.read(2), b'Te')
560        self.assertFalse(resp.isclosed())
561        self.assertEqual(resp.read(2), b'xt')
562        self.assertTrue(resp.isclosed())
563        self.assertFalse(resp.closed)
564        resp.close()
565        self.assertTrue(resp.closed)
566
567    def test_mixed_reads(self):
568        # readline() should update the remaining length, so that read() knows
569        # how much data is left and does not raise IncompleteRead
570        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
571        sock = FakeSocket(body)
572        resp = client.HTTPResponse(sock)
573        resp.begin()
574        self.assertEqual(resp.readline(), b'Text\r\n')
575        self.assertFalse(resp.isclosed())
576        self.assertEqual(resp.read(), b'Another')
577        self.assertTrue(resp.isclosed())
578        self.assertFalse(resp.closed)
579        resp.close()
580        self.assertTrue(resp.closed)
581
582    def test_partial_readintos(self):
583        # if we have Content-Length, HTTPResponse knows when to close itself,
584        # the same behaviour as when we read the whole thing with read()
585        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
586        sock = FakeSocket(body)
587        resp = client.HTTPResponse(sock)
588        resp.begin()
589        b = bytearray(2)
590        n = resp.readinto(b)
591        self.assertEqual(n, 2)
592        self.assertEqual(bytes(b), b'Te')
593        self.assertFalse(resp.isclosed())
594        n = resp.readinto(b)
595        self.assertEqual(n, 2)
596        self.assertEqual(bytes(b), b'xt')
597        self.assertTrue(resp.isclosed())
598        self.assertFalse(resp.closed)
599        resp.close()
600        self.assertTrue(resp.closed)
601
602    def test_partial_reads_past_end(self):
603        # if we have Content-Length, clip reads to the end
604        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
605        sock = FakeSocket(body)
606        resp = client.HTTPResponse(sock)
607        resp.begin()
608        self.assertEqual(resp.read(10), b'Text')
609        self.assertTrue(resp.isclosed())
610        self.assertFalse(resp.closed)
611        resp.close()
612        self.assertTrue(resp.closed)
613
614    def test_partial_readintos_past_end(self):
615        # if we have Content-Length, clip readintos to the end
616        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
617        sock = FakeSocket(body)
618        resp = client.HTTPResponse(sock)
619        resp.begin()
620        b = bytearray(10)
621        n = resp.readinto(b)
622        self.assertEqual(n, 4)
623        self.assertEqual(bytes(b)[:4], b'Text')
624        self.assertTrue(resp.isclosed())
625        self.assertFalse(resp.closed)
626        resp.close()
627        self.assertTrue(resp.closed)
628
629    def test_partial_reads_no_content_length(self):
630        # when no length is present, the socket should be gracefully closed when
631        # all data was read
632        body = "HTTP/1.1 200 Ok\r\n\r\nText"
633        sock = FakeSocket(body)
634        resp = client.HTTPResponse(sock)
635        resp.begin()
636        self.assertEqual(resp.read(2), b'Te')
637        self.assertFalse(resp.isclosed())
638        self.assertEqual(resp.read(2), b'xt')
639        self.assertEqual(resp.read(1), b'')
640        self.assertTrue(resp.isclosed())
641        self.assertFalse(resp.closed)
642        resp.close()
643        self.assertTrue(resp.closed)
644
645    def test_partial_readintos_no_content_length(self):
646        # when no length is present, the socket should be gracefully closed when
647        # all data was read
648        body = "HTTP/1.1 200 Ok\r\n\r\nText"
649        sock = FakeSocket(body)
650        resp = client.HTTPResponse(sock)
651        resp.begin()
652        b = bytearray(2)
653        n = resp.readinto(b)
654        self.assertEqual(n, 2)
655        self.assertEqual(bytes(b), b'Te')
656        self.assertFalse(resp.isclosed())
657        n = resp.readinto(b)
658        self.assertEqual(n, 2)
659        self.assertEqual(bytes(b), b'xt')
660        n = resp.readinto(b)
661        self.assertEqual(n, 0)
662        self.assertTrue(resp.isclosed())
663
664    def test_partial_reads_incomplete_body(self):
665        # if the server shuts down the connection before the whole
666        # content-length is delivered, the socket is gracefully closed
667        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
668        sock = FakeSocket(body)
669        resp = client.HTTPResponse(sock)
670        resp.begin()
671        self.assertEqual(resp.read(2), b'Te')
672        self.assertFalse(resp.isclosed())
673        self.assertEqual(resp.read(2), b'xt')
674        self.assertEqual(resp.read(1), b'')
675        self.assertTrue(resp.isclosed())
676
677    def test_partial_readintos_incomplete_body(self):
678        # if the server shuts down the connection before the whole
679        # content-length is delivered, the socket is gracefully closed
680        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
681        sock = FakeSocket(body)
682        resp = client.HTTPResponse(sock)
683        resp.begin()
684        b = bytearray(2)
685        n = resp.readinto(b)
686        self.assertEqual(n, 2)
687        self.assertEqual(bytes(b), b'Te')
688        self.assertFalse(resp.isclosed())
689        n = resp.readinto(b)
690        self.assertEqual(n, 2)
691        self.assertEqual(bytes(b), b'xt')
692        n = resp.readinto(b)
693        self.assertEqual(n, 0)
694        self.assertTrue(resp.isclosed())
695        self.assertFalse(resp.closed)
696        resp.close()
697        self.assertTrue(resp.closed)
698
699    def test_host_port(self):
700        # Check invalid host_port
701
702        for hp in ("www.python.org:abc", "user:password@www.python.org"):
703            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
704
705        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
706                          "fe80::207:e9ff:fe9b", 8000),
707                         ("www.python.org:80", "www.python.org", 80),
708                         ("www.python.org:", "www.python.org", 80),
709                         ("www.python.org", "www.python.org", 80),
710                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
711                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
712            c = client.HTTPConnection(hp)
713            self.assertEqual(h, c.host)
714            self.assertEqual(p, c.port)
715
716    def test_response_headers(self):
717        # test response with multiple message headers with the same field name.
718        text = ('HTTP/1.1 200 OK\r\n'
719                'Set-Cookie: Customer="WILE_E_COYOTE"; '
720                'Version="1"; Path="/acme"\r\n'
721                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
722                ' Path="/acme"\r\n'
723                '\r\n'
724                'No body\r\n')
725        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
726               ', '
727               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
728        s = FakeSocket(text)
729        r = client.HTTPResponse(s)
730        r.begin()
731        cookies = r.getheader("Set-Cookie")
732        self.assertEqual(cookies, hdr)
733
734    def test_read_head(self):
735        # Test that the library doesn't attempt to read any data
736        # from a HEAD request.  (Tickles SF bug #622042.)
737        sock = FakeSocket(
738            'HTTP/1.1 200 OK\r\n'
739            'Content-Length: 14432\r\n'
740            '\r\n',
741            NoEOFBytesIO)
742        resp = client.HTTPResponse(sock, method="HEAD")
743        resp.begin()
744        if resp.read():
745            self.fail("Did not expect response from HEAD request")
746
747    def test_readinto_head(self):
748        # Test that the library doesn't attempt to read any data
749        # from a HEAD request.  (Tickles SF bug #622042.)
750        sock = FakeSocket(
751            'HTTP/1.1 200 OK\r\n'
752            'Content-Length: 14432\r\n'
753            '\r\n',
754            NoEOFBytesIO)
755        resp = client.HTTPResponse(sock, method="HEAD")
756        resp.begin()
757        b = bytearray(5)
758        if resp.readinto(b) != 0:
759            self.fail("Did not expect response from HEAD request")
760        self.assertEqual(bytes(b), b'\x00'*5)
761
762    def test_too_many_headers(self):
763        headers = '\r\n'.join('Header%d: foo' % i
764                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
765        text = ('HTTP/1.1 200 OK\r\n' + headers)
766        s = FakeSocket(text)
767        r = client.HTTPResponse(s)
768        self.assertRaisesRegex(client.HTTPException,
769                               r"got more than \d+ headers", r.begin)
770
771    def test_send_file(self):
772        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
773                    b'Accept-Encoding: identity\r\n'
774                    b'Transfer-Encoding: chunked\r\n'
775                    b'\r\n')
776
777        with open(__file__, 'rb') as body:
778            conn = client.HTTPConnection('example.com')
779            sock = FakeSocket(body)
780            conn.sock = sock
781            conn.request('GET', '/foo', body)
782            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
783                    (sock.data[:len(expected)], expected))
784
785    def test_send(self):
786        expected = b'this is a test this is only a test'
787        conn = client.HTTPConnection('example.com')
788        sock = FakeSocket(None)
789        conn.sock = sock
790        conn.send(expected)
791        self.assertEqual(expected, sock.data)
792        sock.data = b''
793        conn.send(array.array('b', expected))
794        self.assertEqual(expected, sock.data)
795        sock.data = b''
796        conn.send(io.BytesIO(expected))
797        self.assertEqual(expected, sock.data)
798
799    def test_send_updating_file(self):
800        def data():
801            yield 'data'
802            yield None
803            yield 'data_two'
804
805        class UpdatingFile(io.TextIOBase):
806            mode = 'r'
807            d = data()
808            def read(self, blocksize=-1):
809                return next(self.d)
810
811        expected = b'data'
812
813        conn = client.HTTPConnection('example.com')
814        sock = FakeSocket("")
815        conn.sock = sock
816        conn.send(UpdatingFile())
817        self.assertEqual(sock.data, expected)
818
819
820    def test_send_iter(self):
821        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
822                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
823                   b'\r\nonetwothree'
824
825        def body():
826            yield b"one"
827            yield b"two"
828            yield b"three"
829
830        conn = client.HTTPConnection('example.com')
831        sock = FakeSocket("")
832        conn.sock = sock
833        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
834        self.assertEqual(sock.data, expected)
835
836    def test_blocksize_request(self):
837        """Check that request() respects the configured block size."""
838        blocksize = 8  # For easy debugging.
839        conn = client.HTTPConnection('example.com', blocksize=blocksize)
840        sock = FakeSocket(None)
841        conn.sock = sock
842        expected = b"a" * blocksize + b"b"
843        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
844        self.assertEqual(sock.sendall_calls, 3)
845        body = sock.data.split(b"\r\n\r\n", 1)[1]
846        self.assertEqual(body, expected)
847
848    def test_blocksize_send(self):
849        """Check that send() respects the configured block size."""
850        blocksize = 8  # For easy debugging.
851        conn = client.HTTPConnection('example.com', blocksize=blocksize)
852        sock = FakeSocket(None)
853        conn.sock = sock
854        expected = b"a" * blocksize + b"b"
855        conn.send(io.BytesIO(expected))
856        self.assertEqual(sock.sendall_calls, 2)
857        self.assertEqual(sock.data, expected)
858
859    def test_send_type_error(self):
860        # See: Issue #12676
861        conn = client.HTTPConnection('example.com')
862        conn.sock = FakeSocket('')
863        with self.assertRaises(TypeError):
864            conn.request('POST', 'test', conn)
865
866    def test_chunked(self):
867        expected = chunked_expected
868        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
869        resp = client.HTTPResponse(sock, method="GET")
870        resp.begin()
871        self.assertEqual(resp.read(), expected)
872        resp.close()
873
874        # Various read sizes
875        for n in range(1, 12):
876            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
877            resp = client.HTTPResponse(sock, method="GET")
878            resp.begin()
879            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
880            resp.close()
881
882        for x in ('', 'foo\r\n'):
883            sock = FakeSocket(chunked_start + x)
884            resp = client.HTTPResponse(sock, method="GET")
885            resp.begin()
886            try:
887                resp.read()
888            except client.IncompleteRead as i:
889                self.assertEqual(i.partial, expected)
890                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
891                self.assertEqual(repr(i), expected_message)
892                self.assertEqual(str(i), expected_message)
893            else:
894                self.fail('IncompleteRead expected')
895            finally:
896                resp.close()
897
898    def test_readinto_chunked(self):
899
900        expected = chunked_expected
901        nexpected = len(expected)
902        b = bytearray(128)
903
904        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
905        resp = client.HTTPResponse(sock, method="GET")
906        resp.begin()
907        n = resp.readinto(b)
908        self.assertEqual(b[:nexpected], expected)
909        self.assertEqual(n, nexpected)
910        resp.close()
911
912        # Various read sizes
913        for n in range(1, 12):
914            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
915            resp = client.HTTPResponse(sock, method="GET")
916            resp.begin()
917            m = memoryview(b)
918            i = resp.readinto(m[0:n])
919            i += resp.readinto(m[i:n + i])
920            i += resp.readinto(m[i:])
921            self.assertEqual(b[:nexpected], expected)
922            self.assertEqual(i, nexpected)
923            resp.close()
924
925        for x in ('', 'foo\r\n'):
926            sock = FakeSocket(chunked_start + x)
927            resp = client.HTTPResponse(sock, method="GET")
928            resp.begin()
929            try:
930                n = resp.readinto(b)
931            except client.IncompleteRead as i:
932                self.assertEqual(i.partial, expected)
933                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
934                self.assertEqual(repr(i), expected_message)
935                self.assertEqual(str(i), expected_message)
936            else:
937                self.fail('IncompleteRead expected')
938            finally:
939                resp.close()
940
941    def test_chunked_head(self):
942        chunked_start = (
943            'HTTP/1.1 200 OK\r\n'
944            'Transfer-Encoding: chunked\r\n\r\n'
945            'a\r\n'
946            'hello world\r\n'
947            '1\r\n'
948            'd\r\n'
949        )
950        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
951        resp = client.HTTPResponse(sock, method="HEAD")
952        resp.begin()
953        self.assertEqual(resp.read(), b'')
954        self.assertEqual(resp.status, 200)
955        self.assertEqual(resp.reason, 'OK')
956        self.assertTrue(resp.isclosed())
957        self.assertFalse(resp.closed)
958        resp.close()
959        self.assertTrue(resp.closed)
960
961    def test_readinto_chunked_head(self):
962        chunked_start = (
963            'HTTP/1.1 200 OK\r\n'
964            'Transfer-Encoding: chunked\r\n\r\n'
965            'a\r\n'
966            'hello world\r\n'
967            '1\r\n'
968            'd\r\n'
969        )
970        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
971        resp = client.HTTPResponse(sock, method="HEAD")
972        resp.begin()
973        b = bytearray(5)
974        n = resp.readinto(b)
975        self.assertEqual(n, 0)
976        self.assertEqual(bytes(b), b'\x00'*5)
977        self.assertEqual(resp.status, 200)
978        self.assertEqual(resp.reason, 'OK')
979        self.assertTrue(resp.isclosed())
980        self.assertFalse(resp.closed)
981        resp.close()
982        self.assertTrue(resp.closed)
983
984    def test_negative_content_length(self):
985        sock = FakeSocket(
986            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
987        resp = client.HTTPResponse(sock, method="GET")
988        resp.begin()
989        self.assertEqual(resp.read(), b'Hello\r\n')
990        self.assertTrue(resp.isclosed())
991
992    def test_incomplete_read(self):
993        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
994        resp = client.HTTPResponse(sock, method="GET")
995        resp.begin()
996        try:
997            resp.read()
998        except client.IncompleteRead as i:
999            self.assertEqual(i.partial, b'Hello\r\n')
1000            self.assertEqual(repr(i),
1001                             "IncompleteRead(7 bytes read, 3 more expected)")
1002            self.assertEqual(str(i),
1003                             "IncompleteRead(7 bytes read, 3 more expected)")
1004            self.assertTrue(resp.isclosed())
1005        else:
1006            self.fail('IncompleteRead expected')
1007
1008    def test_epipe(self):
1009        sock = EPipeSocket(
1010            "HTTP/1.0 401 Authorization Required\r\n"
1011            "Content-type: text/html\r\n"
1012            "WWW-Authenticate: Basic realm=\"example\"\r\n",
1013            b"Content-Length")
1014        conn = client.HTTPConnection("example.com")
1015        conn.sock = sock
1016        self.assertRaises(OSError,
1017                          lambda: conn.request("PUT", "/url", "body"))
1018        resp = conn.getresponse()
1019        self.assertEqual(401, resp.status)
1020        self.assertEqual("Basic realm=\"example\"",
1021                         resp.getheader("www-authenticate"))
1022
1023    # Test lines overflowing the max line size (_MAXLINE in http.client)
1024
1025    def test_overflowing_status_line(self):
1026        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
1027        resp = client.HTTPResponse(FakeSocket(body))
1028        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
1029
1030    def test_overflowing_header_line(self):
1031        body = (
1032            'HTTP/1.1 200 OK\r\n'
1033            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
1034        )
1035        resp = client.HTTPResponse(FakeSocket(body))
1036        self.assertRaises(client.LineTooLong, resp.begin)
1037
1038    def test_overflowing_header_limit_after_100(self):
1039        body = (
1040            'HTTP/1.1 100 OK\r\n'
1041            'r\n' * 32768
1042        )
1043        resp = client.HTTPResponse(FakeSocket(body))
1044        with self.assertRaises(client.HTTPException) as cm:
1045            resp.begin()
1046        # We must assert more because other reasonable errors that we
1047        # do not want can also be HTTPException derived.
1048        self.assertIn('got more than ', str(cm.exception))
1049        self.assertIn('headers', str(cm.exception))
1050
1051    def test_overflowing_chunked_line(self):
1052        body = (
1053            'HTTP/1.1 200 OK\r\n'
1054            'Transfer-Encoding: chunked\r\n\r\n'
1055            + '0' * 65536 + 'a\r\n'
1056            'hello world\r\n'
1057            '0\r\n'
1058            '\r\n'
1059        )
1060        resp = client.HTTPResponse(FakeSocket(body))
1061        resp.begin()
1062        self.assertRaises(client.LineTooLong, resp.read)
1063
1064    def test_early_eof(self):
1065        # Test httpresponse with no \r\n termination,
1066        body = "HTTP/1.1 200 Ok"
1067        sock = FakeSocket(body)
1068        resp = client.HTTPResponse(sock)
1069        resp.begin()
1070        self.assertEqual(resp.read(), b'')
1071        self.assertTrue(resp.isclosed())
1072        self.assertFalse(resp.closed)
1073        resp.close()
1074        self.assertTrue(resp.closed)
1075
1076    def test_error_leak(self):
1077        # Test that the socket is not leaked if getresponse() fails
1078        conn = client.HTTPConnection('example.com')
1079        response = None
1080        class Response(client.HTTPResponse):
1081            def __init__(self, *pos, **kw):
1082                nonlocal response
1083                response = self  # Avoid garbage collector closing the socket
1084                client.HTTPResponse.__init__(self, *pos, **kw)
1085        conn.response_class = Response
1086        conn.sock = FakeSocket('Invalid status line')
1087        conn.request('GET', '/')
1088        self.assertRaises(client.BadStatusLine, conn.getresponse)
1089        self.assertTrue(response.closed)
1090        self.assertTrue(conn.sock.file_closed)
1091
1092    def test_chunked_extension(self):
1093        extra = '3;foo=bar\r\n' + 'abc\r\n'
1094        expected = chunked_expected + b'abc'
1095
1096        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1097        resp = client.HTTPResponse(sock, method="GET")
1098        resp.begin()
1099        self.assertEqual(resp.read(), expected)
1100        resp.close()
1101
1102    def test_chunked_missing_end(self):
1103        """some servers may serve up a short chunked encoding stream"""
1104        expected = chunked_expected
1105        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1106        resp = client.HTTPResponse(sock, method="GET")
1107        resp.begin()
1108        self.assertEqual(resp.read(), expected)
1109        resp.close()
1110
1111    def test_chunked_trailers(self):
1112        """See that trailers are read and ignored"""
1113        expected = chunked_expected
1114        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1115        resp = client.HTTPResponse(sock, method="GET")
1116        resp.begin()
1117        self.assertEqual(resp.read(), expected)
1118        # we should have reached the end of the file
1119        self.assertEqual(sock.file.read(), b"") #we read to the end
1120        resp.close()
1121
1122    def test_chunked_sync(self):
1123        """Check that we don't read past the end of the chunked-encoding stream"""
1124        expected = chunked_expected
1125        extradata = "extradata"
1126        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1127        resp = client.HTTPResponse(sock, method="GET")
1128        resp.begin()
1129        self.assertEqual(resp.read(), expected)
1130        # the file should now have our extradata ready to be read
1131        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1132        resp.close()
1133
1134    def test_content_length_sync(self):
1135        """Check that we don't read past the end of the Content-Length stream"""
1136        extradata = b"extradata"
1137        expected = b"Hello123\r\n"
1138        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1139        resp = client.HTTPResponse(sock, method="GET")
1140        resp.begin()
1141        self.assertEqual(resp.read(), expected)
1142        # the file should now have our extradata ready to be read
1143        self.assertEqual(sock.file.read(), extradata) #we read to the end
1144        resp.close()
1145
1146    def test_readlines_content_length(self):
1147        extradata = b"extradata"
1148        expected = b"Hello123\r\n"
1149        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1150        resp = client.HTTPResponse(sock, method="GET")
1151        resp.begin()
1152        self.assertEqual(resp.readlines(2000), [expected])
1153        # the file should now have our extradata ready to be read
1154        self.assertEqual(sock.file.read(), extradata) #we read to the end
1155        resp.close()
1156
1157    def test_read1_content_length(self):
1158        extradata = b"extradata"
1159        expected = b"Hello123\r\n"
1160        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1161        resp = client.HTTPResponse(sock, method="GET")
1162        resp.begin()
1163        self.assertEqual(resp.read1(2000), expected)
1164        # the file should now have our extradata ready to be read
1165        self.assertEqual(sock.file.read(), extradata) #we read to the end
1166        resp.close()
1167
1168    def test_readline_bound_content_length(self):
1169        extradata = b"extradata"
1170        expected = b"Hello123\r\n"
1171        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1172        resp = client.HTTPResponse(sock, method="GET")
1173        resp.begin()
1174        self.assertEqual(resp.readline(10), expected)
1175        self.assertEqual(resp.readline(10), b"")
1176        # the file should now have our extradata ready to be read
1177        self.assertEqual(sock.file.read(), extradata) #we read to the end
1178        resp.close()
1179
1180    def test_read1_bound_content_length(self):
1181        extradata = b"extradata"
1182        expected = b"Hello123\r\n"
1183        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1184        resp = client.HTTPResponse(sock, method="GET")
1185        resp.begin()
1186        self.assertEqual(resp.read1(20), expected*2)
1187        self.assertEqual(resp.read(), expected)
1188        # the file should now have our extradata ready to be read
1189        self.assertEqual(sock.file.read(), extradata) #we read to the end
1190        resp.close()
1191
1192    def test_response_fileno(self):
1193        # Make sure fd returned by fileno is valid.
1194        serv = socket.create_server((HOST, 0))
1195        self.addCleanup(serv.close)
1196
1197        result = None
1198        def run_server():
1199            [conn, address] = serv.accept()
1200            with conn, conn.makefile("rb") as reader:
1201                # Read the request header until a blank line
1202                while True:
1203                    line = reader.readline()
1204                    if not line.rstrip(b"\r\n"):
1205                        break
1206                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1207                nonlocal result
1208                result = reader.read()
1209
1210        thread = threading.Thread(target=run_server)
1211        thread.start()
1212        self.addCleanup(thread.join, float(1))
1213        conn = client.HTTPConnection(*serv.getsockname())
1214        conn.request("CONNECT", "dummy:1234")
1215        response = conn.getresponse()
1216        try:
1217            self.assertEqual(response.status, client.OK)
1218            s = socket.socket(fileno=response.fileno())
1219            try:
1220                s.sendall(b"proxied data\n")
1221            finally:
1222                s.detach()
1223        finally:
1224            response.close()
1225            conn.close()
1226        thread.join()
1227        self.assertEqual(result, b"proxied data\n")
1228
1229    def test_putrequest_override_domain_validation(self):
1230        """
1231        It should be possible to override the default validation
1232        behavior in putrequest (bpo-38216).
1233        """
1234        class UnsafeHTTPConnection(client.HTTPConnection):
1235            def _validate_path(self, url):
1236                pass
1237
1238        conn = UnsafeHTTPConnection('example.com')
1239        conn.sock = FakeSocket('')
1240        conn.putrequest('GET', '/\x00')
1241
1242    def test_putrequest_override_host_validation(self):
1243        class UnsafeHTTPConnection(client.HTTPConnection):
1244            def _validate_host(self, url):
1245                pass
1246
1247        conn = UnsafeHTTPConnection('example.com\r\n')
1248        conn.sock = FakeSocket('')
1249        # set skip_host so a ValueError is not raised upon adding the
1250        # invalid URL as the value of the "Host:" header
1251        conn.putrequest('GET', '/', skip_host=1)
1252
1253    def test_putrequest_override_encoding(self):
1254        """
1255        It should be possible to override the default encoding
1256        to transmit bytes in another encoding even if invalid
1257        (bpo-36274).
1258        """
1259        class UnsafeHTTPConnection(client.HTTPConnection):
1260            def _encode_request(self, str_url):
1261                return str_url.encode('utf-8')
1262
1263        conn = UnsafeHTTPConnection('example.com')
1264        conn.sock = FakeSocket('')
1265        conn.putrequest('GET', '/☃')
1266
1267
1268class ExtendedReadTest(TestCase):
1269    """
1270    Test peek(), read1(), readline()
1271    """
1272    lines = (
1273        'HTTP/1.1 200 OK\r\n'
1274        '\r\n'
1275        'hello world!\n'
1276        'and now \n'
1277        'for something completely different\n'
1278        'foo'
1279        )
1280    lines_expected = lines[lines.find('hello'):].encode("ascii")
1281    lines_chunked = (
1282        'HTTP/1.1 200 OK\r\n'
1283        'Transfer-Encoding: chunked\r\n\r\n'
1284        'a\r\n'
1285        'hello worl\r\n'
1286        '3\r\n'
1287        'd!\n\r\n'
1288        '9\r\n'
1289        'and now \n\r\n'
1290        '23\r\n'
1291        'for something completely different\n\r\n'
1292        '3\r\n'
1293        'foo\r\n'
1294        '0\r\n' # terminating chunk
1295        '\r\n'  # end of trailers
1296    )
1297
1298    def setUp(self):
1299        sock = FakeSocket(self.lines)
1300        resp = client.HTTPResponse(sock, method="GET")
1301        resp.begin()
1302        resp.fp = io.BufferedReader(resp.fp)
1303        self.resp = resp
1304
1305
1306
1307    def test_peek(self):
1308        resp = self.resp
1309        # patch up the buffered peek so that it returns not too much stuff
1310        oldpeek = resp.fp.peek
1311        def mypeek(n=-1):
1312            p = oldpeek(n)
1313            if n >= 0:
1314                return p[:n]
1315            return p[:10]
1316        resp.fp.peek = mypeek
1317
1318        all = []
1319        while True:
1320            # try a short peek
1321            p = resp.peek(3)
1322            if p:
1323                self.assertGreater(len(p), 0)
1324                # then unbounded peek
1325                p2 = resp.peek()
1326                self.assertGreaterEqual(len(p2), len(p))
1327                self.assertTrue(p2.startswith(p))
1328                next = resp.read(len(p2))
1329                self.assertEqual(next, p2)
1330            else:
1331                next = resp.read()
1332                self.assertFalse(next)
1333            all.append(next)
1334            if not next:
1335                break
1336        self.assertEqual(b"".join(all), self.lines_expected)
1337
1338    def test_readline(self):
1339        resp = self.resp
1340        self._verify_readline(self.resp.readline, self.lines_expected)
1341
1342    def _verify_readline(self, readline, expected):
1343        all = []
1344        while True:
1345            # short readlines
1346            line = readline(5)
1347            if line and line != b"foo":
1348                if len(line) < 5:
1349                    self.assertTrue(line.endswith(b"\n"))
1350            all.append(line)
1351            if not line:
1352                break
1353        self.assertEqual(b"".join(all), expected)
1354
1355    def test_read1(self):
1356        resp = self.resp
1357        def r():
1358            res = resp.read1(4)
1359            self.assertLessEqual(len(res), 4)
1360            return res
1361        readliner = Readliner(r)
1362        self._verify_readline(readliner.readline, self.lines_expected)
1363
1364    def test_read1_unbounded(self):
1365        resp = self.resp
1366        all = []
1367        while True:
1368            data = resp.read1()
1369            if not data:
1370                break
1371            all.append(data)
1372        self.assertEqual(b"".join(all), self.lines_expected)
1373
1374    def test_read1_bounded(self):
1375        resp = self.resp
1376        all = []
1377        while True:
1378            data = resp.read1(10)
1379            if not data:
1380                break
1381            self.assertLessEqual(len(data), 10)
1382            all.append(data)
1383        self.assertEqual(b"".join(all), self.lines_expected)
1384
1385    def test_read1_0(self):
1386        self.assertEqual(self.resp.read1(0), b"")
1387
1388    def test_peek_0(self):
1389        p = self.resp.peek(0)
1390        self.assertLessEqual(0, len(p))
1391
1392
1393class ExtendedReadTestChunked(ExtendedReadTest):
1394    """
1395    Test peek(), read1(), readline() in chunked mode
1396    """
1397    lines = (
1398        'HTTP/1.1 200 OK\r\n'
1399        'Transfer-Encoding: chunked\r\n\r\n'
1400        'a\r\n'
1401        'hello worl\r\n'
1402        '3\r\n'
1403        'd!\n\r\n'
1404        '9\r\n'
1405        'and now \n\r\n'
1406        '23\r\n'
1407        'for something completely different\n\r\n'
1408        '3\r\n'
1409        'foo\r\n'
1410        '0\r\n' # terminating chunk
1411        '\r\n'  # end of trailers
1412    )
1413
1414
1415class Readliner:
1416    """
1417    a simple readline class that uses an arbitrary read function and buffering
1418    """
1419    def __init__(self, readfunc):
1420        self.readfunc = readfunc
1421        self.remainder = b""
1422
1423    def readline(self, limit):
1424        data = []
1425        datalen = 0
1426        read = self.remainder
1427        try:
1428            while True:
1429                idx = read.find(b'\n')
1430                if idx != -1:
1431                    break
1432                if datalen + len(read) >= limit:
1433                    idx = limit - datalen - 1
1434                # read more data
1435                data.append(read)
1436                read = self.readfunc()
1437                if not read:
1438                    idx = 0 #eof condition
1439                    break
1440            idx += 1
1441            data.append(read[:idx])
1442            self.remainder = read[idx:]
1443            return b"".join(data)
1444        except:
1445            self.remainder = b"".join(data)
1446            raise
1447
1448
1449class OfflineTest(TestCase):
1450    def test_all(self):
1451        # Documented objects defined in the module should be in __all__
1452        expected = {"responses"}  # Allowlist documented dict() object
1453        # HTTPMessage, parse_headers(), and the HTTP status code constants are
1454        # intentionally omitted for simplicity
1455        denylist = {"HTTPMessage", "parse_headers"}
1456        for name in dir(client):
1457            if name.startswith("_") or name in denylist:
1458                continue
1459            module_object = getattr(client, name)
1460            if getattr(module_object, "__module__", None) == "http.client":
1461                expected.add(name)
1462        self.assertCountEqual(client.__all__, expected)
1463
1464    def test_responses(self):
1465        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
1466
1467    def test_client_constants(self):
1468        # Make sure we don't break backward compatibility with 3.4
1469        expected = [
1470            'CONTINUE',
1471            'SWITCHING_PROTOCOLS',
1472            'PROCESSING',
1473            'OK',
1474            'CREATED',
1475            'ACCEPTED',
1476            'NON_AUTHORITATIVE_INFORMATION',
1477            'NO_CONTENT',
1478            'RESET_CONTENT',
1479            'PARTIAL_CONTENT',
1480            'MULTI_STATUS',
1481            'IM_USED',
1482            'MULTIPLE_CHOICES',
1483            'MOVED_PERMANENTLY',
1484            'FOUND',
1485            'SEE_OTHER',
1486            'NOT_MODIFIED',
1487            'USE_PROXY',
1488            'TEMPORARY_REDIRECT',
1489            'BAD_REQUEST',
1490            'UNAUTHORIZED',
1491            'PAYMENT_REQUIRED',
1492            'FORBIDDEN',
1493            'NOT_FOUND',
1494            'METHOD_NOT_ALLOWED',
1495            'NOT_ACCEPTABLE',
1496            'PROXY_AUTHENTICATION_REQUIRED',
1497            'REQUEST_TIMEOUT',
1498            'CONFLICT',
1499            'GONE',
1500            'LENGTH_REQUIRED',
1501            'PRECONDITION_FAILED',
1502            'REQUEST_ENTITY_TOO_LARGE',
1503            'REQUEST_URI_TOO_LONG',
1504            'UNSUPPORTED_MEDIA_TYPE',
1505            'REQUESTED_RANGE_NOT_SATISFIABLE',
1506            'EXPECTATION_FAILED',
1507            'IM_A_TEAPOT',
1508            'MISDIRECTED_REQUEST',
1509            'UNPROCESSABLE_ENTITY',
1510            'LOCKED',
1511            'FAILED_DEPENDENCY',
1512            'UPGRADE_REQUIRED',
1513            'PRECONDITION_REQUIRED',
1514            'TOO_MANY_REQUESTS',
1515            'REQUEST_HEADER_FIELDS_TOO_LARGE',
1516            'UNAVAILABLE_FOR_LEGAL_REASONS',
1517            'INTERNAL_SERVER_ERROR',
1518            'NOT_IMPLEMENTED',
1519            'BAD_GATEWAY',
1520            'SERVICE_UNAVAILABLE',
1521            'GATEWAY_TIMEOUT',
1522            'HTTP_VERSION_NOT_SUPPORTED',
1523            'INSUFFICIENT_STORAGE',
1524            'NOT_EXTENDED',
1525            'NETWORK_AUTHENTICATION_REQUIRED',
1526            'EARLY_HINTS',
1527            'TOO_EARLY'
1528        ]
1529        for const in expected:
1530            with self.subTest(constant=const):
1531                self.assertTrue(hasattr(client, const))
1532
1533
1534class SourceAddressTest(TestCase):
1535    def setUp(self):
1536        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1537        self.port = socket_helper.bind_port(self.serv)
1538        self.source_port = socket_helper.find_unused_port()
1539        self.serv.listen()
1540        self.conn = None
1541
1542    def tearDown(self):
1543        if self.conn:
1544            self.conn.close()
1545            self.conn = None
1546        self.serv.close()
1547        self.serv = None
1548
1549    def testHTTPConnectionSourceAddress(self):
1550        self.conn = client.HTTPConnection(HOST, self.port,
1551                source_address=('', self.source_port))
1552        self.conn.connect()
1553        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
1554
1555    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1556                     'http.client.HTTPSConnection not defined')
1557    def testHTTPSConnectionSourceAddress(self):
1558        self.conn = client.HTTPSConnection(HOST, self.port,
1559                source_address=('', self.source_port))
1560        # We don't test anything here other than the constructor not barfing as
1561        # this code doesn't deal with setting up an active running SSL server
1562        # for an ssl_wrapped connect() to actually return from.
1563
1564
1565class TimeoutTest(TestCase):
1566    PORT = None
1567
1568    def setUp(self):
1569        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1570        TimeoutTest.PORT = socket_helper.bind_port(self.serv)
1571        self.serv.listen()
1572
1573    def tearDown(self):
1574        self.serv.close()
1575        self.serv = None
1576
1577    def testTimeoutAttribute(self):
1578        # This will prove that the timeout gets through HTTPConnection
1579        # and into the socket.
1580
1581        # default -- use global socket timeout
1582        self.assertIsNone(socket.getdefaulttimeout())
1583        socket.setdefaulttimeout(30)
1584        try:
1585            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
1586            httpConn.connect()
1587        finally:
1588            socket.setdefaulttimeout(None)
1589        self.assertEqual(httpConn.sock.gettimeout(), 30)
1590        httpConn.close()
1591
1592        # no timeout -- do not use global socket default
1593        self.assertIsNone(socket.getdefaulttimeout())
1594        socket.setdefaulttimeout(30)
1595        try:
1596            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
1597                                              timeout=None)
1598            httpConn.connect()
1599        finally:
1600            socket.setdefaulttimeout(None)
1601        self.assertEqual(httpConn.sock.gettimeout(), None)
1602        httpConn.close()
1603
1604        # a value
1605        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
1606        httpConn.connect()
1607        self.assertEqual(httpConn.sock.gettimeout(), 30)
1608        httpConn.close()
1609
1610
1611class PersistenceTest(TestCase):
1612
1613    def test_reuse_reconnect(self):
1614        # Should reuse or reconnect depending on header from server
1615        tests = (
1616            ('1.0', '', False),
1617            ('1.0', 'Connection: keep-alive\r\n', True),
1618            ('1.1', '', True),
1619            ('1.1', 'Connection: close\r\n', False),
1620            ('1.0', 'Connection: keep-ALIVE\r\n', True),
1621            ('1.1', 'Connection: cloSE\r\n', False),
1622        )
1623        for version, header, reuse in tests:
1624            with self.subTest(version=version, header=header):
1625                msg = (
1626                    'HTTP/{} 200 OK\r\n'
1627                    '{}'
1628                    'Content-Length: 12\r\n'
1629                    '\r\n'
1630                    'Dummy body\r\n'
1631                ).format(version, header)
1632                conn = FakeSocketHTTPConnection(msg)
1633                self.assertIsNone(conn.sock)
1634                conn.request('GET', '/open-connection')
1635                with conn.getresponse() as response:
1636                    self.assertEqual(conn.sock is None, not reuse)
1637                    response.read()
1638                self.assertEqual(conn.sock is None, not reuse)
1639                self.assertEqual(conn.connections, 1)
1640                conn.request('GET', '/subsequent-request')
1641                self.assertEqual(conn.connections, 1 if reuse else 2)
1642
1643    def test_disconnected(self):
1644
1645        def make_reset_reader(text):
1646            """Return BufferedReader that raises ECONNRESET at EOF"""
1647            stream = io.BytesIO(text)
1648            def readinto(buffer):
1649                size = io.BytesIO.readinto(stream, buffer)
1650                if size == 0:
1651                    raise ConnectionResetError()
1652                return size
1653            stream.readinto = readinto
1654            return io.BufferedReader(stream)
1655
1656        tests = (
1657            (io.BytesIO, client.RemoteDisconnected),
1658            (make_reset_reader, ConnectionResetError),
1659        )
1660        for stream_factory, exception in tests:
1661            with self.subTest(exception=exception):
1662                conn = FakeSocketHTTPConnection(b'', stream_factory)
1663                conn.request('GET', '/eof-response')
1664                self.assertRaises(exception, conn.getresponse)
1665                self.assertIsNone(conn.sock)
1666                # HTTPConnection.connect() should be automatically invoked
1667                conn.request('GET', '/reconnect')
1668                self.assertEqual(conn.connections, 2)
1669
1670    def test_100_close(self):
1671        conn = FakeSocketHTTPConnection(
1672            b'HTTP/1.1 100 Continue\r\n'
1673            b'\r\n'
1674            # Missing final response
1675        )
1676        conn.request('GET', '/', headers={'Expect': '100-continue'})
1677        self.assertRaises(client.RemoteDisconnected, conn.getresponse)
1678        self.assertIsNone(conn.sock)
1679        conn.request('GET', '/reconnect')
1680        self.assertEqual(conn.connections, 2)
1681
1682
1683class HTTPSTest(TestCase):
1684
1685    def setUp(self):
1686        if not hasattr(client, 'HTTPSConnection'):
1687            self.skipTest('ssl support required')
1688
1689    def make_server(self, certfile):
1690        from test.ssl_servers import make_https_server
1691        return make_https_server(self, certfile=certfile)
1692
1693    def test_attributes(self):
1694        # simple test to check it's storing the timeout
1695        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
1696        self.assertEqual(h.timeout, 30)
1697
1698    def test_networked(self):
1699        # Default settings: requires a valid cert from a trusted CA
1700        import ssl
1701        support.requires('network')
1702        with socket_helper.transient_internet('self-signed.pythontest.net'):
1703            h = client.HTTPSConnection('self-signed.pythontest.net', 443)
1704            with self.assertRaises(ssl.SSLError) as exc_info:
1705                h.request('GET', '/')
1706            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1707
1708    def test_networked_noverification(self):
1709        # Switch off cert verification
1710        import ssl
1711        support.requires('network')
1712        with socket_helper.transient_internet('self-signed.pythontest.net'):
1713            context = ssl._create_unverified_context()
1714            h = client.HTTPSConnection('self-signed.pythontest.net', 443,
1715                                       context=context)
1716            h.request('GET', '/')
1717            resp = h.getresponse()
1718            h.close()
1719            self.assertIn('nginx', resp.getheader('server'))
1720            resp.close()
1721
1722    @support.system_must_validate_cert
1723    def test_networked_trusted_by_default_cert(self):
1724        # Default settings: requires a valid cert from a trusted CA
1725        support.requires('network')
1726        with socket_helper.transient_internet('www.python.org'):
1727            h = client.HTTPSConnection('www.python.org', 443)
1728            h.request('GET', '/')
1729            resp = h.getresponse()
1730            content_type = resp.getheader('content-type')
1731            resp.close()
1732            h.close()
1733            self.assertIn('text/html', content_type)
1734
1735    def test_networked_good_cert(self):
1736        # We feed the server's cert as a validating cert
1737        import ssl
1738        support.requires('network')
1739        selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
1740        with socket_helper.transient_internet(selfsigned_pythontestdotnet):
1741            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1742            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
1743            self.assertEqual(context.check_hostname, True)
1744            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
1745            try:
1746                h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
1747                                           context=context)
1748                h.request('GET', '/')
1749                resp = h.getresponse()
1750            except ssl.SSLError as ssl_err:
1751                ssl_err_str = str(ssl_err)
1752                # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
1753                # modern Linux distros (Debian Buster, etc) default OpenSSL
1754                # configurations it'll fail saying "key too weak" until we
1755                # address https://bugs.python.org/issue36816 to use a proper
1756                # key size on self-signed.pythontest.net.
1757                if re.search(r'(?i)key.too.weak', ssl_err_str):
1758                    raise unittest.SkipTest(
1759                        f'Got {ssl_err_str} trying to connect '
1760                        f'to {selfsigned_pythontestdotnet}. '
1761                        'See https://bugs.python.org/issue36816.')
1762                raise
1763            server_string = resp.getheader('server')
1764            resp.close()
1765            h.close()
1766            self.assertIn('nginx', server_string)
1767
1768    def test_networked_bad_cert(self):
1769        # We feed a "CA" cert that is unrelated to the server's cert
1770        import ssl
1771        support.requires('network')
1772        with socket_helper.transient_internet('self-signed.pythontest.net'):
1773            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1774            context.load_verify_locations(CERT_localhost)
1775            h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
1776            with self.assertRaises(ssl.SSLError) as exc_info:
1777                h.request('GET', '/')
1778            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1779
1780    def test_local_unknown_cert(self):
1781        # The custom cert isn't known to the default trust bundle
1782        import ssl
1783        server = self.make_server(CERT_localhost)
1784        h = client.HTTPSConnection('localhost', server.port)
1785        with self.assertRaises(ssl.SSLError) as exc_info:
1786            h.request('GET', '/')
1787        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1788
1789    def test_local_good_hostname(self):
1790        # The (valid) cert validates the HTTP hostname
1791        import ssl
1792        server = self.make_server(CERT_localhost)
1793        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1794        context.load_verify_locations(CERT_localhost)
1795        h = client.HTTPSConnection('localhost', server.port, context=context)
1796        self.addCleanup(h.close)
1797        h.request('GET', '/nonexistent')
1798        resp = h.getresponse()
1799        self.addCleanup(resp.close)
1800        self.assertEqual(resp.status, 404)
1801
1802    def test_local_bad_hostname(self):
1803        # The (valid) cert doesn't validate the HTTP hostname
1804        import ssl
1805        server = self.make_server(CERT_fakehostname)
1806        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1807        context.load_verify_locations(CERT_fakehostname)
1808        h = client.HTTPSConnection('localhost', server.port, context=context)
1809        with self.assertRaises(ssl.CertificateError):
1810            h.request('GET', '/')
1811        # Same with explicit check_hostname=True
1812        with warnings_helper.check_warnings(('', DeprecationWarning)):
1813            h = client.HTTPSConnection('localhost', server.port,
1814                                       context=context, check_hostname=True)
1815        with self.assertRaises(ssl.CertificateError):
1816            h.request('GET', '/')
1817        # With check_hostname=False, the mismatching is ignored
1818        context.check_hostname = False
1819        with warnings_helper.check_warnings(('', DeprecationWarning)):
1820            h = client.HTTPSConnection('localhost', server.port,
1821                                       context=context, check_hostname=False)
1822        h.request('GET', '/nonexistent')
1823        resp = h.getresponse()
1824        resp.close()
1825        h.close()
1826        self.assertEqual(resp.status, 404)
1827        # The context's check_hostname setting is used if one isn't passed to
1828        # HTTPSConnection.
1829        context.check_hostname = False
1830        h = client.HTTPSConnection('localhost', server.port, context=context)
1831        h.request('GET', '/nonexistent')
1832        resp = h.getresponse()
1833        self.assertEqual(resp.status, 404)
1834        resp.close()
1835        h.close()
1836        # Passing check_hostname to HTTPSConnection should override the
1837        # context's setting.
1838        with warnings_helper.check_warnings(('', DeprecationWarning)):
1839            h = client.HTTPSConnection('localhost', server.port,
1840                                       context=context, check_hostname=True)
1841        with self.assertRaises(ssl.CertificateError):
1842            h.request('GET', '/')
1843
1844    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1845                     'http.client.HTTPSConnection not available')
1846    def test_host_port(self):
1847        # Check invalid host_port
1848
1849        for hp in ("www.python.org:abc", "user:password@www.python.org"):
1850            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
1851
1852        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
1853                          "fe80::207:e9ff:fe9b", 8000),
1854                         ("www.python.org:443", "www.python.org", 443),
1855                         ("www.python.org:", "www.python.org", 443),
1856                         ("www.python.org", "www.python.org", 443),
1857                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
1858                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
1859                             443)):
1860            c = client.HTTPSConnection(hp)
1861            self.assertEqual(h, c.host)
1862            self.assertEqual(p, c.port)
1863
1864    def test_tls13_pha(self):
1865        import ssl
1866        if not ssl.HAS_TLSv1_3:
1867            self.skipTest('TLS 1.3 support required')
1868        # just check status of PHA flag
1869        h = client.HTTPSConnection('localhost', 443)
1870        self.assertTrue(h._context.post_handshake_auth)
1871
1872        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1873        self.assertFalse(context.post_handshake_auth)
1874        h = client.HTTPSConnection('localhost', 443, context=context)
1875        self.assertIs(h._context, context)
1876        self.assertFalse(h._context.post_handshake_auth)
1877
1878        with warnings.catch_warnings():
1879            warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated',
1880                                    DeprecationWarning)
1881            h = client.HTTPSConnection('localhost', 443, context=context,
1882                                       cert_file=CERT_localhost)
1883        self.assertTrue(h._context.post_handshake_auth)
1884
1885
1886class RequestBodyTest(TestCase):
1887    """Test cases where a request includes a message body."""
1888
1889    def setUp(self):
1890        self.conn = client.HTTPConnection('example.com')
1891        self.conn.sock = self.sock = FakeSocket("")
1892        self.conn.sock = self.sock
1893
1894    def get_headers_and_fp(self):
1895        f = io.BytesIO(self.sock.data)
1896        f.readline()  # read the request line
1897        message = client.parse_headers(f)
1898        return message, f
1899
1900    def test_list_body(self):
1901        # Note that no content-length is automatically calculated for
1902        # an iterable.  The request will fall back to send chunked
1903        # transfer encoding.
1904        cases = (
1905            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1906            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1907        )
1908        for body, expected in cases:
1909            with self.subTest(body):
1910                self.conn = client.HTTPConnection('example.com')
1911                self.conn.sock = self.sock = FakeSocket('')
1912
1913                self.conn.request('PUT', '/url', body)
1914                msg, f = self.get_headers_and_fp()
1915                self.assertNotIn('Content-Type', msg)
1916                self.assertNotIn('Content-Length', msg)
1917                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
1918                self.assertEqual(expected, f.read())
1919
1920    def test_manual_content_length(self):
1921        # Set an incorrect content-length so that we can verify that
1922        # it will not be over-ridden by the library.
1923        self.conn.request("PUT", "/url", "body",
1924                          {"Content-Length": "42"})
1925        message, f = self.get_headers_and_fp()
1926        self.assertEqual("42", message.get("content-length"))
1927        self.assertEqual(4, len(f.read()))
1928
1929    def test_ascii_body(self):
1930        self.conn.request("PUT", "/url", "body")
1931        message, f = self.get_headers_and_fp()
1932        self.assertEqual("text/plain", message.get_content_type())
1933        self.assertIsNone(message.get_charset())
1934        self.assertEqual("4", message.get("content-length"))
1935        self.assertEqual(b'body', f.read())
1936
1937    def test_latin1_body(self):
1938        self.conn.request("PUT", "/url", "body\xc1")
1939        message, f = self.get_headers_and_fp()
1940        self.assertEqual("text/plain", message.get_content_type())
1941        self.assertIsNone(message.get_charset())
1942        self.assertEqual("5", message.get("content-length"))
1943        self.assertEqual(b'body\xc1', f.read())
1944
1945    def test_bytes_body(self):
1946        self.conn.request("PUT", "/url", b"body\xc1")
1947        message, f = self.get_headers_and_fp()
1948        self.assertEqual("text/plain", message.get_content_type())
1949        self.assertIsNone(message.get_charset())
1950        self.assertEqual("5", message.get("content-length"))
1951        self.assertEqual(b'body\xc1', f.read())
1952
1953    def test_text_file_body(self):
1954        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1955        with open(os_helper.TESTFN, "w", encoding="utf-8") as f:
1956            f.write("body")
1957        with open(os_helper.TESTFN, encoding="utf-8") as f:
1958            self.conn.request("PUT", "/url", f)
1959            message, f = self.get_headers_and_fp()
1960            self.assertEqual("text/plain", message.get_content_type())
1961            self.assertIsNone(message.get_charset())
1962            # No content-length will be determined for files; the body
1963            # will be sent using chunked transfer encoding instead.
1964            self.assertIsNone(message.get("content-length"))
1965            self.assertEqual("chunked", message.get("transfer-encoding"))
1966            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
1967
1968    def test_binary_file_body(self):
1969        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1970        with open(os_helper.TESTFN, "wb") as f:
1971            f.write(b"body\xc1")
1972        with open(os_helper.TESTFN, "rb") as f:
1973            self.conn.request("PUT", "/url", f)
1974            message, f = self.get_headers_and_fp()
1975            self.assertEqual("text/plain", message.get_content_type())
1976            self.assertIsNone(message.get_charset())
1977            self.assertEqual("chunked", message.get("Transfer-Encoding"))
1978            self.assertNotIn("Content-Length", message)
1979            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
1980
1981
1982class HTTPResponseTest(TestCase):
1983
1984    def setUp(self):
1985        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
1986                second-value\r\n\r\nText"
1987        sock = FakeSocket(body)
1988        self.resp = client.HTTPResponse(sock)
1989        self.resp.begin()
1990
1991    def test_getting_header(self):
1992        header = self.resp.getheader('My-Header')
1993        self.assertEqual(header, 'first-value, second-value')
1994
1995        header = self.resp.getheader('My-Header', 'some default')
1996        self.assertEqual(header, 'first-value, second-value')
1997
1998    def test_getting_nonexistent_header_with_string_default(self):
1999        header = self.resp.getheader('No-Such-Header', 'default-value')
2000        self.assertEqual(header, 'default-value')
2001
2002    def test_getting_nonexistent_header_with_iterable_default(self):
2003        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
2004        self.assertEqual(header, 'default, values')
2005
2006        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
2007        self.assertEqual(header, 'default, values')
2008
2009    def test_getting_nonexistent_header_without_default(self):
2010        header = self.resp.getheader('No-Such-Header')
2011        self.assertEqual(header, None)
2012
2013    def test_getting_header_defaultint(self):
2014        header = self.resp.getheader('No-Such-Header',default=42)
2015        self.assertEqual(header, 42)
2016
2017class TunnelTests(TestCase):
2018    def setUp(self):
2019        response_text = (
2020            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
2021            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
2022            'Content-Length: 42\r\n\r\n'
2023        )
2024        self.host = 'proxy.com'
2025        self.conn = client.HTTPConnection(self.host)
2026        self.conn._create_connection = self._create_connection(response_text)
2027
2028    def tearDown(self):
2029        self.conn.close()
2030
2031    def _create_connection(self, response_text):
2032        def create_connection(address, timeout=None, source_address=None):
2033            return FakeSocket(response_text, host=address[0], port=address[1])
2034        return create_connection
2035
2036    def test_set_tunnel_host_port_headers(self):
2037        tunnel_host = 'destination.com'
2038        tunnel_port = 8888
2039        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
2040        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
2041                             headers=tunnel_headers)
2042        self.conn.request('HEAD', '/', '')
2043        self.assertEqual(self.conn.sock.host, self.host)
2044        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2045        self.assertEqual(self.conn._tunnel_host, tunnel_host)
2046        self.assertEqual(self.conn._tunnel_port, tunnel_port)
2047        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
2048
2049    def test_disallow_set_tunnel_after_connect(self):
2050        # Once connected, we shouldn't be able to tunnel anymore
2051        self.conn.connect()
2052        self.assertRaises(RuntimeError, self.conn.set_tunnel,
2053                          'destination.com')
2054
2055    def test_connect_with_tunnel(self):
2056        self.conn.set_tunnel('destination.com')
2057        self.conn.request('HEAD', '/', '')
2058        self.assertEqual(self.conn.sock.host, self.host)
2059        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2060        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2061        # issue22095
2062        self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
2063        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2064
2065        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
2066        self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
2067
2068    def test_tunnel_connect_single_send_connection_setup(self):
2069        """Regresstion test for https://bugs.python.org/issue43332."""
2070        with mock.patch.object(self.conn, 'send') as mock_send:
2071            self.conn.set_tunnel('destination.com')
2072            self.conn.connect()
2073            self.conn.request('GET', '/')
2074        mock_send.assert_called()
2075        # Likely 2, but this test only cares about the first.
2076        self.assertGreater(
2077                len(mock_send.mock_calls), 1,
2078                msg=f'unexpected number of send calls: {mock_send.mock_calls}')
2079        proxy_setup_data_sent = mock_send.mock_calls[0][1][0]
2080        self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent)
2081        self.assertTrue(
2082                proxy_setup_data_sent.endswith(b'\r\n\r\n'),
2083                msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}')
2084
2085    def test_connect_put_request(self):
2086        self.conn.set_tunnel('destination.com')
2087        self.conn.request('PUT', '/', '')
2088        self.assertEqual(self.conn.sock.host, self.host)
2089        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2090        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2091        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2092
2093    def test_tunnel_debuglog(self):
2094        expected_header = 'X-Dummy: 1'
2095        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
2096
2097        self.conn.set_debuglevel(1)
2098        self.conn._create_connection = self._create_connection(response_text)
2099        self.conn.set_tunnel('destination.com')
2100
2101        with support.captured_stdout() as output:
2102            self.conn.request('PUT', '/', '')
2103        lines = output.getvalue().splitlines()
2104        self.assertIn('header: {}'.format(expected_header), lines)
2105
2106
2107if __name__ == '__main__':
2108    unittest.main(verbosity=2)
2109