• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client, HTTPStatus
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13TestCase = unittest.TestCase
14
15from test import support
16from test.support import socket_helper
17
# Directory holding this test module; the certificate files below are
# resolved relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')

# constants for testing chunked encoding
# chunked_start is a status line, a 'Transfer-Encoding: chunked' header and
# four chunks.  Each chunk is a hexadecimal size line followed by that many
# characters of data (0xa = 10, 0x3 = 3, 0x8 = 8, 0x22 = 34).
chunked_start = (
    'HTTP/1.1 200 OK\r\n'
    'Transfer-Encoding: chunked\r\n\r\n'
    'a\r\n'
    'hello worl\r\n'
    '3\r\n'
    'd! \r\n'
    '8\r\n'
    'and now \r\n'
    '22\r\n'
    'for something completely different\r\n'
)
# The data of the four chunks above, concatenated.
chunked_expected = b'hello world! and now for something completely different'
# Extension text appended to a chunk-size line (see last_chunk_extended).
chunk_extension = ";foo=bar"
# Zero-size chunk line, without and with the extension.
last_chunk = "0\r\n"
last_chunk_extended = "0" + chunk_extension + "\r\n"
# Two header-style lines; presumably sent after the last chunk as trailers
# (the users are outside this excerpt -- confirm).
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
# Empty line that finishes a chunked message.
chunked_end = "\r\n"

# Host value taken from test.support.socket_helper.
HOST = socket_helper.HOST
47
class FakeSocket:
    """In-memory stand-in for a socket.

    Everything handed to sendall() is accumulated in ``data``; makefile()
    serves the canned ``text`` through a ``fileclass`` instance.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # Canned replies may be written as str for convenience; they are
        # stored as ASCII bytes.  Anything else is kept untouched.
        self.text = text.encode("ascii") if isinstance(text, str) else text
        self.fileclass = fileclass
        self.host = host
        self.port = port
        self.data = b''
        self.sendall_calls = 0
        self.file_closed = False

    def sendall(self, data):
        """Record one call and append *data* to ``self.data``."""
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a ``fileclass`` object over the canned text.

        Only the modes 'r' and 'rb' are accepted; any other mode raises
        client.UnimplementedFileMode.
        """
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        # keep the file around so we can check how much was read from it
        self.file = self.fileclass(self.text)
        # close() on the returned file only flips self.file_closed
        self.file.close = self.file_close
        return self.file

    def file_close(self):
        """Replacement for the file's close(): just note that it was called."""
        self.file_closed = True

    def close(self):
        """Do nothing."""

    def setsockopt(self, level, optname, value):
        """Accept and ignore any socket option."""
80
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() raises EPIPE for a trigger payload."""

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        super().__init__(text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Raise OSError(EPIPE) if *data* contains the trigger, else record it."""
        if self.pipe_trigger in data:
            raise OSError(errno.EPIPE, "gotcha")
        # Delegate to the base class so that sendall_calls stays in step
        # with data, exactly as it does for a plain FakeSocket.
        super().sendall(data)
95
class NoEOFBytesIO(io.BytesIO):
    """BytesIO that raises AssertionError when a read returns no data.

    Used to check that http.client does not try to read more from the
    underlying file than it should.
    """

    def read(self, n=-1):
        chunk = super().read(n)
        if not chunk:
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def readline(self, length=None):
        line = super().readline(length)
        if not line:
            raise AssertionError('caller tried to read past EOF')
        return line
113
class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection to 'example.com' that connects through FakeSocket.

    ``connections`` holds the number of connect() calls made so far.
    """

    def __init__(self, *args):
        self.connections = 0
        client.HTTPConnection.__init__(self, 'example.com')
        # The positional arguments are replayed into each FakeSocket built
        # by create_connection().
        self.fake_socket_args = args
        self._create_connection = self.create_connection

    def connect(self):
        """Increment the connect() counter, then connect as usual."""
        self.connections = self.connections + 1
        return client.HTTPConnection.connect(self)

    def create_connection(self, *pos, **kw):
        """Return a FakeSocket built from the stored arguments.

        The arguments of the call itself are ignored.
        """
        return FakeSocket(*self.fake_socket_args)
130
class HeaderTests(TestCase):
    def test_auto_headers(self):
        # Some headers are added automatically by .request(); when the
        # caller sets them explicitly they must still be sent only once.

        class HeaderCountingBuffer(list):
            # List that also tallies appended header lines per lower-cased
            # header name.
            def __init__(self):
                self.count = {}
            def append(self, item):
                name, colon, _ = item.partition(b':')
                if colon:
                    # item is a 'Key: Value' header string
                    key = name.decode('ascii').lower()
                    self.count[key] = self.count.get(key, 0) + 1
                list.append(self, item)

        header_names = ('Content-length', 'Host', 'Accept-encoding')
        for explicit_header, header in itertools.product((True, False),
                                                         header_names):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket('blahblahblah')
            conn._buffer = HeaderCountingBuffer()

            body = 'spamspamspam'
            headers = {header: str(len(body))} if explicit_header else {}
            conn.request('POST', '/', body, headers)
            self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            # List that remembers the value of the last Content-Length
            # header line appended to it (None if there was none).
            def __init__(self):
                list.__init__(self)
                self.content_length = None
            def append(self, item):
                name, colon, value = item.partition(b':')
                if colon and name.lower() == b'content-length':
                    self.content_length = value.strip()
                list.append(self, item)

        def sent_content_length(method, body):
            # Issue one request and report the Content-Length it buffered.
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            return conn._buffer.content_length

        methods_with_body = ('PUT', 'POST', 'PATCH')
        methods_without_body = (
             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )

        # Here, we're testing that methods expecting a body get a
        # content-length set to zero if the body is empty (either None or '')
        for method, body in itertools.product(methods_with_body, (None, '')):
            self.assertEqual(
                sent_content_length(method, body), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # For these methods, we make sure that content-length is not set when
        # the body is None because it might cause unexpected behaviour on the
        # server.
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, None), None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # If the body is set to '', that's considered to be "present but
        # empty" rather than "missing", so content length would be set, even
        # for methods that don't expect a body.
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, ''), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # If the body is set, make sure Content-Length is set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            self.assertEqual(
                sent_content_length(method, ' '), b'1',
                'Header Content-Length incorrect on {}'.format(method)
            )

    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')

        # (putheader() arguments, line expected in the buffer afterwards)
        cases = (
            (('Content-length', 42), b'Content-length: 42'),
            (('Foo', ' bar '), b'Foo:  bar '),
            (('Bar', '\tbaz\t'), b'Bar: \tbaz\t'),
            (('Authorization', 'Bearer mytoken'),
             b'Authorization: Bearer mytoken'),
            (('IterHeader', 'IterA', 'IterB'),
             b'IterHeader: IterA\r\n\tIterB'),
            (('LatinHeader', b'\xFF'), b'LatinHeader: \xFF'),
            (('Utf8Header', b'\xc3\x80'), b'Utf8Header: \xc3\x80'),
            (('C1-Control', b'next\x85line'), b'C1-Control: next\x85line'),
            (('Embedded-Fold-Space', 'is\r\n allowed'),
             b'Embedded-Fold-Space: is\r\n allowed'),
            (('Embedded-Fold-Tab', 'is\r\n\tallowed'),
             b'Embedded-Fold-Tab: is\r\n\tallowed'),
            (('Key Space', 'value'), b'Key Space: value'),
            (('KeySpace ', 'value'), b'KeySpace : value'),
            ((b'Nonbreak\xa0Space', 'value'), b'Nonbreak\xa0Space: value'),
            ((b'\xa0NonbreakSpace', 'value'), b'\xa0NonbreakSpace: value'),
        )
        for args, expected_line in cases:
            conn.putheader(*args)
            self.assertIn(expected_line, conn._buffer)

    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should be wrapped by [] if
        # it is an IPv6 address
        cases = (
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        )
        for hostport, expected in cases:
            conn = client.HTTPConnection(hostport)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))

    def test_malformed_headers_coped_with(self):
        # Issue 19996
        # The ': nval' line has an empty field name; the headers on either
        # side of it must still be readable.
        reply = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        resp = client.HTTPResponse(FakeSocket(reply))
        resp.begin()

        for name in ('First', 'Second'):
            self.assertEqual(resp.getheader(name), 'val')

    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        token = "!#$%&'*+-.^_`|~"
        reply = b''.join((
            b'HTTP/1.1 200 OK\r\n',
            b"!#$%&'*+-.^_`|~: value\r\n",  # Special token characters
            b'VCHAR: ', bytes(range(0x21, 0x7E + 1)), b'\r\n',
            b'obs-text: ', bytes(range(0x80, 0xFF + 1)), b'\r\n',
            b'obs-fold: text\r\n',
            b' folded with space\r\n',
            b'\tfolded with tab\r\n',
            b'Content-Length: 0\r\n',
            b'\r\n',
        ))
        resp = client.HTTPResponse(FakeSocket(reply))
        resp.begin()
        self.assertEqual(resp.getheader('Content-Length'), '0')
        self.assertEqual(resp.msg['Content-Length'], '0')
        self.assertEqual(resp.getheader(token), 'value')
        self.assertEqual(resp.msg[token], 'value')
        vchar = ''.join(chr(code) for code in range(0x21, 0x7E + 1))
        self.assertEqual(resp.getheader('VCHAR'), vchar)
        self.assertEqual(resp.msg['VCHAR'], vchar)
        self.assertIsNotNone(resp.getheader('obs-text'))
        self.assertIn('obs-text', resp.msg)
        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
            self.assertTrue(folded.startswith('text'))
            self.assertIn(' folded with space', folded)
            self.assertTrue(folded.endswith('folded with tab'))

    def test_invalid_headers(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
        # longer allowed in header names
        cases = (
            (b'Invalid\r\nName', b'ValidValue'),
            (b'Invalid\rName', b'ValidValue'),
            (b'Invalid\nName', b'ValidValue'),
            (b'\r\nInvalidName', b'ValidValue'),
            (b'\rInvalidName', b'ValidValue'),
            (b'\nInvalidName', b'ValidValue'),
            (b' InvalidName', b'ValidValue'),
            (b'\tInvalidName', b'ValidValue'),
            (b'Invalid:Name', b'ValidValue'),
            (b':InvalidName', b'ValidValue'),
            (b'ValidName', b'Invalid\r\nValue'),
            (b'ValidName', b'Invalid\rValue'),
            (b'ValidName', b'Invalid\nValue'),
            (b'ValidName', b'InvalidValue\r\n'),
            (b'ValidName', b'InvalidValue\r'),
            (b'ValidName', b'InvalidValue\n'),
        )
        for case in cases:
            header_name, header_value = case
            with self.subTest(case), \
                    self.assertRaisesRegex(ValueError, 'Invalid header'):
                conn.putheader(header_name, header_value)

    def test_headers_debuglevel(self):
        # With debuglevel=1, begin() prints the status line and then each
        # header line, repeated field names included.
        reply = (
            b'HTTP/1.1 200 OK\r\n'
            b'First: val\r\n'
            b'Second: val1\r\n'
            b'Second: val2\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(reply), debuglevel=1)
        with support.captured_stdout() as output:
            resp.begin()
        lines = output.getvalue().splitlines()
        expected_lines = (
            "reply: 'HTTP/1.1 200 OK\\r\\n'",
            "header: First: val",
            "header: Second: val1",
            "header: Second: val2",
        )
        for index, expected in enumerate(expected_lines):
            self.assertEqual(lines[index], expected)
366
367
class HttpMethodTests(TestCase):
    def test_invalid_method_names(self):
        # Each method name below contains a CR and/or LF character;
        # request() must reject it with ValueError.
        methods = (
            'GET\r',
            'POST\n',
            'PUT\n\r',
            'POST\nValue',
            'POST\nHOST:abc',
            'GET\nrHost:abc\n',
            'POST\rRemainder:\r',
            'GET\rHOST:\n',
            '\nPUT',
        )

        for method in methods:
            # subTest names the offending method in a failure report and
            # lets the remaining names run.
            with self.subTest(method=method):
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket(None)
                # Only request() itself is expected to raise.
                with self.assertRaisesRegex(
                        ValueError, "method can't contain control characters"):
                    conn.request(method=method, url="/")
388
389
class TransferEncodingTest(TestCase):
    # Payload that _make_body() yields word by word.
    expected_body = b"It's just a flesh wound"

    def test_endheaders_chunked(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.putrequest('POST', '/')
        conn.endheaders(self._make_body(), encode_chunked=True)

        _, _, body = self._parse_request(conn.sock.data)
        body = self._parse_chunked(body)
        self.assertEqual(body, self.expected_body)

    def test_explicit_headers(self):
        # explicit chunked
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        # this shouldn't actually be automatically chunk-encoded because the
        # calling code has explicitly stated that it's taking care of it
        conn.request(
            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # explicit chunked, string body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request(
            'POST', '/', self.expected_body.decode('latin-1'),
            {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # User-specified TE, but request() does the chunk encoding
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/',
            headers={'Transfer-Encoding': 'gzip, chunked'},
            encode_chunked=True,
            body=self._make_body())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
        self.assertEqual(self._parse_chunked(body), self.expected_body)

    def test_request(self):
        for empty_lines in (False, True,):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(b'')
            conn.request(
                'POST', '/', self._make_body(empty_lines=empty_lines))

            _, headers, body = self._parse_request(conn.sock.data)
            body = self._parse_chunked(body)
            self.assertEqual(body, self.expected_body)
            self.assertEqual(headers['Transfer-Encoding'], 'chunked')

            # Content-Length and Transfer-Encoding SHOULD not be sent in the
            # same request
            self.assertNotIn('content-length', [k.lower() for k in headers])

    def test_empty_body(self):
        # Zero-length iterable should be treated like any other iterable
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/', ())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(body, b"0\r\n\r\n")

    def _make_body(self, empty_lines=False):
        """Yield expected_body one space-separated word at a time.

        Every word except the last keeps its trailing space, so joining the
        yielded pieces gives expected_body again.  With empty_lines true, an
        empty bytes object is yielded before each odd-indexed word.
        """
        lines = self.expected_body.split(b' ')
        for idx, line in enumerate(lines):
            # for testing handling empty lines
            if empty_lines and idx % 2:
                yield b''
            if idx < len(lines) - 1:
                yield line + b' '
            else:
                yield line

    def _parse_request(self, data):
        """Split raw request bytes into (request line, headers, body).

        headers maps each header name to its value, both decoded as latin-1
        and stripped.  body is everything after the first empty line.
        """
        lines = data.split(b'\r\n')
        request = lines[0]
        headers = {}
        n = 1
        while n < len(lines) and len(lines[n]) > 0:
            # Split on the first colon only, so a value that itself
            # contains ':' does not break the unpacking.
            key, val = lines[n].split(b':', 1)
            key = key.decode('latin-1').strip()
            headers[key] = val.decode('latin-1').strip()
            n += 1

        return request, headers, b'\r\n'.join(lines[n + 1:])

    def _parse_chunked(self, data):
        """Decode a chunked body and return the concatenated chunk data.

        Each chunk is a hexadecimal size line followed by a data line whose
        length must equal that size.  Parsing stops at the zero-size chunk;
        anything after it is ignored.  The test fails if data runs out
        before a zero-size chunk is seen.
        """
        body = []
        n = 0
        lines = data.split(b'\r\n')
        # parse body
        while True:
            # we /should/ hit the end chunk; if the data runs out first,
            # report the malformed input instead of failing on the unpack
            if n + 2 > len(lines):
                self.fail('chunked body has no terminating zero-size chunk')
            size, chunk = lines[n:n + 2]
            size = int(size, 16)

            if size == 0:
                break

            self.assertEqual(size, len(chunk))
            body.append(chunk)

            n += 2

        return b''.join(body)
516
517
518class BasicTest(TestCase):
519    def test_dir_with_added_behavior_on_status(self):
520        # see issue40084
521        self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
522
523    def test_status_lines(self):
524        # Test HTTP status lines
525
526        body = "HTTP/1.1 200 Ok\r\n\r\nText"
527        sock = FakeSocket(body)
528        resp = client.HTTPResponse(sock)
529        resp.begin()
530        self.assertEqual(resp.read(0), b'')  # Issue #20007
531        self.assertFalse(resp.isclosed())
532        self.assertFalse(resp.closed)
533        self.assertEqual(resp.read(), b"Text")
534        self.assertTrue(resp.isclosed())
535        self.assertFalse(resp.closed)
536        resp.close()
537        self.assertTrue(resp.closed)
538
539        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
540        sock = FakeSocket(body)
541        resp = client.HTTPResponse(sock)
542        self.assertRaises(client.BadStatusLine, resp.begin)
543
544    def test_bad_status_repr(self):
545        exc = client.BadStatusLine('')
546        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
547
548    def test_partial_reads(self):
549        # if we have Content-Length, HTTPResponse knows when to close itself,
550        # the same behaviour as when we read the whole thing with read()
551        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
552        sock = FakeSocket(body)
553        resp = client.HTTPResponse(sock)
554        resp.begin()
555        self.assertEqual(resp.read(2), b'Te')
556        self.assertFalse(resp.isclosed())
557        self.assertEqual(resp.read(2), b'xt')
558        self.assertTrue(resp.isclosed())
559        self.assertFalse(resp.closed)
560        resp.close()
561        self.assertTrue(resp.closed)
562
563    def test_mixed_reads(self):
564        # readline() should update the remaining length, so that read() knows
565        # how much data is left and does not raise IncompleteRead
566        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
567        sock = FakeSocket(body)
568        resp = client.HTTPResponse(sock)
569        resp.begin()
570        self.assertEqual(resp.readline(), b'Text\r\n')
571        self.assertFalse(resp.isclosed())
572        self.assertEqual(resp.read(), b'Another')
573        self.assertTrue(resp.isclosed())
574        self.assertFalse(resp.closed)
575        resp.close()
576        self.assertTrue(resp.closed)
577
578    def test_partial_readintos(self):
579        # if we have Content-Length, HTTPResponse knows when to close itself,
580        # the same behaviour as when we read the whole thing with read()
581        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
582        sock = FakeSocket(body)
583        resp = client.HTTPResponse(sock)
584        resp.begin()
585        b = bytearray(2)
586        n = resp.readinto(b)
587        self.assertEqual(n, 2)
588        self.assertEqual(bytes(b), b'Te')
589        self.assertFalse(resp.isclosed())
590        n = resp.readinto(b)
591        self.assertEqual(n, 2)
592        self.assertEqual(bytes(b), b'xt')
593        self.assertTrue(resp.isclosed())
594        self.assertFalse(resp.closed)
595        resp.close()
596        self.assertTrue(resp.closed)
597
598    def test_partial_reads_no_content_length(self):
599        # when no length is present, the socket should be gracefully closed when
600        # all data was read
601        body = "HTTP/1.1 200 Ok\r\n\r\nText"
602        sock = FakeSocket(body)
603        resp = client.HTTPResponse(sock)
604        resp.begin()
605        self.assertEqual(resp.read(2), b'Te')
606        self.assertFalse(resp.isclosed())
607        self.assertEqual(resp.read(2), b'xt')
608        self.assertEqual(resp.read(1), b'')
609        self.assertTrue(resp.isclosed())
610        self.assertFalse(resp.closed)
611        resp.close()
612        self.assertTrue(resp.closed)
613
614    def test_partial_readintos_no_content_length(self):
615        # when no length is present, the socket should be gracefully closed when
616        # all data was read
617        body = "HTTP/1.1 200 Ok\r\n\r\nText"
618        sock = FakeSocket(body)
619        resp = client.HTTPResponse(sock)
620        resp.begin()
621        b = bytearray(2)
622        n = resp.readinto(b)
623        self.assertEqual(n, 2)
624        self.assertEqual(bytes(b), b'Te')
625        self.assertFalse(resp.isclosed())
626        n = resp.readinto(b)
627        self.assertEqual(n, 2)
628        self.assertEqual(bytes(b), b'xt')
629        n = resp.readinto(b)
630        self.assertEqual(n, 0)
631        self.assertTrue(resp.isclosed())
632
633    def test_partial_reads_incomplete_body(self):
634        # if the server shuts down the connection before the whole
635        # content-length is delivered, the socket is gracefully closed
636        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
637        sock = FakeSocket(body)
638        resp = client.HTTPResponse(sock)
639        resp.begin()
640        self.assertEqual(resp.read(2), b'Te')
641        self.assertFalse(resp.isclosed())
642        self.assertEqual(resp.read(2), b'xt')
643        self.assertEqual(resp.read(1), b'')
644        self.assertTrue(resp.isclosed())
645
646    def test_partial_readintos_incomplete_body(self):
647        # if the server shuts down the connection before the whole
648        # content-length is delivered, the socket is gracefully closed
649        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
650        sock = FakeSocket(body)
651        resp = client.HTTPResponse(sock)
652        resp.begin()
653        b = bytearray(2)
654        n = resp.readinto(b)
655        self.assertEqual(n, 2)
656        self.assertEqual(bytes(b), b'Te')
657        self.assertFalse(resp.isclosed())
658        n = resp.readinto(b)
659        self.assertEqual(n, 2)
660        self.assertEqual(bytes(b), b'xt')
661        n = resp.readinto(b)
662        self.assertEqual(n, 0)
663        self.assertTrue(resp.isclosed())
664        self.assertFalse(resp.closed)
665        resp.close()
666        self.assertTrue(resp.closed)
667
668    def test_host_port(self):
669        # Check invalid host_port
670
671        for hp in ("www.python.org:abc", "user:password@www.python.org"):
672            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
673
674        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
675                          "fe80::207:e9ff:fe9b", 8000),
676                         ("www.python.org:80", "www.python.org", 80),
677                         ("www.python.org:", "www.python.org", 80),
678                         ("www.python.org", "www.python.org", 80),
679                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
680                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
681            c = client.HTTPConnection(hp)
682            self.assertEqual(h, c.host)
683            self.assertEqual(p, c.port)
684
685    def test_response_headers(self):
686        # test response with multiple message headers with the same field name.
687        text = ('HTTP/1.1 200 OK\r\n'
688                'Set-Cookie: Customer="WILE_E_COYOTE"; '
689                'Version="1"; Path="/acme"\r\n'
690                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
691                ' Path="/acme"\r\n'
692                '\r\n'
693                'No body\r\n')
694        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
695               ', '
696               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
697        s = FakeSocket(text)
698        r = client.HTTPResponse(s)
699        r.begin()
700        cookies = r.getheader("Set-Cookie")
701        self.assertEqual(cookies, hdr)
702
703    def test_read_head(self):
704        # Test that the library doesn't attempt to read any data
705        # from a HEAD request.  (Tickles SF bug #622042.)
706        sock = FakeSocket(
707            'HTTP/1.1 200 OK\r\n'
708            'Content-Length: 14432\r\n'
709            '\r\n',
710            NoEOFBytesIO)
711        resp = client.HTTPResponse(sock, method="HEAD")
712        resp.begin()
713        if resp.read():
714            self.fail("Did not expect response from HEAD request")
715
716    def test_readinto_head(self):
717        # Test that the library doesn't attempt to read any data
718        # from a HEAD request.  (Tickles SF bug #622042.)
719        sock = FakeSocket(
720            'HTTP/1.1 200 OK\r\n'
721            'Content-Length: 14432\r\n'
722            '\r\n',
723            NoEOFBytesIO)
724        resp = client.HTTPResponse(sock, method="HEAD")
725        resp.begin()
726        b = bytearray(5)
727        if resp.readinto(b) != 0:
728            self.fail("Did not expect response from HEAD request")
729        self.assertEqual(bytes(b), b'\x00'*5)
730
731    def test_too_many_headers(self):
732        headers = '\r\n'.join('Header%d: foo' % i
733                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
734        text = ('HTTP/1.1 200 OK\r\n' + headers)
735        s = FakeSocket(text)
736        r = client.HTTPResponse(s)
737        self.assertRaisesRegex(client.HTTPException,
738                               r"got more than \d+ headers", r.begin)
739
740    def test_send_file(self):
741        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
742                    b'Accept-Encoding: identity\r\n'
743                    b'Transfer-Encoding: chunked\r\n'
744                    b'\r\n')
745
746        with open(__file__, 'rb') as body:
747            conn = client.HTTPConnection('example.com')
748            sock = FakeSocket(body)
749            conn.sock = sock
750            conn.request('GET', '/foo', body)
751            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
752                    (sock.data[:len(expected)], expected))
753
754    def test_send(self):
755        expected = b'this is a test this is only a test'
756        conn = client.HTTPConnection('example.com')
757        sock = FakeSocket(None)
758        conn.sock = sock
759        conn.send(expected)
760        self.assertEqual(expected, sock.data)
761        sock.data = b''
762        conn.send(array.array('b', expected))
763        self.assertEqual(expected, sock.data)
764        sock.data = b''
765        conn.send(io.BytesIO(expected))
766        self.assertEqual(expected, sock.data)
767
768    def test_send_updating_file(self):
769        def data():
770            yield 'data'
771            yield None
772            yield 'data_two'
773
774        class UpdatingFile(io.TextIOBase):
775            mode = 'r'
776            d = data()
777            def read(self, blocksize=-1):
778                return next(self.d)
779
780        expected = b'data'
781
782        conn = client.HTTPConnection('example.com')
783        sock = FakeSocket("")
784        conn.sock = sock
785        conn.send(UpdatingFile())
786        self.assertEqual(sock.data, expected)
787
788
789    def test_send_iter(self):
790        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
791                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
792                   b'\r\nonetwothree'
793
794        def body():
795            yield b"one"
796            yield b"two"
797            yield b"three"
798
799        conn = client.HTTPConnection('example.com')
800        sock = FakeSocket("")
801        conn.sock = sock
802        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
803        self.assertEqual(sock.data, expected)
804
805    def test_blocksize_request(self):
806        """Check that request() respects the configured block size."""
807        blocksize = 8  # For easy debugging.
808        conn = client.HTTPConnection('example.com', blocksize=blocksize)
809        sock = FakeSocket(None)
810        conn.sock = sock
811        expected = b"a" * blocksize + b"b"
812        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
813        self.assertEqual(sock.sendall_calls, 3)
814        body = sock.data.split(b"\r\n\r\n", 1)[1]
815        self.assertEqual(body, expected)
816
817    def test_blocksize_send(self):
818        """Check that send() respects the configured block size."""
819        blocksize = 8  # For easy debugging.
820        conn = client.HTTPConnection('example.com', blocksize=blocksize)
821        sock = FakeSocket(None)
822        conn.sock = sock
823        expected = b"a" * blocksize + b"b"
824        conn.send(io.BytesIO(expected))
825        self.assertEqual(sock.sendall_calls, 2)
826        self.assertEqual(sock.data, expected)
827
828    def test_send_type_error(self):
829        # See: Issue #12676
830        conn = client.HTTPConnection('example.com')
831        conn.sock = FakeSocket('')
832        with self.assertRaises(TypeError):
833            conn.request('POST', 'test', conn)
834
835    def test_chunked(self):
836        expected = chunked_expected
837        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
838        resp = client.HTTPResponse(sock, method="GET")
839        resp.begin()
840        self.assertEqual(resp.read(), expected)
841        resp.close()
842
843        # Various read sizes
844        for n in range(1, 12):
845            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
846            resp = client.HTTPResponse(sock, method="GET")
847            resp.begin()
848            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
849            resp.close()
850
851        for x in ('', 'foo\r\n'):
852            sock = FakeSocket(chunked_start + x)
853            resp = client.HTTPResponse(sock, method="GET")
854            resp.begin()
855            try:
856                resp.read()
857            except client.IncompleteRead as i:
858                self.assertEqual(i.partial, expected)
859                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
860                self.assertEqual(repr(i), expected_message)
861                self.assertEqual(str(i), expected_message)
862            else:
863                self.fail('IncompleteRead expected')
864            finally:
865                resp.close()
866
867    def test_readinto_chunked(self):
868
869        expected = chunked_expected
870        nexpected = len(expected)
871        b = bytearray(128)
872
873        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
874        resp = client.HTTPResponse(sock, method="GET")
875        resp.begin()
876        n = resp.readinto(b)
877        self.assertEqual(b[:nexpected], expected)
878        self.assertEqual(n, nexpected)
879        resp.close()
880
881        # Various read sizes
882        for n in range(1, 12):
883            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
884            resp = client.HTTPResponse(sock, method="GET")
885            resp.begin()
886            m = memoryview(b)
887            i = resp.readinto(m[0:n])
888            i += resp.readinto(m[i:n + i])
889            i += resp.readinto(m[i:])
890            self.assertEqual(b[:nexpected], expected)
891            self.assertEqual(i, nexpected)
892            resp.close()
893
894        for x in ('', 'foo\r\n'):
895            sock = FakeSocket(chunked_start + x)
896            resp = client.HTTPResponse(sock, method="GET")
897            resp.begin()
898            try:
899                n = resp.readinto(b)
900            except client.IncompleteRead as i:
901                self.assertEqual(i.partial, expected)
902                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
903                self.assertEqual(repr(i), expected_message)
904                self.assertEqual(str(i), expected_message)
905            else:
906                self.fail('IncompleteRead expected')
907            finally:
908                resp.close()
909
910    def test_chunked_head(self):
911        chunked_start = (
912            'HTTP/1.1 200 OK\r\n'
913            'Transfer-Encoding: chunked\r\n\r\n'
914            'a\r\n'
915            'hello world\r\n'
916            '1\r\n'
917            'd\r\n'
918        )
919        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
920        resp = client.HTTPResponse(sock, method="HEAD")
921        resp.begin()
922        self.assertEqual(resp.read(), b'')
923        self.assertEqual(resp.status, 200)
924        self.assertEqual(resp.reason, 'OK')
925        self.assertTrue(resp.isclosed())
926        self.assertFalse(resp.closed)
927        resp.close()
928        self.assertTrue(resp.closed)
929
930    def test_readinto_chunked_head(self):
931        chunked_start = (
932            'HTTP/1.1 200 OK\r\n'
933            'Transfer-Encoding: chunked\r\n\r\n'
934            'a\r\n'
935            'hello world\r\n'
936            '1\r\n'
937            'd\r\n'
938        )
939        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
940        resp = client.HTTPResponse(sock, method="HEAD")
941        resp.begin()
942        b = bytearray(5)
943        n = resp.readinto(b)
944        self.assertEqual(n, 0)
945        self.assertEqual(bytes(b), b'\x00'*5)
946        self.assertEqual(resp.status, 200)
947        self.assertEqual(resp.reason, 'OK')
948        self.assertTrue(resp.isclosed())
949        self.assertFalse(resp.closed)
950        resp.close()
951        self.assertTrue(resp.closed)
952
953    def test_negative_content_length(self):
954        sock = FakeSocket(
955            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
956        resp = client.HTTPResponse(sock, method="GET")
957        resp.begin()
958        self.assertEqual(resp.read(), b'Hello\r\n')
959        self.assertTrue(resp.isclosed())
960
961    def test_incomplete_read(self):
962        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
963        resp = client.HTTPResponse(sock, method="GET")
964        resp.begin()
965        try:
966            resp.read()
967        except client.IncompleteRead as i:
968            self.assertEqual(i.partial, b'Hello\r\n')
969            self.assertEqual(repr(i),
970                             "IncompleteRead(7 bytes read, 3 more expected)")
971            self.assertEqual(str(i),
972                             "IncompleteRead(7 bytes read, 3 more expected)")
973            self.assertTrue(resp.isclosed())
974        else:
975            self.fail('IncompleteRead expected')
976
977    def test_epipe(self):
978        sock = EPipeSocket(
979            "HTTP/1.0 401 Authorization Required\r\n"
980            "Content-type: text/html\r\n"
981            "WWW-Authenticate: Basic realm=\"example\"\r\n",
982            b"Content-Length")
983        conn = client.HTTPConnection("example.com")
984        conn.sock = sock
985        self.assertRaises(OSError,
986                          lambda: conn.request("PUT", "/url", "body"))
987        resp = conn.getresponse()
988        self.assertEqual(401, resp.status)
989        self.assertEqual("Basic realm=\"example\"",
990                         resp.getheader("www-authenticate"))
991
992    # Test lines overflowing the max line size (_MAXLINE in http.client)
993
994    def test_overflowing_status_line(self):
995        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
996        resp = client.HTTPResponse(FakeSocket(body))
997        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
998
999    def test_overflowing_header_line(self):
1000        body = (
1001            'HTTP/1.1 200 OK\r\n'
1002            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
1003        )
1004        resp = client.HTTPResponse(FakeSocket(body))
1005        self.assertRaises(client.LineTooLong, resp.begin)
1006
1007    def test_overflowing_header_limit_after_100(self):
1008        body = (
1009            'HTTP/1.1 100 OK\r\n'
1010            'r\n' * 32768
1011        )
1012        resp = client.HTTPResponse(FakeSocket(body))
1013        with self.assertRaises(client.HTTPException) as cm:
1014            resp.begin()
1015        # We must assert more because other reasonable errors that we
1016        # do not want can also be HTTPException derived.
1017        self.assertIn('got more than ', str(cm.exception))
1018        self.assertIn('headers', str(cm.exception))
1019
1020    def test_overflowing_chunked_line(self):
1021        body = (
1022            'HTTP/1.1 200 OK\r\n'
1023            'Transfer-Encoding: chunked\r\n\r\n'
1024            + '0' * 65536 + 'a\r\n'
1025            'hello world\r\n'
1026            '0\r\n'
1027            '\r\n'
1028        )
1029        resp = client.HTTPResponse(FakeSocket(body))
1030        resp.begin()
1031        self.assertRaises(client.LineTooLong, resp.read)
1032
1033    def test_early_eof(self):
1034        # Test httpresponse with no \r\n termination,
1035        body = "HTTP/1.1 200 Ok"
1036        sock = FakeSocket(body)
1037        resp = client.HTTPResponse(sock)
1038        resp.begin()
1039        self.assertEqual(resp.read(), b'')
1040        self.assertTrue(resp.isclosed())
1041        self.assertFalse(resp.closed)
1042        resp.close()
1043        self.assertTrue(resp.closed)
1044
1045    def test_error_leak(self):
1046        # Test that the socket is not leaked if getresponse() fails
1047        conn = client.HTTPConnection('example.com')
1048        response = None
1049        class Response(client.HTTPResponse):
1050            def __init__(self, *pos, **kw):
1051                nonlocal response
1052                response = self  # Avoid garbage collector closing the socket
1053                client.HTTPResponse.__init__(self, *pos, **kw)
1054        conn.response_class = Response
1055        conn.sock = FakeSocket('Invalid status line')
1056        conn.request('GET', '/')
1057        self.assertRaises(client.BadStatusLine, conn.getresponse)
1058        self.assertTrue(response.closed)
1059        self.assertTrue(conn.sock.file_closed)
1060
1061    def test_chunked_extension(self):
1062        extra = '3;foo=bar\r\n' + 'abc\r\n'
1063        expected = chunked_expected + b'abc'
1064
1065        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1066        resp = client.HTTPResponse(sock, method="GET")
1067        resp.begin()
1068        self.assertEqual(resp.read(), expected)
1069        resp.close()
1070
1071    def test_chunked_missing_end(self):
1072        """some servers may serve up a short chunked encoding stream"""
1073        expected = chunked_expected
1074        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1075        resp = client.HTTPResponse(sock, method="GET")
1076        resp.begin()
1077        self.assertEqual(resp.read(), expected)
1078        resp.close()
1079
1080    def test_chunked_trailers(self):
1081        """See that trailers are read and ignored"""
1082        expected = chunked_expected
1083        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1084        resp = client.HTTPResponse(sock, method="GET")
1085        resp.begin()
1086        self.assertEqual(resp.read(), expected)
1087        # we should have reached the end of the file
1088        self.assertEqual(sock.file.read(), b"") #we read to the end
1089        resp.close()
1090
1091    def test_chunked_sync(self):
1092        """Check that we don't read past the end of the chunked-encoding stream"""
1093        expected = chunked_expected
1094        extradata = "extradata"
1095        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1096        resp = client.HTTPResponse(sock, method="GET")
1097        resp.begin()
1098        self.assertEqual(resp.read(), expected)
1099        # the file should now have our extradata ready to be read
1100        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1101        resp.close()
1102
1103    def test_content_length_sync(self):
1104        """Check that we don't read past the end of the Content-Length stream"""
1105        extradata = b"extradata"
1106        expected = b"Hello123\r\n"
1107        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1108        resp = client.HTTPResponse(sock, method="GET")
1109        resp.begin()
1110        self.assertEqual(resp.read(), expected)
1111        # the file should now have our extradata ready to be read
1112        self.assertEqual(sock.file.read(), extradata) #we read to the end
1113        resp.close()
1114
1115    def test_readlines_content_length(self):
1116        extradata = b"extradata"
1117        expected = b"Hello123\r\n"
1118        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1119        resp = client.HTTPResponse(sock, method="GET")
1120        resp.begin()
1121        self.assertEqual(resp.readlines(2000), [expected])
1122        # the file should now have our extradata ready to be read
1123        self.assertEqual(sock.file.read(), extradata) #we read to the end
1124        resp.close()
1125
1126    def test_read1_content_length(self):
1127        extradata = b"extradata"
1128        expected = b"Hello123\r\n"
1129        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1130        resp = client.HTTPResponse(sock, method="GET")
1131        resp.begin()
1132        self.assertEqual(resp.read1(2000), expected)
1133        # the file should now have our extradata ready to be read
1134        self.assertEqual(sock.file.read(), extradata) #we read to the end
1135        resp.close()
1136
1137    def test_readline_bound_content_length(self):
1138        extradata = b"extradata"
1139        expected = b"Hello123\r\n"
1140        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1141        resp = client.HTTPResponse(sock, method="GET")
1142        resp.begin()
1143        self.assertEqual(resp.readline(10), expected)
1144        self.assertEqual(resp.readline(10), b"")
1145        # the file should now have our extradata ready to be read
1146        self.assertEqual(sock.file.read(), extradata) #we read to the end
1147        resp.close()
1148
1149    def test_read1_bound_content_length(self):
1150        extradata = b"extradata"
1151        expected = b"Hello123\r\n"
1152        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1153        resp = client.HTTPResponse(sock, method="GET")
1154        resp.begin()
1155        self.assertEqual(resp.read1(20), expected*2)
1156        self.assertEqual(resp.read(), expected)
1157        # the file should now have our extradata ready to be read
1158        self.assertEqual(sock.file.read(), extradata) #we read to the end
1159        resp.close()
1160
1161    def test_response_fileno(self):
1162        # Make sure fd returned by fileno is valid.
1163        serv = socket.create_server((HOST, 0))
1164        self.addCleanup(serv.close)
1165
1166        result = None
1167        def run_server():
1168            [conn, address] = serv.accept()
1169            with conn, conn.makefile("rb") as reader:
1170                # Read the request header until a blank line
1171                while True:
1172                    line = reader.readline()
1173                    if not line.rstrip(b"\r\n"):
1174                        break
1175                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1176                nonlocal result
1177                result = reader.read()
1178
1179        thread = threading.Thread(target=run_server)
1180        thread.start()
1181        self.addCleanup(thread.join, float(1))
1182        conn = client.HTTPConnection(*serv.getsockname())
1183        conn.request("CONNECT", "dummy:1234")
1184        response = conn.getresponse()
1185        try:
1186            self.assertEqual(response.status, client.OK)
1187            s = socket.socket(fileno=response.fileno())
1188            try:
1189                s.sendall(b"proxied data\n")
1190            finally:
1191                s.detach()
1192        finally:
1193            response.close()
1194            conn.close()
1195        thread.join()
1196        self.assertEqual(result, b"proxied data\n")
1197
1198    def test_putrequest_override_domain_validation(self):
1199        """
1200        It should be possible to override the default validation
1201        behavior in putrequest (bpo-38216).
1202        """
1203        class UnsafeHTTPConnection(client.HTTPConnection):
1204            def _validate_path(self, url):
1205                pass
1206
1207        conn = UnsafeHTTPConnection('example.com')
1208        conn.sock = FakeSocket('')
1209        conn.putrequest('GET', '/\x00')
1210
1211    def test_putrequest_override_host_validation(self):
1212        class UnsafeHTTPConnection(client.HTTPConnection):
1213            def _validate_host(self, url):
1214                pass
1215
1216        conn = UnsafeHTTPConnection('example.com\r\n')
1217        conn.sock = FakeSocket('')
1218        # set skip_host so a ValueError is not raised upon adding the
1219        # invalid URL as the value of the "Host:" header
1220        conn.putrequest('GET', '/', skip_host=1)
1221
1222    def test_putrequest_override_encoding(self):
1223        """
1224        It should be possible to override the default encoding
1225        to transmit bytes in another encoding even if invalid
1226        (bpo-36274).
1227        """
1228        class UnsafeHTTPConnection(client.HTTPConnection):
1229            def _encode_request(self, str_url):
1230                return str_url.encode('utf-8')
1231
1232        conn = UnsafeHTTPConnection('example.com')
1233        conn.sock = FakeSocket('')
1234        conn.putrequest('GET', '/☃')
1235
1236
class ExtendedReadTest(TestCase):
    """
    Test peek(), read1(), readline()
    """
    # Raw response fed to the fake socket by setUp().
    lines = (
        'HTTP/1.1 200 OK\r\n'
        '\r\n'
        'hello world!\n'
        'and now \n'
        'for something completely different\n'
        'foo'
        )
    # Body bytes every test expects to read back: everything from 'hello' on.
    lines_expected = lines[lines.find('hello'):].encode("ascii")
    # The same body as a chunked response.
    lines_chunked = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello worl\r\n'
        '3\r\n'
        'd!\n\r\n'
        '9\r\n'
        'and now \n\r\n'
        '23\r\n'
        'for something completely different\n\r\n'
        '3\r\n'
        'foo\r\n'
        '0\r\n' # terminating chunk
        '\r\n'  # end of trailers
    )

    def setUp(self):
        # Parse the headers, then put a BufferedReader under the response
        # so that peek() and read1() are available on resp.fp.
        response = client.HTTPResponse(FakeSocket(self.lines), method="GET")
        response.begin()
        response.fp = io.BufferedReader(response.fp)
        self.resp = response

    def test_peek(self):
        resp = self.resp
        # patch up the buffered peek so that it returns not too much stuff
        buffered_peek = resp.fp.peek

        def short_peek(n=-1):
            window = buffered_peek(n)
            return window[:n] if n >= 0 else window[:10]

        resp.fp.peek = short_peek

        pieces = []
        while True:
            # try a short peek
            peeked = resp.peek(3)
            if peeked:
                self.assertGreater(len(peeked), 0)
                # then unbounded peek
                unbounded = resp.peek()
                self.assertGreaterEqual(len(unbounded), len(peeked))
                self.assertTrue(unbounded.startswith(peeked))
                consumed = resp.read(len(unbounded))
                self.assertEqual(consumed, unbounded)
            else:
                consumed = resp.read()
                self.assertFalse(consumed)
            pieces.append(consumed)
            if not consumed:
                break
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_readline(self):
        self._verify_readline(self.resp.readline, self.lines_expected)

    def _verify_readline(self, readline, expected):
        # Drain `readline` five bytes at a time and compare the
        # concatenation with `expected`.
        pieces = []
        line = readline(5)
        while line:
            # short readlines: anything below the limit must be a line
            # end, except for the final, unterminated b"foo"
            if line != b"foo" and len(line) < 5:
                self.assertTrue(line.endswith(b"\n"))
            pieces.append(line)
            line = readline(5)
        self.assertEqual(b"".join(pieces), expected)

    def test_read1(self):
        resp = self.resp

        def bounded_read1():
            block = resp.read1(4)
            self.assertLessEqual(len(block), 4)
            return block

        liner = Readliner(bounded_read1)
        self._verify_readline(liner.readline, self.lines_expected)

    def test_read1_unbounded(self):
        # Keep calling read1() without a size until it returns nothing.
        pieces = list(iter(self.resp.read1, b""))
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_bounded(self):
        resp = self.resp
        pieces = []
        block = resp.read1(10)
        while block:
            self.assertLessEqual(len(block), 10)
            pieces.append(block)
            block = resp.read1(10)
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_0(self):
        self.assertEqual(self.resp.read1(0), b"")

    def test_peek_0(self):
        peeked = self.resp.peek(0)
        self.assertLessEqual(0, len(peeked))
1360
1361
class ExtendedReadTestChunked(ExtendedReadTest):
    """
    Test peek(), read1(), readline() in chunked mode
    """
    # Reuse the chunked rendition of the body that the base class already
    # carries instead of keeping a second copy of the same literal here.
    lines = ExtendedReadTest.lines_chunked
1382
1383
class Readliner:
    """
    a simple readline class that uses an arbitrary read function and buffering
    """
    def __init__(self, readfunc):
        # readfunc: zero-argument callable returning the next block of
        # bytes, or an empty value at end of input.
        self.readfunc = readfunc
        # Bytes already read from readfunc but not yet returned to a caller.
        self.remainder = b""

    def readline(self, limit=-1):
        """Return the next line, at most *limit* bytes long.

        Data is pulled from ``readfunc`` until a newline is found, *limit*
        bytes are available, or ``readfunc`` returns an empty value.
        Whatever was read beyond the returned bytes is kept for the next
        call.  A negative *limit* (or ``None``) means no limit.

        If ``readfunc`` raises, everything read so far is put back into
        the buffer and the exception propagates.
        """
        bounded = limit is not None and limit >= 0
        pieces = []
        total = 0           # bytes already collected in `pieces`
        chunk = self.remainder
        try:
            while True:
                # Number of bytes of `chunk` that complete the line;
                # 0 while no newline has been seen.
                cut = chunk.find(b'\n') + 1
                if bounded:
                    room = limit - total
                    # The limit is reached inside this chunk unless a
                    # newline ends the line within the remaining room.
                    if len(chunk) >= room and not 0 < cut <= room:
                        cut = room
                        break
                if cut:
                    break
                # read more data
                pieces.append(chunk)
                total += len(chunk)
                chunk = self.readfunc()
                if not chunk:
                    break  # eof condition, cut is still 0
            pieces.append(chunk[:cut])
            self.remainder = chunk[cut:]
            return b"".join(pieces)
        except BaseException:
            # Keep what was read so that a later call can still return it.
            self.remainder = b"".join(pieces)
            raise
1416
1417
class OfflineTest(TestCase):
    def test_all(self):
        # Documented objects defined in the module should be in __all__
        # HTTPMessage, parse_headers(), and the HTTP status code constants are
        # intentionally omitted for simplicity
        omitted = {"HTTPMessage", "parse_headers"}
        expected = {"responses"}  # Allowlist documented dict() object
        expected.update(
            name for name in dir(client)
            if not name.startswith("_")
            and name not in omitted
            and getattr(getattr(client, name), "__module__", None)
                == "http.client"
        )
        self.assertCountEqual(client.__all__, expected)

    def test_responses(self):
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")

    def test_client_constants(self):
        # Make sure we don't break backward compatibility with 3.4
        constant_names = (
            'CONTINUE',
            'SWITCHING_PROTOCOLS',
            'PROCESSING',
            'OK',
            'CREATED',
            'ACCEPTED',
            'NON_AUTHORITATIVE_INFORMATION',
            'NO_CONTENT',
            'RESET_CONTENT',
            'PARTIAL_CONTENT',
            'MULTI_STATUS',
            'IM_USED',
            'MULTIPLE_CHOICES',
            'MOVED_PERMANENTLY',
            'FOUND',
            'SEE_OTHER',
            'NOT_MODIFIED',
            'USE_PROXY',
            'TEMPORARY_REDIRECT',
            'BAD_REQUEST',
            'UNAUTHORIZED',
            'PAYMENT_REQUIRED',
            'FORBIDDEN',
            'NOT_FOUND',
            'METHOD_NOT_ALLOWED',
            'NOT_ACCEPTABLE',
            'PROXY_AUTHENTICATION_REQUIRED',
            'REQUEST_TIMEOUT',
            'CONFLICT',
            'GONE',
            'LENGTH_REQUIRED',
            'PRECONDITION_FAILED',
            'REQUEST_ENTITY_TOO_LARGE',
            'REQUEST_URI_TOO_LONG',
            'UNSUPPORTED_MEDIA_TYPE',
            'REQUESTED_RANGE_NOT_SATISFIABLE',
            'EXPECTATION_FAILED',
            'IM_A_TEAPOT',
            'MISDIRECTED_REQUEST',
            'UNPROCESSABLE_ENTITY',
            'LOCKED',
            'FAILED_DEPENDENCY',
            'UPGRADE_REQUIRED',
            'PRECONDITION_REQUIRED',
            'TOO_MANY_REQUESTS',
            'REQUEST_HEADER_FIELDS_TOO_LARGE',
            'UNAVAILABLE_FOR_LEGAL_REASONS',
            'INTERNAL_SERVER_ERROR',
            'NOT_IMPLEMENTED',
            'BAD_GATEWAY',
            'SERVICE_UNAVAILABLE',
            'GATEWAY_TIMEOUT',
            'HTTP_VERSION_NOT_SUPPORTED',
            'INSUFFICIENT_STORAGE',
            'NOT_EXTENDED',
            'NETWORK_AUTHENTICATION_REQUIRED',
            'EARLY_HINTS',
            'TOO_EARLY',
        )
        for const in constant_names:
            with self.subTest(constant=const):
                self.assertTrue(hasattr(client, const))
1501
1502
class SourceAddressTest(TestCase):
    def setUp(self):
        # A listening server socket plus a second port reserved as the
        # client's source port.
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.serv)
        self.source_port = socket_helper.find_unused_port()
        self.serv.listen()
        self.conn = None

    def tearDown(self):
        connection = self.conn
        if connection:
            connection.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # We don't test anything here other than the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
1532
1533
class TimeoutTest(TestCase):
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = socket_helper.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def _connect_under_default_timeout(self, default, **kwargs):
        # Connect to the test server while the global default socket
        # timeout is `default`; the default is reset to None afterwards,
        # whether or not connecting succeeded.
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(default)
        try:
            connection = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                               **kwargs)
            connection.connect()
        finally:
            socket.setdefaulttimeout(None)
        return connection

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        httpConn = self._connect_under_default_timeout(30)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        httpConn = self._connect_under_default_timeout(30, timeout=None)
        self.assertEqual(httpConn.sock.gettimeout(), None)
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
1578
1579
class PersistenceTest(TestCase):
    """Check when a connection keeps its socket and when it reconnects."""

    def test_reuse_reconnect(self):
        # Should reuse or reconnect depending on header from server
        response_template = (
            'HTTP/{} 200 OK\r\n'
            '{}'
            'Content-Length: 12\r\n'
            '\r\n'
            'Dummy body\r\n'
        )
        # (HTTP version, extra header line, socket expected to stay open?)
        scenarios = (
            ('1.0', '', False),
            ('1.0', 'Connection: keep-alive\r\n', True),
            ('1.1', '', True),
            ('1.1', 'Connection: close\r\n', False),
            ('1.0', 'Connection: keep-ALIVE\r\n', True),
            ('1.1', 'Connection: cloSE\r\n', False),
        )
        for version, header, reuse in scenarios:
            with self.subTest(version=version, header=header):
                expect_closed = not reuse
                expected_connections = 1 if reuse else 2
                conn = FakeSocketHTTPConnection(
                    response_template.format(version, header))
                self.assertIsNone(conn.sock)
                conn.request('GET', '/open-connection')
                with conn.getresponse() as response:
                    # Checked both before and after the body is consumed
                    self.assertEqual(conn.sock is None, expect_closed)
                    response.read()
                self.assertEqual(conn.sock is None, expect_closed)
                self.assertEqual(conn.connections, 1)
                conn.request('GET', '/subsequent-request')
                self.assertEqual(conn.connections, expected_connections)

    def test_disconnected(self):

        def make_reset_reader(text):
            """Wrap text in a BufferedReader that raises ECONNRESET at EOF"""
            raw = io.BytesIO(text)

            def readinto(buf):
                count = io.BytesIO.readinto(raw, buf)
                if count == 0:
                    # A plain BytesIO would just report EOF here
                    raise ConnectionResetError()
                return count

            # Shadow the method on this one instance only
            raw.readinto = readinto
            return io.BufferedReader(raw)

        # (file factory handed to the fake connection, expected exception)
        cases = (
            (io.BytesIO, client.RemoteDisconnected),
            (make_reset_reader, ConnectionResetError),
        )
        for stream_factory, exception in cases:
            with self.subTest(exception=exception):
                conn = FakeSocketHTTPConnection(b'', stream_factory)
                conn.request('GET', '/eof-response')
                with self.assertRaises(exception):
                    conn.getresponse()
                self.assertIsNone(conn.sock)
                # HTTPConnection.connect() should be automatically invoked
                conn.request('GET', '/reconnect')
                self.assertEqual(conn.connections, 2)

    def test_100_close(self):
        # Only the interim reply is served before the stream ends
        interim_only = (
            b'HTTP/1.1 100 Continue\r\n'
            b'\r\n'
            # Missing final response
        )
        conn = FakeSocketHTTPConnection(interim_only)
        conn.request('GET', '/', headers={'Expect': '100-continue'})
        with self.assertRaises(client.RemoteDisconnected):
            conn.getresponse()
        self.assertIsNone(conn.sock)
        conn.request('GET', '/reconnect')
        self.assertEqual(conn.connections, 2)
1650
1651
class HTTPSTest(TestCase):
    """Exercise client.HTTPSConnection certificate and hostname checks.

    The test_networked_* methods require the 'network' resource; the
    test_local_* methods talk to a server started through make_server().
    """

    def setUp(self):
        # Nothing in this class can run without HTTPSConnection
        if hasattr(client, 'HTTPSConnection'):
            return
        self.skipTest('ssl support required')

    def make_server(self, certfile):
        # Imported here rather than at module level
        from test.ssl_servers import make_https_server
        server = make_https_server(self, certfile=certfile)
        return server

    def test_attributes(self):
        # simple test to check it's storing the timeout
        conn = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(conn.timeout, 30)

    def test_networked(self):
        # Default settings: requires a valid cert from a trusted CA
        import ssl
        support.requires('network')
        with socket_helper.transient_internet('self-signed.pythontest.net'):
            conn = client.HTTPSConnection('self-signed.pythontest.net', 443)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        # Switch off cert verification
        import ssl
        support.requires('network')
        with socket_helper.transient_internet('self-signed.pythontest.net'):
            unverified = ssl._create_unverified_context()
            conn = client.HTTPSConnection('self-signed.pythontest.net', 443,
                                          context=unverified)
            conn.request('GET', '/')
            response = conn.getresponse()
            conn.close()
            self.assertIn('nginx', response.getheader('server'))
            response.close()

    @support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        # Default settings: requires a valid cert from a trusted CA
        support.requires('network')
        with socket_helper.transient_internet('www.python.org'):
            conn = client.HTTPSConnection('www.python.org', 443)
            conn.request('GET', '/')
            response = conn.getresponse()
            content_type = response.getheader('content-type')
            response.close()
            conn.close()
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        # We feed the server's cert as a validating cert
        import ssl
        support.requires('network')
        selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
        with socket_helper.transient_internet(selfsigned_pythontestdotnet):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.check_hostname, True)
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            try:
                conn = client.HTTPSConnection(selfsigned_pythontestdotnet,
                                              443, context=context)
                conn.request('GET', '/')
                response = conn.getresponse()
            except ssl.SSLError as ssl_err:
                ssl_err_str = str(ssl_err)
                # Anything other than the known "key too weak" failure is a
                # genuine error and propagates unchanged.
                if not re.search(r'(?i)key.too.weak', ssl_err_str):
                    raise
                # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
                # modern Linux distros (Debian Buster, etc) default OpenSSL
                # configurations it'll fail saying "key too weak" until we
                # address https://bugs.python.org/issue36816 to use a proper
                # key size on self-signed.pythontest.net.
                raise unittest.SkipTest(
                    f'Got {ssl_err_str} trying to connect '
                    f'to {selfsigned_pythontestdotnet}. '
                    'See https://bugs.python.org/issue36816.')
            server_string = response.getheader('server')
            response.close()
            conn.close()
            self.assertIn('nginx', server_string)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        with socket_helper.transient_internet('self-signed.pythontest.net'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(CERT_localhost)
            conn = client.HTTPSConnection('self-signed.pythontest.net', 443,
                                          context=context)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        # The custom cert isn't known to the default trust bundle
        import ssl
        server = self.make_server(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as caught:
            conn.request('GET', '/')
        self.assertEqual(caught.exception.reason,
                         'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        self.addCleanup(conn.close)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.addCleanup(response.close)
        self.assertEqual(response.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_fakehostname)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # Same with explicit check_hostname=True
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # With check_hostname=False, the mismatching is ignored
        context.check_hostname = False
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=False)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        response.close()
        conn.close()
        self.assertEqual(response.status, 404)

        # The context's check_hostname setting is used if one isn't passed to
        # HTTPSConnection.
        context.check_hostname = False
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.assertEqual(response.status, 404)
        response.close()
        conn.close()

        # Passing check_hostname to HTTPSConnection should override the
        # context's setting.
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        # Check invalid host_port
        rejected = ("www.python.org:abc", "user:password@www.python.org")
        for hp in rejected:
            with self.assertRaises(client.InvalidURL):
                client.HTTPSConnection(hp)

        # (host:port string, expected host, expected port)
        accepted = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:443", "www.python.org", 443),
            ("www.python.org:", "www.python.org", 443),
            ("www.python.org", "www.python.org", 443),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
            ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 443),
        )
        for hp, host, port in accepted:
            conn = client.HTTPSConnection(hp)
            self.assertEqual(host, conn.host)
            self.assertEqual(port, conn.port)

    def test_tls13_pha(self):
        import ssl
        if not ssl.HAS_TLSv1_3:
            self.skipTest('TLS 1.3 support required')
        # just check status of PHA flag
        conn = client.HTTPSConnection('localhost', 443)
        self.assertTrue(conn._context.post_handshake_auth)

        # A caller-supplied context is used as-is, with its flag still off
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertFalse(context.post_handshake_auth)
        conn = client.HTTPSConnection('localhost', 443, context=context)
        self.assertIs(conn._context, context)
        self.assertFalse(conn._context.post_handshake_auth)

        # Passing cert_file along with the context switches the flag on
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                'key_file, cert_file and check_hostname are deprecated',
                DeprecationWarning)
            conn = client.HTTPSConnection('localhost', 443, context=context,
                                          cert_file=CERT_localhost)
        self.assertTrue(conn._context.post_handshake_auth)
1853
1854
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        # Requests are written to a FakeSocket so the bytes that were sent
        # can be parsed back by get_headers_and_fp().
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        """Parse the request captured by the fake socket.

        Returns a (message, fp) pair: the parsed request headers and a
        BytesIO left positioned just after them, i.e. at the request body.
        """
        fp = io.BytesIO(self.sock.data)
        fp.readline()  # read the request line
        message = client.parse_headers(fp)
        return message, fp

    def test_list_body(self):
        # Note that no content-length is automatically calculated for
        # an iterable.  The request will fall back to send chunked
        # transfer encoding.
        cases = (
            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
        )
        for body, expected in cases:
            with self.subTest(body):
                # Fresh connection and socket per case: the fake socket
                # accumulates everything sent through it.
                self.conn = client.HTTPConnection('example.com')
                self.conn.sock = self.sock = FakeSocket('')

                self.conn.request('PUT', '/url', body)
                msg, fp = self.get_headers_and_fp()
                self.assertNotIn('Content-Type', msg)
                self.assertNotIn('Content-Length', msg)
                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
                self.assertEqual(expected, fp.read())

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, fp = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(fp.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', fp.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', fp.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', fp.read())

    def test_text_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        # Explicit encoding: do not depend on the locale's default
        with open(support.TESTFN, "w", encoding="utf-8") as body_file:
            body_file.write("body")
        with open(support.TESTFN, encoding="utf-8") as body_file:
            self.conn.request("PUT", "/url", body_file)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        # No content-length will be determined for files; the body
        # will be sent using chunked transfer encoding instead.
        self.assertIsNone(message.get("content-length"))
        self.assertEqual("chunked", message.get("transfer-encoding"))
        self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', fp.read())

    def test_binary_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as body_file:
            body_file.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body_file:
            self.conn.request("PUT", "/url", body_file)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("chunked", message.get("Transfer-Encoding"))
        self.assertNotIn("Content-Length", message)
        self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', fp.read())
1949
1950
class HTTPResponseTest(TestCase):
    """Check HTTPResponse.getheader() on a response with a repeated header."""

    def setUp(self):
        # "My-Header" appears twice; the second value is preceded by a run
        # of extra blanks that is part of the raw response text.
        raw_response = (
            "HTTP/1.1 200 Ok\r\n"
            "My-Header: first-value\r\n"
            "My-Header: "
            "                second-value\r\n"
            "\r\n"
            "Text"
        )
        self.resp = client.HTTPResponse(FakeSocket(raw_response))
        self.resp.begin()

    def test_getting_header(self):
        # Both values come back joined, with or without a default
        value = self.resp.getheader('My-Header')
        self.assertEqual(value, 'first-value, second-value')

        value = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(value, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        value = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(value, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        # List and tuple defaults are joined the same way
        value = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(value, 'default, values')

        value = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(value, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        value = self.resp.getheader('No-Such-Header')
        self.assertEqual(value, None)

    def test_getting_header_defaultint(self):
        # A non-iterable default is handed back untouched
        value = self.resp.getheader('No-Such-Header', default=42)
        self.assertEqual(value, 42)
1985
class TunnelTests(TestCase):
    """Check HTTPConnection.set_tunnel() against canned proxy replies."""

    def setUp(self):
        canned_reply = (
            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )
        self.host = 'proxy.com'
        self.conn = client.HTTPConnection(self.host)
        # Swap in a factory that returns a FakeSocket
        self.conn._create_connection = self._create_connection(canned_reply)

    def tearDown(self):
        self.conn.close()

    def _create_connection(self, response_text):
        # Build a replacement connection factory whose sockets are FakeSocket
        # objects preloaded with response_text and tagged with the address
        # they were asked to connect to.
        def create_connection(address, timeout=None, source_address=None):
            host, port = address[0], address[1]
            return FakeSocket(response_text, host=host, port=port)
        return create_connection

    def test_set_tunnel_host_port_headers(self):
        tunnel_host = 'destination.com'
        tunnel_port = 8888
        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
                             headers=tunnel_headers)
        self.conn.request('HEAD', '/', '')
        # The socket is opened to the proxy, not to the tunnel target
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        self.assertEqual(self.conn._tunnel_host, tunnel_host)
        self.assertEqual(self.conn._tunnel_port, tunnel_port)
        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)

    def test_disallow_set_tunnel_after_connect(self):
        # Once connected, we shouldn't be able to tunnel anymore
        self.conn.connect()
        with self.assertRaises(RuntimeError):
            self.conn.set_tunnel('destination.com')

    def test_connect_with_tunnel(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('HEAD', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        self.assertIn(b'CONNECT destination.com', sock.data)
        # issue22095
        self.assertNotIn(b'Host: destination.com:None', sock.data)
        self.assertIn(b'Host: destination.com', sock.data)

        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
        self.assertNotIn(b'Host: proxy.com', sock.data)

    def test_connect_put_request(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('PUT', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        self.assertIn(b'CONNECT destination.com', sock.data)
        self.assertIn(b'Host: destination.com', sock.data)

    def test_tunnel_debuglog(self):
        expected_header = 'X-Dummy: 1'
        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)

        self.conn.set_debuglevel(1)
        self.conn._create_connection = self._create_connection(response_text)
        self.conn.set_tunnel('destination.com')

        # Debug output goes to stdout; capture it for inspection
        with support.captured_stdout() as captured:
            self.conn.request('PUT', '/', '')
        printed_lines = captured.getvalue().splitlines()
        self.assertIn('header: {}'.format(expected_header), printed_lines)
2057
2058
if __name__ == "__main__":
    # Run every test in this module verbosely when executed as a script.
    unittest.main(verbosity=2)
2061