• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13TestCase = unittest.TestCase
14
15from test import support
16
# Directory containing this test module; the certificate files below are
# looked up relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')

# constants for testing chunked encoding
# chunked_start is a response head followed by four chunks; each chunk is
# preceded by its size in hexadecimal (a = 10, 3, 8, 22 = 34 bytes).
chunked_start = (
    'HTTP/1.1 200 OK\r\n'
    'Transfer-Encoding: chunked\r\n\r\n'
    'a\r\n'
    'hello worl\r\n'
    '3\r\n'
    'd! \r\n'
    '8\r\n'
    'and now \r\n'
    '22\r\n'
    'for something completely different\r\n'
)
# The four chunk payloads of chunked_start joined together.
chunked_expected = b'hello world! and now for something completely different'
chunk_extension = ";foo=bar"
# Zero-size chunk line, without and with the chunk extension appended.
last_chunk = "0\r\n"
last_chunk_extended = "0" + chunk_extension + "\r\n"
# Two header-style lines, each terminated by CRLF.
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
# Bare CRLF.
chunked_end = "\r\n"

HOST = support.HOST
46
class FakeSocket:
    """Minimal in-memory socket double.

    Bytes handed to sendall() accumulate in ``data``; makefile() serves the
    canned ``text`` through a new ``fileclass`` instance.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # str input is stored as ASCII bytes; any other value is kept as given
        self.text = text.encode("ascii") if isinstance(text, str) else text
        self.fileclass = fileclass
        self.host, self.port = host, port
        self.data = b''           # everything passed to sendall() so far
        self.sendall_calls = 0    # number of sendall() invocations
        self.file_closed = False  # flipped by file_close()

    def sendall(self, data):
        """Count the call, then append *data* to the captured output."""
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a file object over ``text``; only 'r' and 'rb' are accepted.

        Raises client.UnimplementedFileMode for any other mode.
        """
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        # The file stays reachable as self.file so tests can check how much
        # was read from it.
        self.file = reader = self.fileclass(self.text)
        # close() is swapped for file_close(), which only records the call
        reader.close = self.file_close
        return reader

    def file_close(self):
        """Stand-in for the file's close(): just remember it was called."""
        self.file_closed = True

    def close(self):
        """Do nothing."""

    def setsockopt(self, level, optname, value):
        """Accept and ignore the socket option."""
79
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a trigger sequence."""

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        super().__init__(text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Raise OSError(EPIPE) if *data* contains the trigger, else record it.

        Recording is delegated to FakeSocket.sendall() so that ``data`` and
        ``sendall_calls`` are maintained in one place; calls that raise are
        not recorded.
        """
        if self.pipe_trigger in data:
            raise OSError(errno.EPIPE, "gotcha")
        super().sendall(data)
94
class NoEOFBytesIO(io.BytesIO):
    """BytesIO variant that raises AssertionError instead of signalling EOF.

    Used below to check that http.client never asks the underlying file
    for more data than it should.
    """

    def read(self, n=-1):
        chunk = super().read(n)
        if not chunk:
            # An empty result means the caller ran off the end of the data
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def readline(self, length=None):
        line = super().readline(length)
        if not line:
            # An empty result means the caller ran off the end of the data
            raise AssertionError('caller tried to read past EOF')
        return line
112
class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection wired to FakeSocket that tallies connect() calls."""

    def __init__(self, *args):
        # *args are kept and handed to FakeSocket by create_connection()
        self.fake_socket_args = args
        self.connections = 0
        super().__init__('example.com')
        # Assigned after the base initializer has run, so this is the value
        # that sticks on the instance.
        self._create_connection = self.create_connection

    def connect(self):
        """Bump the connect() counter, then defer to HTTPConnection."""
        self.connections += 1
        return client.HTTPConnection.connect(self)

    def create_connection(self, *args, **kwargs):
        """Ignore the requested address and build a FakeSocket instead."""
        return FakeSocket(*self.fake_socket_args)
129
class HeaderTests(TestCase):
    """Request header generation and response header parsing."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            # list that also tallies, per lower-cased header name, how many
            # header lines were appended
            def __init__(self):
                list.__init__(self)
                self.count = {}
            def append(self, item):
                kv = item.split(b':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lcKey = kv[0].decode('ascii').lower()
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                # subTest reports which combination failed
                with self.subTest(explicit_header=explicit_header,
                                  header=header):
                    conn = client.HTTPConnection('example.com')
                    conn.sock = FakeSocket('blahblahblah')
                    conn._buffer = HeaderCountingBuffer()

                    body = 'spamspamspam'
                    headers = {}
                    if explicit_header:
                        headers[header] = str(len(body))
                    conn.request('POST', '/', body, headers)
                    self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            # list that remembers the value of the last Content-Length
            # header line appended to it (None if there was none)
            def __init__(self):
                list.__init__(self)
                self.content_length = None
            def append(self, item):
                kv = item.split(b':', 1)
                if len(kv) > 1 and kv[0].lower() == b'content-length':
                    self.content_length = kv[1].strip()
                list.append(self, item)

        def sent_content_length(method, body):
            # Issue one request on a fresh fake connection and return the
            # Content-Length value that was buffered (bytes), or None.
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            return conn._buffer.content_length

        # Here, we're testing that methods expecting a body get a
        # content-length set to zero if the body is empty (either None or '')
        bodies = (None, '')
        methods_with_body = ('PUT', 'POST', 'PATCH')
        for method, body in itertools.product(methods_with_body, bodies):
            self.assertEqual(
                sent_content_length(method, body), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # For these methods, we make sure that content-length is not set when
        # the body is None because it might cause unexpected behaviour on the
        # server.
        methods_without_body = (
             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, None), None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # If the body is set to '', that's considered to be "present but
        # empty" rather than "missing", so content length would be set, even
        # for methods that don't expect a body.
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, ''), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # If the body is set, make sure Content-Length is set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            self.assertEqual(
                sent_content_length(method, ' '), b'1',
                'Header Content-Length incorrect on {}'.format(method)
            )

    def test_putheader(self):
        # putheader() must buffer each header line exactly as given:
        # surrounding whitespace, multiple values, bytes values and
        # folded values are all expected to appear verbatim.
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length', 42)
        self.assertIn(b'Content-length: 42', conn._buffer)

        conn.putheader('Foo', ' bar ')
        self.assertIn(b'Foo:  bar ', conn._buffer)
        conn.putheader('Bar', '\tbaz\t')
        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
        conn.putheader('Authorization', 'Bearer mytoken')
        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
        conn.putheader('IterHeader', 'IterA', 'IterB')
        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
        conn.putheader('LatinHeader', b'\xFF')
        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
        conn.putheader('Utf8Header', b'\xc3\x80')
        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
        conn.putheader('C1-Control', b'next\x85line')
        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
        conn.putheader('Key Space', 'value')
        self.assertIn(b'Key Space: value', conn._buffer)
        conn.putheader('KeySpace ', 'value')
        self.assertIn(b'KeySpace : value', conn._buffer)
        conn.putheader(b'Nonbreak\xa0Space', 'value')
        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
        conn.putheader(b'\xa0NonbreakSpace', 'value')
        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)

    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should be wrapped by [] if
        # it is an IPv6 address
        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001::]:81')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

        # Same check without an explicit port
        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001:102A::]')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

    def test_malformed_headers_coped_with(self):
        # Issue 19996
        # A header line with an empty name sits between two valid headers;
        # both valid headers must still be retrievable.
        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()

        self.assertEqual(resp.getheader('First'), 'val')
        self.assertEqual(resp.getheader('Second'), 'val')

    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        body = (
            b'HTTP/1.1 200 OK\r\n'
            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
            b'obs-fold: text\r\n'
            b' folded with space\r\n'
            b'\tfolded with tab\r\n'
            b'Content-Length: 0\r\n'
            b'\r\n'
        )
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        # Each header is checked through both getheader() and resp.msg
        self.assertEqual(resp.getheader('Content-Length'), '0')
        self.assertEqual(resp.msg['Content-Length'], '0')
        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
        self.assertEqual(resp.getheader('VCHAR'), vchar)
        self.assertEqual(resp.msg['VCHAR'], vchar)
        self.assertIsNotNone(resp.getheader('obs-text'))
        self.assertIn('obs-text', resp.msg)
        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
            self.assertTrue(folded.startswith('text'))
            self.assertIn(' folded with space', folded)
            self.assertTrue(folded.endswith('folded with tab'))

    def test_invalid_headers(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
        # longer allowed in header names
        # Each case is a (name, value) pair that putheader() must reject.
        cases = (
            (b'Invalid\r\nName', b'ValidValue'),
            (b'Invalid\rName', b'ValidValue'),
            (b'Invalid\nName', b'ValidValue'),
            (b'\r\nInvalidName', b'ValidValue'),
            (b'\rInvalidName', b'ValidValue'),
            (b'\nInvalidName', b'ValidValue'),
            (b' InvalidName', b'ValidValue'),
            (b'\tInvalidName', b'ValidValue'),
            (b'Invalid:Name', b'ValidValue'),
            (b':InvalidName', b'ValidValue'),
            (b'ValidName', b'Invalid\r\nValue'),
            (b'ValidName', b'Invalid\rValue'),
            (b'ValidName', b'Invalid\nValue'),
            (b'ValidName', b'InvalidValue\r\n'),
            (b'ValidName', b'InvalidValue\r'),
            (b'ValidName', b'InvalidValue\n'),
        )
        for name, value in cases:
            with self.subTest((name, value)):
                with self.assertRaisesRegex(ValueError, 'Invalid header'):
                    conn.putheader(name, value)

    def test_headers_debuglevel(self):
        # With debuglevel=1, begin() is expected to print the status line
        # and one "header:" line per received header to stdout.
        body = (
            b'HTTP/1.1 200 OK\r\n'
            b'First: val\r\n'
            b'Second: val1\r\n'
            b'Second: val2\r\n'
        )
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock, debuglevel=1)
        with support.captured_stdout() as output:
            resp.begin()
        lines = output.getvalue().splitlines()
        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
        self.assertEqual(lines[1], "header: First: val")
        self.assertEqual(lines[2], "header: Second: val1")
        self.assertEqual(lines[3], "header: Second: val2")
365
366
class HttpMethodTests(TestCase):
    """Validation of the HTTP method name passed to request()."""

    def test_invalid_method_names(self):
        # Every candidate below embeds a CR and/or LF; request() must
        # refuse each one with a ValueError.
        bad_methods = (
            'GET\r',
            'POST\n',
            'PUT\n\r',
            'POST\nValue',
            'POST\nHOST:abc',
            'GET\nrHost:abc\n',
            'POST\rRemainder:\r',
            'GET\rHOST:\n',
            '\nPUT'
        )
        expected_error = "method can't contain control characters"

        for bad_method in bad_methods:
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            with self.assertRaisesRegex(ValueError, expected_error):
                conn.request(method=bad_method, url="/")
387
388
class TransferEncodingTest(TestCase):
    """Chunked Transfer-Encoding of request bodies sent by HTTPConnection."""

    expected_body = b"It's just a flesh wound"

    def test_endheaders_chunked(self):
        # endheaders(..., encode_chunked=True) must chunk-encode the body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.putrequest('POST', '/')
        conn.endheaders(self._make_body(), encode_chunked=True)

        _, _, body = self._parse_request(conn.sock.data)
        body = self._parse_chunked(body)
        self.assertEqual(body, self.expected_body)

    def test_explicit_headers(self):
        # explicit chunked
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        # this shouldn't actually be automatically chunk-encoded because the
        # calling code has explicitly stated that it's taking care of it
        conn.request(
            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # explicit chunked, string body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request(
            'POST', '/', self.expected_body.decode('latin-1'),
            {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # User-specified TE, but request() does the chunk encoding
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/',
            headers={'Transfer-Encoding': 'gzip, chunked'},
            encode_chunked=True,
            body=self._make_body())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
        self.assertEqual(self._parse_chunked(body), self.expected_body)

    def test_request(self):
        # An iterable body without explicit headers must be sent chunked,
        # whether or not the iterable also yields empty pieces.
        for empty_lines in (False, True,):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(b'')
            conn.request(
                'POST', '/', self._make_body(empty_lines=empty_lines))

            _, headers, body = self._parse_request(conn.sock.data)
            body = self._parse_chunked(body)
            self.assertEqual(body, self.expected_body)
            self.assertEqual(headers['Transfer-Encoding'], 'chunked')

            # Content-Length and Transfer-Encoding SHOULD not be sent in the
            # same request
            self.assertNotIn('content-length', [k.lower() for k in headers])

    def test_empty_body(self):
        # Zero-length iterable should be treated like any other iterable
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/', ())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(body, b"0\r\n\r\n")

    def _make_body(self, empty_lines=False):
        """Yield expected_body split into space-terminated words.

        With empty_lines true, an empty bytes object is yielded before
        every odd-indexed word.
        """
        lines = self.expected_body.split(b' ')
        for idx, line in enumerate(lines):
            # for testing handling empty lines
            if empty_lines and idx % 2:
                yield b''
            if idx < len(lines) - 1:
                yield line + b' '
            else:
                yield line

    def _parse_request(self, data):
        """Split raw request bytes into (request_line, headers, body).

        request_line and body are bytes.  headers maps each header name to
        its value, both decoded as latin-1 and stripped.
        """
        lines = data.split(b'\r\n')
        request = lines[0]
        headers = {}
        n = 1
        while n < len(lines) and len(lines[n]) > 0:
            # Split on the first colon only: a header value may itself
            # contain ':' (e.g. a host with a port).
            key, val = lines[n].split(b':', 1)
            key = key.decode('latin-1').strip()
            headers[key] = val.decode('latin-1').strip()
            n += 1

        # lines[n] is the empty line ending the headers; the rest is body
        return request, headers, b'\r\n'.join(lines[n + 1:])

    def _parse_chunked(self, data):
        """Decode chunk-encoded *data* and return the joined payloads.

        Every chunk's declared hexadecimal size is checked against the
        length of its payload.  Parsing stops at the zero-size chunk; the
        test fails if the data ends before one is found.
        """
        body = []
        n = 0
        lines = data.split(b'\r\n')
        # parse body
        while True:
            # A size line and a payload line are both needed; running out
            # of lines means the terminating chunk never arrived.
            if len(lines) - n < 2:
                self.fail('chunked body ended without a zero-size chunk')
            size, chunk = lines[n:n+2]
            size = int(size, 16)

            if size == 0:
                break

            self.assertEqual(size, len(chunk))
            body.append(chunk)

            n += 2

        return b''.join(body)
515
516
517class BasicTest(TestCase):
518    def test_status_lines(self):
519        # Test HTTP status lines
520
521        body = "HTTP/1.1 200 Ok\r\n\r\nText"
522        sock = FakeSocket(body)
523        resp = client.HTTPResponse(sock)
524        resp.begin()
525        self.assertEqual(resp.read(0), b'')  # Issue #20007
526        self.assertFalse(resp.isclosed())
527        self.assertFalse(resp.closed)
528        self.assertEqual(resp.read(), b"Text")
529        self.assertTrue(resp.isclosed())
530        self.assertFalse(resp.closed)
531        resp.close()
532        self.assertTrue(resp.closed)
533
534        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
535        sock = FakeSocket(body)
536        resp = client.HTTPResponse(sock)
537        self.assertRaises(client.BadStatusLine, resp.begin)
538
539    def test_bad_status_repr(self):
540        exc = client.BadStatusLine('')
541        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
542
543    def test_partial_reads(self):
544        # if we have Content-Length, HTTPResponse knows when to close itself,
545        # the same behaviour as when we read the whole thing with read()
546        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
547        sock = FakeSocket(body)
548        resp = client.HTTPResponse(sock)
549        resp.begin()
550        self.assertEqual(resp.read(2), b'Te')
551        self.assertFalse(resp.isclosed())
552        self.assertEqual(resp.read(2), b'xt')
553        self.assertTrue(resp.isclosed())
554        self.assertFalse(resp.closed)
555        resp.close()
556        self.assertTrue(resp.closed)
557
558    def test_mixed_reads(self):
559        # readline() should update the remaining length, so that read() knows
560        # how much data is left and does not raise IncompleteRead
561        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
562        sock = FakeSocket(body)
563        resp = client.HTTPResponse(sock)
564        resp.begin()
565        self.assertEqual(resp.readline(), b'Text\r\n')
566        self.assertFalse(resp.isclosed())
567        self.assertEqual(resp.read(), b'Another')
568        self.assertTrue(resp.isclosed())
569        self.assertFalse(resp.closed)
570        resp.close()
571        self.assertTrue(resp.closed)
572
573    def test_partial_readintos(self):
574        # if we have Content-Length, HTTPResponse knows when to close itself,
575        # the same behaviour as when we read the whole thing with read()
576        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
577        sock = FakeSocket(body)
578        resp = client.HTTPResponse(sock)
579        resp.begin()
580        b = bytearray(2)
581        n = resp.readinto(b)
582        self.assertEqual(n, 2)
583        self.assertEqual(bytes(b), b'Te')
584        self.assertFalse(resp.isclosed())
585        n = resp.readinto(b)
586        self.assertEqual(n, 2)
587        self.assertEqual(bytes(b), b'xt')
588        self.assertTrue(resp.isclosed())
589        self.assertFalse(resp.closed)
590        resp.close()
591        self.assertTrue(resp.closed)
592
593    def test_partial_reads_no_content_length(self):
594        # when no length is present, the socket should be gracefully closed when
595        # all data was read
596        body = "HTTP/1.1 200 Ok\r\n\r\nText"
597        sock = FakeSocket(body)
598        resp = client.HTTPResponse(sock)
599        resp.begin()
600        self.assertEqual(resp.read(2), b'Te')
601        self.assertFalse(resp.isclosed())
602        self.assertEqual(resp.read(2), b'xt')
603        self.assertEqual(resp.read(1), b'')
604        self.assertTrue(resp.isclosed())
605        self.assertFalse(resp.closed)
606        resp.close()
607        self.assertTrue(resp.closed)
608
609    def test_partial_readintos_no_content_length(self):
610        # when no length is present, the socket should be gracefully closed when
611        # all data was read
612        body = "HTTP/1.1 200 Ok\r\n\r\nText"
613        sock = FakeSocket(body)
614        resp = client.HTTPResponse(sock)
615        resp.begin()
616        b = bytearray(2)
617        n = resp.readinto(b)
618        self.assertEqual(n, 2)
619        self.assertEqual(bytes(b), b'Te')
620        self.assertFalse(resp.isclosed())
621        n = resp.readinto(b)
622        self.assertEqual(n, 2)
623        self.assertEqual(bytes(b), b'xt')
624        n = resp.readinto(b)
625        self.assertEqual(n, 0)
626        self.assertTrue(resp.isclosed())
627
628    def test_partial_reads_incomplete_body(self):
629        # if the server shuts down the connection before the whole
630        # content-length is delivered, the socket is gracefully closed
631        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
632        sock = FakeSocket(body)
633        resp = client.HTTPResponse(sock)
634        resp.begin()
635        self.assertEqual(resp.read(2), b'Te')
636        self.assertFalse(resp.isclosed())
637        self.assertEqual(resp.read(2), b'xt')
638        self.assertEqual(resp.read(1), b'')
639        self.assertTrue(resp.isclosed())
640
641    def test_partial_readintos_incomplete_body(self):
642        # if the server shuts down the connection before the whole
643        # content-length is delivered, the socket is gracefully closed
644        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
645        sock = FakeSocket(body)
646        resp = client.HTTPResponse(sock)
647        resp.begin()
648        b = bytearray(2)
649        n = resp.readinto(b)
650        self.assertEqual(n, 2)
651        self.assertEqual(bytes(b), b'Te')
652        self.assertFalse(resp.isclosed())
653        n = resp.readinto(b)
654        self.assertEqual(n, 2)
655        self.assertEqual(bytes(b), b'xt')
656        n = resp.readinto(b)
657        self.assertEqual(n, 0)
658        self.assertTrue(resp.isclosed())
659        self.assertFalse(resp.closed)
660        resp.close()
661        self.assertTrue(resp.closed)
662
663    def test_host_port(self):
664        # Check invalid host_port
665
666        for hp in ("www.python.org:abc", "user:password@www.python.org"):
667            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
668
669        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
670                          "fe80::207:e9ff:fe9b", 8000),
671                         ("www.python.org:80", "www.python.org", 80),
672                         ("www.python.org:", "www.python.org", 80),
673                         ("www.python.org", "www.python.org", 80),
674                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
675                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
676            c = client.HTTPConnection(hp)
677            self.assertEqual(h, c.host)
678            self.assertEqual(p, c.port)
679
680    def test_response_headers(self):
681        # test response with multiple message headers with the same field name.
682        text = ('HTTP/1.1 200 OK\r\n'
683                'Set-Cookie: Customer="WILE_E_COYOTE"; '
684                'Version="1"; Path="/acme"\r\n'
685                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
686                ' Path="/acme"\r\n'
687                '\r\n'
688                'No body\r\n')
689        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
690               ', '
691               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
692        s = FakeSocket(text)
693        r = client.HTTPResponse(s)
694        r.begin()
695        cookies = r.getheader("Set-Cookie")
696        self.assertEqual(cookies, hdr)
697
698    def test_read_head(self):
699        # Test that the library doesn't attempt to read any data
700        # from a HEAD request.  (Tickles SF bug #622042.)
701        sock = FakeSocket(
702            'HTTP/1.1 200 OK\r\n'
703            'Content-Length: 14432\r\n'
704            '\r\n',
705            NoEOFBytesIO)
706        resp = client.HTTPResponse(sock, method="HEAD")
707        resp.begin()
708        if resp.read():
709            self.fail("Did not expect response from HEAD request")
710
711    def test_readinto_head(self):
712        # Test that the library doesn't attempt to read any data
713        # from a HEAD request.  (Tickles SF bug #622042.)
714        sock = FakeSocket(
715            'HTTP/1.1 200 OK\r\n'
716            'Content-Length: 14432\r\n'
717            '\r\n',
718            NoEOFBytesIO)
719        resp = client.HTTPResponse(sock, method="HEAD")
720        resp.begin()
721        b = bytearray(5)
722        if resp.readinto(b) != 0:
723            self.fail("Did not expect response from HEAD request")
724        self.assertEqual(bytes(b), b'\x00'*5)
725
726    def test_too_many_headers(self):
727        headers = '\r\n'.join('Header%d: foo' % i
728                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
729        text = ('HTTP/1.1 200 OK\r\n' + headers)
730        s = FakeSocket(text)
731        r = client.HTTPResponse(s)
732        self.assertRaisesRegex(client.HTTPException,
733                               r"got more than \d+ headers", r.begin)
734
735    def test_send_file(self):
736        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
737                    b'Accept-Encoding: identity\r\n'
738                    b'Transfer-Encoding: chunked\r\n'
739                    b'\r\n')
740
741        with open(__file__, 'rb') as body:
742            conn = client.HTTPConnection('example.com')
743            sock = FakeSocket(body)
744            conn.sock = sock
745            conn.request('GET', '/foo', body)
746            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
747                    (sock.data[:len(expected)], expected))
748
749    def test_send(self):
750        expected = b'this is a test this is only a test'
751        conn = client.HTTPConnection('example.com')
752        sock = FakeSocket(None)
753        conn.sock = sock
754        conn.send(expected)
755        self.assertEqual(expected, sock.data)
756        sock.data = b''
757        conn.send(array.array('b', expected))
758        self.assertEqual(expected, sock.data)
759        sock.data = b''
760        conn.send(io.BytesIO(expected))
761        self.assertEqual(expected, sock.data)
762
763    def test_send_updating_file(self):
764        def data():
765            yield 'data'
766            yield None
767            yield 'data_two'
768
769        class UpdatingFile(io.TextIOBase):
770            mode = 'r'
771            d = data()
772            def read(self, blocksize=-1):
773                return next(self.d)
774
775        expected = b'data'
776
777        conn = client.HTTPConnection('example.com')
778        sock = FakeSocket("")
779        conn.sock = sock
780        conn.send(UpdatingFile())
781        self.assertEqual(sock.data, expected)
782
783
784    def test_send_iter(self):
785        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
786                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
787                   b'\r\nonetwothree'
788
789        def body():
790            yield b"one"
791            yield b"two"
792            yield b"three"
793
794        conn = client.HTTPConnection('example.com')
795        sock = FakeSocket("")
796        conn.sock = sock
797        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
798        self.assertEqual(sock.data, expected)
799
800    def test_blocksize_request(self):
801        """Check that request() respects the configured block size."""
802        blocksize = 8  # For easy debugging.
803        conn = client.HTTPConnection('example.com', blocksize=blocksize)
804        sock = FakeSocket(None)
805        conn.sock = sock
806        expected = b"a" * blocksize + b"b"
807        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
808        self.assertEqual(sock.sendall_calls, 3)
809        body = sock.data.split(b"\r\n\r\n", 1)[1]
810        self.assertEqual(body, expected)
811
812    def test_blocksize_send(self):
813        """Check that send() respects the configured block size."""
814        blocksize = 8  # For easy debugging.
815        conn = client.HTTPConnection('example.com', blocksize=blocksize)
816        sock = FakeSocket(None)
817        conn.sock = sock
818        expected = b"a" * blocksize + b"b"
819        conn.send(io.BytesIO(expected))
820        self.assertEqual(sock.sendall_calls, 2)
821        self.assertEqual(sock.data, expected)
822
823    def test_send_type_error(self):
824        # See: Issue #12676
825        conn = client.HTTPConnection('example.com')
826        conn.sock = FakeSocket('')
827        with self.assertRaises(TypeError):
828            conn.request('POST', 'test', conn)
829
830    def test_chunked(self):
831        expected = chunked_expected
832        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
833        resp = client.HTTPResponse(sock, method="GET")
834        resp.begin()
835        self.assertEqual(resp.read(), expected)
836        resp.close()
837
838        # Various read sizes
839        for n in range(1, 12):
840            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
841            resp = client.HTTPResponse(sock, method="GET")
842            resp.begin()
843            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
844            resp.close()
845
846        for x in ('', 'foo\r\n'):
847            sock = FakeSocket(chunked_start + x)
848            resp = client.HTTPResponse(sock, method="GET")
849            resp.begin()
850            try:
851                resp.read()
852            except client.IncompleteRead as i:
853                self.assertEqual(i.partial, expected)
854                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
855                self.assertEqual(repr(i), expected_message)
856                self.assertEqual(str(i), expected_message)
857            else:
858                self.fail('IncompleteRead expected')
859            finally:
860                resp.close()
861
862    def test_readinto_chunked(self):
863
864        expected = chunked_expected
865        nexpected = len(expected)
866        b = bytearray(128)
867
868        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
869        resp = client.HTTPResponse(sock, method="GET")
870        resp.begin()
871        n = resp.readinto(b)
872        self.assertEqual(b[:nexpected], expected)
873        self.assertEqual(n, nexpected)
874        resp.close()
875
876        # Various read sizes
877        for n in range(1, 12):
878            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
879            resp = client.HTTPResponse(sock, method="GET")
880            resp.begin()
881            m = memoryview(b)
882            i = resp.readinto(m[0:n])
883            i += resp.readinto(m[i:n + i])
884            i += resp.readinto(m[i:])
885            self.assertEqual(b[:nexpected], expected)
886            self.assertEqual(i, nexpected)
887            resp.close()
888
889        for x in ('', 'foo\r\n'):
890            sock = FakeSocket(chunked_start + x)
891            resp = client.HTTPResponse(sock, method="GET")
892            resp.begin()
893            try:
894                n = resp.readinto(b)
895            except client.IncompleteRead as i:
896                self.assertEqual(i.partial, expected)
897                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
898                self.assertEqual(repr(i), expected_message)
899                self.assertEqual(str(i), expected_message)
900            else:
901                self.fail('IncompleteRead expected')
902            finally:
903                resp.close()
904
905    def test_chunked_head(self):
906        chunked_start = (
907            'HTTP/1.1 200 OK\r\n'
908            'Transfer-Encoding: chunked\r\n\r\n'
909            'a\r\n'
910            'hello world\r\n'
911            '1\r\n'
912            'd\r\n'
913        )
914        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
915        resp = client.HTTPResponse(sock, method="HEAD")
916        resp.begin()
917        self.assertEqual(resp.read(), b'')
918        self.assertEqual(resp.status, 200)
919        self.assertEqual(resp.reason, 'OK')
920        self.assertTrue(resp.isclosed())
921        self.assertFalse(resp.closed)
922        resp.close()
923        self.assertTrue(resp.closed)
924
925    def test_readinto_chunked_head(self):
926        chunked_start = (
927            'HTTP/1.1 200 OK\r\n'
928            'Transfer-Encoding: chunked\r\n\r\n'
929            'a\r\n'
930            'hello world\r\n'
931            '1\r\n'
932            'd\r\n'
933        )
934        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
935        resp = client.HTTPResponse(sock, method="HEAD")
936        resp.begin()
937        b = bytearray(5)
938        n = resp.readinto(b)
939        self.assertEqual(n, 0)
940        self.assertEqual(bytes(b), b'\x00'*5)
941        self.assertEqual(resp.status, 200)
942        self.assertEqual(resp.reason, 'OK')
943        self.assertTrue(resp.isclosed())
944        self.assertFalse(resp.closed)
945        resp.close()
946        self.assertTrue(resp.closed)
947
948    def test_negative_content_length(self):
949        sock = FakeSocket(
950            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
951        resp = client.HTTPResponse(sock, method="GET")
952        resp.begin()
953        self.assertEqual(resp.read(), b'Hello\r\n')
954        self.assertTrue(resp.isclosed())
955
956    def test_incomplete_read(self):
957        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
958        resp = client.HTTPResponse(sock, method="GET")
959        resp.begin()
960        try:
961            resp.read()
962        except client.IncompleteRead as i:
963            self.assertEqual(i.partial, b'Hello\r\n')
964            self.assertEqual(repr(i),
965                             "IncompleteRead(7 bytes read, 3 more expected)")
966            self.assertEqual(str(i),
967                             "IncompleteRead(7 bytes read, 3 more expected)")
968            self.assertTrue(resp.isclosed())
969        else:
970            self.fail('IncompleteRead expected')
971
972    def test_epipe(self):
973        sock = EPipeSocket(
974            "HTTP/1.0 401 Authorization Required\r\n"
975            "Content-type: text/html\r\n"
976            "WWW-Authenticate: Basic realm=\"example\"\r\n",
977            b"Content-Length")
978        conn = client.HTTPConnection("example.com")
979        conn.sock = sock
980        self.assertRaises(OSError,
981                          lambda: conn.request("PUT", "/url", "body"))
982        resp = conn.getresponse()
983        self.assertEqual(401, resp.status)
984        self.assertEqual("Basic realm=\"example\"",
985                         resp.getheader("www-authenticate"))
986
987    # Test lines overflowing the max line size (_MAXLINE in http.client)
988
989    def test_overflowing_status_line(self):
990        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
991        resp = client.HTTPResponse(FakeSocket(body))
992        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
993
994    def test_overflowing_header_line(self):
995        body = (
996            'HTTP/1.1 200 OK\r\n'
997            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
998        )
999        resp = client.HTTPResponse(FakeSocket(body))
1000        self.assertRaises(client.LineTooLong, resp.begin)
1001
1002    def test_overflowing_header_limit_after_100(self):
1003        body = (
1004            'HTTP/1.1 100 OK\r\n'
1005            'r\n' * 32768
1006        )
1007        resp = client.HTTPResponse(FakeSocket(body))
1008        with self.assertRaises(client.HTTPException) as cm:
1009            resp.begin()
1010        # We must assert more because other reasonable errors that we
1011        # do not want can also be HTTPException derived.
1012        self.assertIn('got more than ', str(cm.exception))
1013        self.assertIn('headers', str(cm.exception))
1014
1015    def test_overflowing_chunked_line(self):
1016        body = (
1017            'HTTP/1.1 200 OK\r\n'
1018            'Transfer-Encoding: chunked\r\n\r\n'
1019            + '0' * 65536 + 'a\r\n'
1020            'hello world\r\n'
1021            '0\r\n'
1022            '\r\n'
1023        )
1024        resp = client.HTTPResponse(FakeSocket(body))
1025        resp.begin()
1026        self.assertRaises(client.LineTooLong, resp.read)
1027
1028    def test_early_eof(self):
1029        # Test httpresponse with no \r\n termination,
1030        body = "HTTP/1.1 200 Ok"
1031        sock = FakeSocket(body)
1032        resp = client.HTTPResponse(sock)
1033        resp.begin()
1034        self.assertEqual(resp.read(), b'')
1035        self.assertTrue(resp.isclosed())
1036        self.assertFalse(resp.closed)
1037        resp.close()
1038        self.assertTrue(resp.closed)
1039
1040    def test_error_leak(self):
1041        # Test that the socket is not leaked if getresponse() fails
1042        conn = client.HTTPConnection('example.com')
1043        response = None
1044        class Response(client.HTTPResponse):
1045            def __init__(self, *pos, **kw):
1046                nonlocal response
1047                response = self  # Avoid garbage collector closing the socket
1048                client.HTTPResponse.__init__(self, *pos, **kw)
1049        conn.response_class = Response
1050        conn.sock = FakeSocket('Invalid status line')
1051        conn.request('GET', '/')
1052        self.assertRaises(client.BadStatusLine, conn.getresponse)
1053        self.assertTrue(response.closed)
1054        self.assertTrue(conn.sock.file_closed)
1055
1056    def test_chunked_extension(self):
1057        extra = '3;foo=bar\r\n' + 'abc\r\n'
1058        expected = chunked_expected + b'abc'
1059
1060        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1061        resp = client.HTTPResponse(sock, method="GET")
1062        resp.begin()
1063        self.assertEqual(resp.read(), expected)
1064        resp.close()
1065
1066    def test_chunked_missing_end(self):
1067        """some servers may serve up a short chunked encoding stream"""
1068        expected = chunked_expected
1069        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1070        resp = client.HTTPResponse(sock, method="GET")
1071        resp.begin()
1072        self.assertEqual(resp.read(), expected)
1073        resp.close()
1074
1075    def test_chunked_trailers(self):
1076        """See that trailers are read and ignored"""
1077        expected = chunked_expected
1078        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1079        resp = client.HTTPResponse(sock, method="GET")
1080        resp.begin()
1081        self.assertEqual(resp.read(), expected)
1082        # we should have reached the end of the file
1083        self.assertEqual(sock.file.read(), b"") #we read to the end
1084        resp.close()
1085
1086    def test_chunked_sync(self):
1087        """Check that we don't read past the end of the chunked-encoding stream"""
1088        expected = chunked_expected
1089        extradata = "extradata"
1090        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1091        resp = client.HTTPResponse(sock, method="GET")
1092        resp.begin()
1093        self.assertEqual(resp.read(), expected)
1094        # the file should now have our extradata ready to be read
1095        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1096        resp.close()
1097
1098    def test_content_length_sync(self):
1099        """Check that we don't read past the end of the Content-Length stream"""
1100        extradata = b"extradata"
1101        expected = b"Hello123\r\n"
1102        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1103        resp = client.HTTPResponse(sock, method="GET")
1104        resp.begin()
1105        self.assertEqual(resp.read(), expected)
1106        # the file should now have our extradata ready to be read
1107        self.assertEqual(sock.file.read(), extradata) #we read to the end
1108        resp.close()
1109
1110    def test_readlines_content_length(self):
1111        extradata = b"extradata"
1112        expected = b"Hello123\r\n"
1113        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1114        resp = client.HTTPResponse(sock, method="GET")
1115        resp.begin()
1116        self.assertEqual(resp.readlines(2000), [expected])
1117        # the file should now have our extradata ready to be read
1118        self.assertEqual(sock.file.read(), extradata) #we read to the end
1119        resp.close()
1120
1121    def test_read1_content_length(self):
1122        extradata = b"extradata"
1123        expected = b"Hello123\r\n"
1124        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1125        resp = client.HTTPResponse(sock, method="GET")
1126        resp.begin()
1127        self.assertEqual(resp.read1(2000), expected)
1128        # the file should now have our extradata ready to be read
1129        self.assertEqual(sock.file.read(), extradata) #we read to the end
1130        resp.close()
1131
1132    def test_readline_bound_content_length(self):
1133        extradata = b"extradata"
1134        expected = b"Hello123\r\n"
1135        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1136        resp = client.HTTPResponse(sock, method="GET")
1137        resp.begin()
1138        self.assertEqual(resp.readline(10), expected)
1139        self.assertEqual(resp.readline(10), b"")
1140        # the file should now have our extradata ready to be read
1141        self.assertEqual(sock.file.read(), extradata) #we read to the end
1142        resp.close()
1143
1144    def test_read1_bound_content_length(self):
1145        extradata = b"extradata"
1146        expected = b"Hello123\r\n"
1147        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1148        resp = client.HTTPResponse(sock, method="GET")
1149        resp.begin()
1150        self.assertEqual(resp.read1(20), expected*2)
1151        self.assertEqual(resp.read(), expected)
1152        # the file should now have our extradata ready to be read
1153        self.assertEqual(sock.file.read(), extradata) #we read to the end
1154        resp.close()
1155
1156    def test_response_fileno(self):
1157        # Make sure fd returned by fileno is valid.
1158        serv = socket.create_server((HOST, 0))
1159        self.addCleanup(serv.close)
1160
1161        result = None
1162        def run_server():
1163            [conn, address] = serv.accept()
1164            with conn, conn.makefile("rb") as reader:
1165                # Read the request header until a blank line
1166                while True:
1167                    line = reader.readline()
1168                    if not line.rstrip(b"\r\n"):
1169                        break
1170                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1171                nonlocal result
1172                result = reader.read()
1173
1174        thread = threading.Thread(target=run_server)
1175        thread.start()
1176        self.addCleanup(thread.join, float(1))
1177        conn = client.HTTPConnection(*serv.getsockname())
1178        conn.request("CONNECT", "dummy:1234")
1179        response = conn.getresponse()
1180        try:
1181            self.assertEqual(response.status, client.OK)
1182            s = socket.socket(fileno=response.fileno())
1183            try:
1184                s.sendall(b"proxied data\n")
1185            finally:
1186                s.detach()
1187        finally:
1188            response.close()
1189            conn.close()
1190        thread.join()
1191        self.assertEqual(result, b"proxied data\n")
1192
1193    def test_putrequest_override_domain_validation(self):
1194        """
1195        It should be possible to override the default validation
1196        behavior in putrequest (bpo-38216).
1197        """
1198        class UnsafeHTTPConnection(client.HTTPConnection):
1199            def _validate_path(self, url):
1200                pass
1201
1202        conn = UnsafeHTTPConnection('example.com')
1203        conn.sock = FakeSocket('')
1204        conn.putrequest('GET', '/\x00')
1205
1206    def test_putrequest_override_host_validation(self):
1207        class UnsafeHTTPConnection(client.HTTPConnection):
1208            def _validate_host(self, url):
1209                pass
1210
1211        conn = UnsafeHTTPConnection('example.com\r\n')
1212        conn.sock = FakeSocket('')
1213        # set skip_host so a ValueError is not raised upon adding the
1214        # invalid URL as the value of the "Host:" header
1215        conn.putrequest('GET', '/', skip_host=1)
1216
1217    def test_putrequest_override_encoding(self):
1218        """
1219        It should be possible to override the default encoding
1220        to transmit bytes in another encoding even if invalid
1221        (bpo-36274).
1222        """
1223        class UnsafeHTTPConnection(client.HTTPConnection):
1224            def _encode_request(self, str_url):
1225                return str_url.encode('utf-8')
1226
1227        conn = UnsafeHTTPConnection('example.com')
1228        conn.sock = FakeSocket('')
1229        conn.putrequest('GET', '/☃')
1230
1231
class ExtendedReadTest(TestCase):
    """
    Test peek(), read1(), readline()
    """
    # Canned response: status line, blank line, then the body.
    lines = (
        'HTTP/1.1 200 OK\r\n'
        '\r\n'
        'hello world!\n'
        'and now \n'
        'for something completely different\n'
        'foo'
        )
    # The body alone, i.e. everything from 'hello' onwards.
    lines_expected = lines[lines.find('hello'):].encode("ascii")
    lines_chunked = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello worl\r\n'
        '3\r\n'
        'd!\n\r\n'
        '9\r\n'
        'and now \n\r\n'
        '23\r\n'
        'for something completely different\n\r\n'
        '3\r\n'
        'foo\r\n'
        '0\r\n' # terminating chunk
        '\r\n'  # end of trailers
    )

    def setUp(self):
        response = client.HTTPResponse(FakeSocket(self.lines), method="GET")
        response.begin()
        # The tests below call peek() on response.fp, so wrap the raw file
        # in a BufferedReader.
        response.fp = io.BufferedReader(response.fp)
        self.resp = response

    def test_peek(self):
        resp = self.resp
        # patch up the buffered peek so that it returns not too much stuff
        real_peek = resp.fp.peek
        def short_peek(n=-1):
            data = real_peek(n)
            return data[:n] if n >= 0 else data[:10]
        resp.fp.peek = short_peek

        chunks = []
        while True:
            # try a short peek
            head = resp.peek(3)
            if head:
                self.assertGreater(len(head), 0)
                # then unbounded peek
                wide = resp.peek()
                self.assertGreaterEqual(len(wide), len(head))
                self.assertTrue(wide.startswith(head))
                chunk = resp.read(len(wide))
                self.assertEqual(chunk, wide)
            else:
                chunk = resp.read()
                self.assertFalse(chunk)
            chunks.append(chunk)
            if not chunk:
                break
        self.assertEqual(b"".join(chunks), self.lines_expected)

    def test_readline(self):
        self._verify_readline(self.resp.readline, self.lines_expected)

    def _verify_readline(self, readline, expected):
        # Drain *readline* with a limit of 5 and compare the concatenation of
        # everything returned against *expected*.
        collected = []
        while True:
            # short readlines
            line = readline(5)
            # A non-final piece shorter than the limit must end a line.
            if line and line != b"foo" and len(line) < 5:
                self.assertTrue(line.endswith(b"\n"))
            collected.append(line)
            if not line:
                break
        self.assertEqual(b"".join(collected), expected)

    def test_read1(self):
        resp = self.resp
        def read_some():
            piece = resp.read1(4)
            self.assertLessEqual(len(piece), 4)
            return piece
        liner = Readliner(read_some)
        self._verify_readline(liner.readline, self.lines_expected)

    def test_read1_unbounded(self):
        resp = self.resp
        pieces = []
        while True:
            piece = resp.read1()
            if not piece:
                break
            pieces.append(piece)
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_bounded(self):
        resp = self.resp
        pieces = []
        while True:
            piece = resp.read1(10)
            if not piece:
                break
            self.assertLessEqual(len(piece), 10)
            pieces.append(piece)
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_0(self):
        self.assertEqual(self.resp.read1(0), b"")

    def test_peek_0(self):
        peeked = self.resp.peek(0)
        self.assertLessEqual(0, len(peeked))
1355
1356
class ExtendedReadTestChunked(ExtendedReadTest):
    """
    Test peek(), read1(), readline() in chunked mode
    """
    # The chunk payloads concatenate to the same body as the base class's
    # canned response.
    lines = ''.join([
        'HTTP/1.1 200 OK\r\n',
        'Transfer-Encoding: chunked\r\n\r\n',
        'a\r\n',
        'hello worl\r\n',
        '3\r\n',
        'd!\n\r\n',
        '9\r\n',
        'and now \n\r\n',
        '23\r\n',
        'for something completely different\n\r\n',
        '3\r\n',
        'foo\r\n',
        '0\r\n',  # terminating chunk
        '\r\n',   # end of trailers
    ])
1377
1378
class Readliner:
    """
    a simple readline class that uses an arbitrary read function and buffering

    *readfunc* is called without arguments and must return the next piece of
    input as bytes, or an empty bytes object once the input is exhausted.
    """
    def __init__(self, readfunc):
        self.readfunc = readfunc
        # Bytes already obtained from readfunc but not yet returned.
        self.remainder = b""

    def readline(self, limit):
        """Return the next line, including its trailing newline.

        At most *limit* bytes are returned; a line longer than that is
        handed out in pieces by successive calls.  A negative *limit* (or
        None) means no limit.  At end of input whatever is buffered is
        returned, which may be an empty bytes object.

        If the read function raises, the data collected so far is kept for
        the next call and the exception is propagated.
        """
        unlimited = limit is None or limit < 0
        data = []
        datalen = 0          # number of bytes accumulated in `data`
        read = self.remainder
        try:
            while True:
                newline_at = read.find(b'\n')
                if newline_at != -1:
                    end = newline_at + 1
                    if unlimited or datalen + end <= limit:
                        # Complete line that fits within the limit.
                        cut = end
                        break
                if not unlimited and datalen + len(read) >= limit:
                    # Limit reached before a usable newline: return exactly
                    # `limit` bytes and keep the rest for the next call.
                    cut = limit - datalen
                    break
                # read more data
                data.append(read)
                datalen += len(read)
                read = self.readfunc()
                if not read:
                    cut = 0  # eof condition
                    break
            data.append(read[:cut])
            self.remainder = read[cut:]
            return b"".join(data)
        except BaseException:
            # Do not lose what has been consumed from readfunc so far.
            self.remainder = b"".join(data)
            raise
1411
1412
class OfflineTest(TestCase):
    def test_all(self):
        # Documented objects defined in the module should be in __all__
        # HTTPMessage, parse_headers(), and the HTTP status code constants are
        # intentionally omitted for simplicity
        excluded = {"HTTPMessage", "parse_headers"}
        expected = {"responses"}  # Allowlist documented dict() object
        for name in dir(client):
            if name in excluded or name.startswith("_"):
                continue
            owner = getattr(getattr(client, name), "__module__", None)
            if owner == "http.client":
                expected.add(name)
        self.assertCountEqual(client.__all__, expected)

    def test_responses(self):
        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")

    def test_client_constants(self):
        # Make sure we don't break backward compatibility with 3.4
        constant_names = (
            'CONTINUE',
            'SWITCHING_PROTOCOLS',
            'PROCESSING',
            'OK',
            'CREATED',
            'ACCEPTED',
            'NON_AUTHORITATIVE_INFORMATION',
            'NO_CONTENT',
            'RESET_CONTENT',
            'PARTIAL_CONTENT',
            'MULTI_STATUS',
            'IM_USED',
            'MULTIPLE_CHOICES',
            'MOVED_PERMANENTLY',
            'FOUND',
            'SEE_OTHER',
            'NOT_MODIFIED',
            'USE_PROXY',
            'TEMPORARY_REDIRECT',
            'BAD_REQUEST',
            'UNAUTHORIZED',
            'PAYMENT_REQUIRED',
            'FORBIDDEN',
            'NOT_FOUND',
            'METHOD_NOT_ALLOWED',
            'NOT_ACCEPTABLE',
            'PROXY_AUTHENTICATION_REQUIRED',
            'REQUEST_TIMEOUT',
            'CONFLICT',
            'GONE',
            'LENGTH_REQUIRED',
            'PRECONDITION_FAILED',
            'REQUEST_ENTITY_TOO_LARGE',
            'REQUEST_URI_TOO_LONG',
            'UNSUPPORTED_MEDIA_TYPE',
            'REQUESTED_RANGE_NOT_SATISFIABLE',
            'EXPECTATION_FAILED',
            'MISDIRECTED_REQUEST',
            'UNPROCESSABLE_ENTITY',
            'LOCKED',
            'FAILED_DEPENDENCY',
            'UPGRADE_REQUIRED',
            'PRECONDITION_REQUIRED',
            'TOO_MANY_REQUESTS',
            'REQUEST_HEADER_FIELDS_TOO_LARGE',
            'UNAVAILABLE_FOR_LEGAL_REASONS',
            'INTERNAL_SERVER_ERROR',
            'NOT_IMPLEMENTED',
            'BAD_GATEWAY',
            'SERVICE_UNAVAILABLE',
            'GATEWAY_TIMEOUT',
            'HTTP_VERSION_NOT_SUPPORTED',
            'INSUFFICIENT_STORAGE',
            'NOT_EXTENDED',
            'NETWORK_AUTHENTICATION_REQUIRED',
        )
        # One sub-test per name so that every missing constant is reported.
        for const in constant_names:
            with self.subTest(constant=const):
                self.assertTrue(hasattr(client, const))
1493
1494
class SourceAddressTest(TestCase):
    def setUp(self):
        # Listening socket for the connection under test, plus a separate
        # unused port to use as the client's source port.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = support.bind_port(listener)
        self.source_port = support.find_unused_port()
        listener.listen()
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        # The local end of the connected socket must use the requested port.
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # We don't test anything here other than the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
1524
1525
class TimeoutTest(TestCase):
    # Port of the listening socket; assigned on the class by setUp().
    PORT = None

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        TimeoutTest.PORT = support.bind_port(listener)
        listener.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.
        port = TimeoutTest.PORT

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            conn = client.HTTPConnection(HOST, port)
            conn.connect()
        finally:
            # Always restore the global default.
            socket.setdefaulttimeout(None)
        self.assertEqual(conn.sock.gettimeout(), 30)
        conn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            conn = client.HTTPConnection(HOST, port, timeout=None)
            conn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(conn.sock.gettimeout())
        conn.close()

        # a value
        conn = client.HTTPConnection(HOST, port, timeout=30)
        conn.connect()
        self.assertEqual(conn.sock.gettimeout(), 30)
        conn.close()
1570
1571
class PersistenceTest(TestCase):

    def test_reuse_reconnect(self):
        # Should reuse or reconnect depending on header from server
        # Each case: (HTTP version, extra header line, connection kept open?)
        cases = (
            ('1.0', '', False),
            ('1.0', 'Connection: keep-alive\r\n', True),
            ('1.1', '', True),
            ('1.1', 'Connection: close\r\n', False),
            ('1.0', 'Connection: keep-ALIVE\r\n', True),
            ('1.1', 'Connection: cloSE\r\n', False),
        )
        template = (
            'HTTP/{} 200 OK\r\n'
            '{}'
            'Content-Length: 12\r\n'
            '\r\n'
            'Dummy body\r\n'
        )
        for version, header, reuse in cases:
            with self.subTest(version=version, header=header):
                conn = FakeSocketHTTPConnection(template.format(version, header))
                self.assertIsNone(conn.sock)
                conn.request('GET', '/open-connection')
                with conn.getresponse() as response:
                    self.assertEqual(conn.sock is None, not reuse)
                    response.read()
                self.assertEqual(conn.sock is None, not reuse)
                self.assertEqual(conn.connections, 1)
                conn.request('GET', '/subsequent-request')
                expected_connections = 1 if reuse else 2
                self.assertEqual(conn.connections, expected_connections)

    def test_disconnected(self):

        def make_reset_reader(text):
            """Return BufferedReader that raises ECONNRESET at EOF"""
            raw = io.BytesIO(text)
            def readinto(buffer):
                count = io.BytesIO.readinto(raw, buffer)
                if count == 0:
                    raise ConnectionResetError()
                return count
            raw.readinto = readinto
            return io.BufferedReader(raw)

        # Each case: (factory for the response stream, expected exception)
        cases = (
            (io.BytesIO, client.RemoteDisconnected),
            (make_reset_reader, ConnectionResetError),
        )
        for stream_factory, exception in cases:
            with self.subTest(exception=exception):
                conn = FakeSocketHTTPConnection(b'', stream_factory)
                conn.request('GET', '/eof-response')
                with self.assertRaises(exception):
                    conn.getresponse()
                self.assertIsNone(conn.sock)
                # HTTPConnection.connect() should be automatically invoked
                conn.request('GET', '/reconnect')
                self.assertEqual(conn.connections, 2)

    def test_100_close(self):
        # Only the interim "100 Continue" response is available; the final
        # response is missing.
        interim_only = (
            b'HTTP/1.1 100 Continue\r\n'
            b'\r\n'
        )
        conn = FakeSocketHTTPConnection(interim_only)
        conn.request('GET', '/', headers={'Expect': '100-continue'})
        with self.assertRaises(client.RemoteDisconnected):
            conn.getresponse()
        self.assertIsNone(conn.sock)
        conn.request('GET', '/reconnect')
        self.assertEqual(conn.connections, 2)
1642
1643
class HTTPSTest(TestCase):
    """HTTPSConnection: certificate checks, hostname checks, host parsing."""

    def setUp(self):
        # Every test here needs client.HTTPSConnection to exist.
        if hasattr(client, 'HTTPSConnection'):
            return
        self.skipTest('ssl support required')

    def make_server(self, certfile):
        # Imported at call time rather than at module import.
        from test.ssl_servers import make_https_server
        https_server = make_https_server(self, certfile=certfile)
        return https_server

    def test_attributes(self):
        # The timeout passed to the constructor is stored on the object.
        conn = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(conn.timeout, 30)

    def test_networked(self):
        # Default settings: requires a valid cert from a trusted CA
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        # Switch off cert verification
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            unverified = ssl._create_unverified_context()
            conn = client.HTTPSConnection(hostname, 443, context=unverified)
            conn.request('GET', '/')
            response = conn.getresponse()
            conn.close()
            server_header = response.getheader('server')
            self.assertIn('nginx', server_header)
            response.close()

    @support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        # Default settings: requires a valid cert from a trusted CA
        support.requires('network')
        hostname = 'www.python.org'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            conn.request('GET', '/')
            response = conn.getresponse()
            content_type = response.getheader('content-type')
            response.close()
            conn.close()
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        # We feed the server's cert as a validating cert
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.check_hostname, True)
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            try:
                conn = client.HTTPSConnection(hostname, 443, context=context)
                conn.request('GET', '/')
                response = conn.getresponse()
            except ssl.SSLError as ssl_err:
                details = str(ssl_err)
                # On modern Linux distros (Debian Buster, etc) the default
                # OpenSSL configuration rejects the server key with
                # [SSL: CERTIFICATE_VERIFY_FAILED] "key too weak" until
                # https://bugs.python.org/issue36816 gives
                # self-signed.pythontest.net a proper key size.  Any other
                # SSL error is a genuine failure and propagates unchanged.
                if re.search(r'(?i)key.too.weak', details) is None:
                    raise
                raise unittest.SkipTest(
                    f'Got {details} trying to connect '
                    f'to {hostname}. '
                    'See https://bugs.python.org/issue36816.')
            server_header = response.getheader('server')
            response.close()
            conn.close()
            self.assertIn('nginx', server_header)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(CERT_localhost)
            conn = client.HTTPSConnection(hostname, 443, context=context)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        # The custom cert isn't known to the default trust bundle
        import ssl
        server = self.make_server(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as caught:
            conn.request('GET', '/')
        self.assertEqual(caught.exception.reason,
                         'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        self.addCleanup(conn.close)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.addCleanup(response.close)
        self.assertEqual(response.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        port = server.port
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_fakehostname)
        conn = client.HTTPSConnection('localhost', port, context=context)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # Same with explicit check_hostname=True
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # With check_hostname=False, the mismatching is ignored
        context.check_hostname = False
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', port,
                                          context=context,
                                          check_hostname=False)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        response.close()
        conn.close()
        self.assertEqual(response.status, 404)

        # The context's check_hostname setting is used if one isn't passed to
        # HTTPSConnection.
        context.check_hostname = False
        conn = client.HTTPSConnection('localhost', port, context=context)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.assertEqual(response.status, 404)
        response.close()
        conn.close()

        # Passing check_hostname to HTTPSConnection should override the
        # context's setting.
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        # Malformed "host:port" strings are rejected by the constructor.
        invalid = ("www.python.org:abc", "user:password@www.python.org")
        for host_port in invalid:
            with self.assertRaises(client.InvalidURL):
                client.HTTPSConnection(host_port)

        # (constructor argument, expected .host, expected .port);
        # 443 is expected whenever no port number is given.
        valid = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:443", "www.python.org", 443),
            ("www.python.org:", "www.python.org", 443),
            ("www.python.org", "www.python.org", 443),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
            ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 443),
        )
        for host_port, host, port in valid:
            conn = client.HTTPSConnection(host_port)
            self.assertEqual(host, conn.host)
            self.assertEqual(port, conn.port)

    def test_tls13_pha(self):
        import ssl
        if not ssl.HAS_TLSv1_3:
            self.skipTest('TLS 1.3 support required')
        # just check status of PHA flag

        # No context given: the connection's own context has PHA on.
        conn = client.HTTPSConnection('localhost', 443)
        self.assertTrue(conn._context.post_handshake_auth)

        # A caller-supplied context is used as-is, PHA flag untouched.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertFalse(context.post_handshake_auth)
        conn = client.HTTPSConnection('localhost', 443, context=context)
        self.assertIs(conn._context, context)
        self.assertFalse(conn._context.post_handshake_auth)

        # Passing cert_file together with the context turns PHA on.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                'key_file, cert_file and check_hostname are deprecated',
                DeprecationWarning)
            conn = client.HTTPSConnection('localhost', 443, context=context,
                                          cert_file=CERT_localhost)
        self.assertTrue(conn._context.post_handshake_auth)
1845
1846
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        self._connect_fake_socket()

    def _connect_fake_socket(self):
        # Install a fresh connection whose socket is a FakeSocket, so the
        # bytes sent by request() can be read back from self.sock.data.
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        # Parse everything sent so far.  Returns (headers, stream) where
        # the stream is left positioned just after the header block,
        # i.e. at the start of the request body.
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_list_body(self):
        # Note that no content-length is automatically calculated for
        # an iterable.  The request will fall back to send chunked
        # transfer encoding.
        cases = (
            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
        )
        for body, expected in cases:
            with self.subTest(body):
                # Each case needs its own socket so that earlier requests
                # do not show up in the recorded data.
                self._connect_fake_socket()

                self.conn.request('PUT', '/url', body)
                msg, f = self.get_headers_and_fp()
                self.assertNotIn('Content-Type', msg)
                self.assertNotIn('Content-Length', msg)
                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
                self.assertEqual(expected, f.read())

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_text_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        # Explicit encoding: do not depend on the locale's default.
        with open(support.TESTFN, "w", encoding="ascii") as body_file:
            body_file.write("body")
        with open(support.TESTFN, encoding="ascii") as body_file:
            self.conn.request("PUT", "/url", body_file)
        # The request has been fully sent; inspect it under a separate
        # name instead of rebinding the file handle.
        message, sent = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        # No content-length will be determined for files; the body
        # will be sent using chunked transfer encoding instead.
        self.assertIsNone(message.get("content-length"))
        self.assertEqual("chunked", message.get("transfer-encoding"))
        self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', sent.read())

    def test_binary_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as body_file:
            body_file.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body_file:
            self.conn.request("PUT", "/url", body_file)
        message, sent = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        # As for text files: chunked encoding, no Content-Length.
        self.assertEqual("chunked", message.get("Transfer-Encoding"))
        self.assertNotIn("Content-Length", message)
        self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', sent.read())
1941
1942
class HTTPResponseTest(TestCase):
    """Tests for HTTPResponse.getheader() on a response with a repeated
    header."""

    def setUp(self):
        # Adjacent literals are used instead of a backslash continuation
        # inside one literal, which would copy the source indentation
        # into the second My-Header value.
        raw_response = (
            "HTTP/1.1 200 Ok\r\n"
            "My-Header: first-value\r\n"
            "My-Header: second-value\r\n"
            "\r\n"
            "Text"
        )
        sock = FakeSocket(raw_response)
        self.resp = client.HTTPResponse(sock)
        # begin() reads the status line and headers.
        self.resp.begin()

    def test_getting_header(self):
        # Repeated headers are joined with ', '.
        header = self.resp.getheader('My-Header')
        self.assertEqual(header, 'first-value, second-value')

        # The default is ignored when the header is present.
        header = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(header, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        header = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(header, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        # A list or tuple default is joined with ', ' as well.
        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(header, 'default, values')

        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(header, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        header = self.resp.getheader('No-Such-Header')
        self.assertIsNone(header)

    def test_getting_header_defaultint(self):
        # A non-iterable, non-string default is returned unchanged.
        header = self.resp.getheader('No-Such-Header', default=42)
        self.assertEqual(header, 42)
1977
class TunnelTests(TestCase):
    """HTTPConnection.set_tunnel() against a fake proxy socket."""

    def setUp(self):
        canned_replies = (
            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )
        self.host = 'proxy.com'
        self.conn = client.HTTPConnection(self.host)
        # Replace the connection's socket factory so no real network
        # connection is opened.
        self.conn._create_connection = self._create_connection(canned_replies)

    def tearDown(self):
        self.conn.close()

    def _create_connection(self, response_text):
        # Build a stand-in socket factory: the FakeSocket it returns
        # serves response_text and remembers the address it was asked
        # to connect to.
        def fake_create_connection(address, timeout=None,
                                   source_address=None):
            host, port = address[0], address[1]
            return FakeSocket(response_text, host=host, port=port)
        return fake_create_connection

    def test_set_tunnel_host_port_headers(self):
        tunnel_host = 'destination.com'
        tunnel_port = 8888
        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
                             headers=tunnel_headers)
        self.conn.request('HEAD', '/', '')
        # The socket goes to the proxy ...
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        # ... while the tunnel settings are kept on the connection.
        self.assertEqual(self.conn._tunnel_host, tunnel_host)
        self.assertEqual(self.conn._tunnel_port, tunnel_port)
        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)

    def test_disallow_set_tunnel_after_connect(self):
        # Once connected, we shouldn't be able to tunnel anymore
        self.conn.connect()
        with self.assertRaises(RuntimeError):
            self.conn.set_tunnel('destination.com')

    def test_connect_with_tunnel(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('HEAD', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        self.assertIn(b'CONNECT destination.com', sock.data)
        # issue22095
        self.assertNotIn(b'Host: destination.com:None', sock.data)
        self.assertIn(b'Host: destination.com', sock.data)

        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
        self.assertNotIn(b'Host: proxy.com', sock.data)

    def test_connect_put_request(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('PUT', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        self.assertIn(b'CONNECT destination.com', sock.data)
        self.assertIn(b'Host: destination.com', sock.data)

    def test_tunnel_debuglog(self):
        # With debugging on, headers of the proxy's reply are printed.
        expected_header = 'X-Dummy: 1'
        proxy_reply = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)

        self.conn.set_debuglevel(1)
        self.conn._create_connection = self._create_connection(proxy_reply)
        self.conn.set_tunnel('destination.com')

        with support.captured_stdout() as output:
            self.conn.request('PUT', '/', '')
        printed = output.getvalue().splitlines()
        self.assertIn('header: {}'.format(expected_header), printed)
2049
2050
if __name__ == "__main__":
    # Executed as a script: run every test in this module, reporting
    # each test individually (verbosity level 2).
    unittest.main(verbosity=2)
2053