• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13TestCase = unittest.TestCase
14
15from test import support
16
# Directory holding this test module; the certificate files sit next to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(
    here, 'selfsigned_pythontestdotnet.pem')

# constants for testing chunked encoding
chunked_start = ''.join((
    # status line and headers
    'HTTP/1.1 200 OK\r\n',
    'Transfer-Encoding: chunked\r\n\r\n',
    # chunks: a hexadecimal size line followed by the payload line
    'a\r\n', 'hello worl\r\n',
    '3\r\n', 'd! \r\n',
    '8\r\n', 'and now \r\n',
    '22\r\n', 'for something completely different\r\n',
))
# The payload obtained by concatenating the chunks above.
chunked_expected = b'hello world! and now for something completely different'
chunk_extension = ";foo=bar"
last_chunk = "0\r\n"
# Same terminating chunk, but carrying a chunk extension after the size.
last_chunk_extended = "0%s\r\n" % chunk_extension
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
chunked_end = "\r\n"
44
# Host value taken from the shared test.support helpers.
HOST = support.HOST
46
class FakeSocket:
    """In-memory socket double.

    Everything passed to sendall() accumulates in ``data`` (with the number
    of calls in ``sendall_calls``); makefile() serves the canned ``text``
    through an instance of ``fileclass``.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # Canned responses are often written as str; store them as bytes.
        self.text = text.encode("ascii") if isinstance(text, str) else text
        self.fileclass = fileclass
        self.data = b''
        self.sendall_calls = 0
        self.file_closed = False
        self.host, self.port = host, port

    def sendall(self, data):
        """Record one call and append *data* to everything sent so far."""
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a readable file over the canned text.

        Only the read modes 'r' and 'rb' are supported; anything else
        raises client.UnimplementedFileMode.
        """
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        # keep the file around so we can check how much was read from it
        self.file = stream = self.fileclass(self.text)
        # Replace close() so the file stays usable; the call is only noted.
        stream.close = self.file_close
        return stream

    def file_close(self):
        """Stand-in for the file's close(): just remember it was called."""
        self.file_closed = True

    def close(self):
        """Closing the fake socket does nothing."""
        pass

    def setsockopt(self, level, optname, value):
        """Socket options are accepted and ignored."""
        pass
79
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a marker payload."""

    def __init__(self, text, pipe_trigger):
        super().__init__(text)
        # When sendall() is called with pipe_trigger, raise EPIPE.
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Append *data* unless it contains the trigger, then raise EPIPE."""
        if self.pipe_trigger not in data:
            self.data += data
            return
        raise OSError(errno.EPIPE, "gotcha")

    def close(self):
        """Closing the fake socket does nothing."""
        pass
94
class NoEOFBytesIO(io.BytesIO):
    """BytesIO variant that raises AssertionError when a read hits EOF.

    The tests below use it to verify that http.client never asks the
    underlying file for more data than it is supposed to consume.
    """

    def read(self, n=-1):
        chunk = super().read(n)
        if not chunk:
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def readline(self, length=None):
        line = super().readline(length)
        if not line:
            raise AssertionError('caller tried to read past EOF')
        return line
112
class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection backed by FakeSocket that counts connect() calls.

    The positional arguments given to the constructor are not passed to
    HTTPConnection (which is always created for 'example.com'); they are
    kept and forwarded to FakeSocket each time a connection is created.
    """

    def __init__(self, *args):
        self.connections = 0
        super().__init__('example.com')
        self.fake_socket_args = args
        # Route the connection factory to the FakeSocket-producing method.
        self._create_connection = self.create_connection

    def connect(self):
        """Bump the connection counter, then connect as usual."""
        self.connections += 1
        return super().connect()

    def create_connection(self, *_address_args, **_options):
        """Ignore the requested address/options and hand back a FakeSocket."""
        return FakeSocket(*self.fake_socket_args)
129
class HeaderTests(TestCase):
    """Tests for the headers http.client sends and the headers it parses."""

    def test_auto_headers(self):
        # Some headers are added automatically, but .request() must not add
        # them a second time when the caller sets them explicitly.

        class HeaderCountingBuffer(list):
            """Output buffer that tallies header lines by lower-cased name."""

            def __init__(self):
                self.count = {}

            def append(self, item):
                name, *rest = item.split(b':')
                if rest:
                    # item is a 'Key: Value' header string
                    key = name.decode('ascii').lower()
                    self.count[key] = self.count.get(key, 0) + 1
                list.append(self, item)

        auto_headers = ('Content-length', 'Host', 'Accept-encoding')
        for explicit_header, header in itertools.product((True, False),
                                                         auto_headers):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket('blahblahblah')
            conn._buffer = HeaderCountingBuffer()

            body = 'spamspamspam'
            headers = {header: str(len(body))} if explicit_header else {}
            conn.request('POST', '/', body, headers)
            self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            """Output buffer remembering the last Content-Length value seen."""

            def __init__(self):
                list.__init__(self)
                self.content_length = None

            def append(self, item):
                name, colon, value = item.partition(b':')
                if colon and name.lower() == b'content-length':
                    self.content_length = value.strip()
                list.append(self, item)

        def sent_content_length(method, body):
            # Issue one request over a fake socket and report the
            # Content-Length value that was buffered (None if there was none).
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            return conn._buffer.content_length

        methods_with_body = ('PUT', 'POST', 'PATCH')
        methods_without_body = (
            'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )
        incorrect = 'Header Content-Length incorrect on {}'

        # Methods expecting a body get a content-length of zero when the
        # body is empty (either None or '').
        for method, body in itertools.product(methods_with_body, (None, '')):
            self.assertEqual(
                sent_content_length(method, body), b'0',
                incorrect.format(method)
            )

        # For the other methods, content-length must not be set when the
        # body is None because it might cause unexpected behaviour on the
        # server.
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, None), None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # A body of '' counts as "present but empty" rather than "missing",
        # so content length is set even for methods that don't expect a body.
        for method in methods_without_body:
            self.assertEqual(
                sent_content_length(method, ''), b'0',
                incorrect.format(method)
            )

        # If the body is set, make sure Content-Length is set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            self.assertEqual(
                sent_content_length(method, ' '), b'1',
                incorrect.format(method)
            )

    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')

        # (putheader() arguments, header line expected in the output buffer)
        cases = (
            (('Content-length', 42), b'Content-length: 42'),
            (('Foo', ' bar '), b'Foo:  bar '),
            (('Bar', '\tbaz\t'), b'Bar: \tbaz\t'),
            (('Authorization', 'Bearer mytoken'),
             b'Authorization: Bearer mytoken'),
            (('IterHeader', 'IterA', 'IterB'),
             b'IterHeader: IterA\r\n\tIterB'),
            (('LatinHeader', b'\xFF'), b'LatinHeader: \xFF'),
            (('Utf8Header', b'\xc3\x80'), b'Utf8Header: \xc3\x80'),
            (('C1-Control', b'next\x85line'), b'C1-Control: next\x85line'),
            (('Embedded-Fold-Space', 'is\r\n allowed'),
             b'Embedded-Fold-Space: is\r\n allowed'),
            (('Embedded-Fold-Tab', 'is\r\n\tallowed'),
             b'Embedded-Fold-Tab: is\r\n\tallowed'),
            (('Key Space', 'value'), b'Key Space: value'),
            (('KeySpace ', 'value'), b'KeySpace : value'),
            ((b'Nonbreak\xa0Space', 'value'), b'Nonbreak\xa0Space: value'),
            ((b'\xa0NonbreakSpace', 'value'), b'\xa0NonbreakSpace: value'),
        )
        for args, expected_line in cases:
            conn.putheader(*args)
            self.assertIn(expected_line, conn._buffer)

    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should be wrapped by [] if
        # it is an IPv6 address
        cases = (
            ('[2001::]:81',
             b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
            ('[2001:102A::]',
             b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n'
             b'Accept-Encoding: identity\r\n\r\n'),
        )
        for hostport, expected in cases:
            conn = client.HTTPConnection(hostport)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))

    def test_malformed_headers_coped_with(self):
        # Issue 19996
        raw = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        resp = client.HTTPResponse(FakeSocket(raw))
        resp.begin()

        # The headers on either side of the malformed line are still parsed.
        for name in ('First', 'Second'):
            self.assertEqual(resp.getheader(name), 'val')

    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        token = "!#$%&'*+-.^_`|~"
        body = b''.join((
            b'HTTP/1.1 200 OK\r\n',
            b"!#$%&'*+-.^_`|~: value\r\n",  # Special token characters
            b'VCHAR: ', bytes(range(0x21, 0x7E + 1)), b'\r\n',
            b'obs-text: ', bytes(range(0x80, 0xFF + 1)), b'\r\n',
            b'obs-fold: text\r\n',
            b' folded with space\r\n',
            b'\tfolded with tab\r\n',
            b'Content-Length: 0\r\n',
            b'\r\n',
        ))
        resp = client.HTTPResponse(FakeSocket(body))
        resp.begin()

        vchar = ''.join(chr(code) for code in range(0x21, 0x7E + 1))
        # Each header must read the same through getheader() and resp.msg.
        for name, value in (('Content-Length', '0'),
                            (token, 'value'),
                            ('VCHAR', vchar)):
            self.assertEqual(resp.getheader(name), value)
            self.assertEqual(resp.msg[name], value)
        self.assertIsNotNone(resp.getheader('obs-text'))
        self.assertIn('obs-text', resp.msg)
        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
            self.assertTrue(folded.startswith('text'))
            self.assertIn(' folded with space', folded)
            self.assertTrue(folded.endswith('folded with tab'))

    def test_invalid_headers(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
        # longer allowed in header names
        cases = (
            # invalid names
            (b'Invalid\r\nName', b'ValidValue'),
            (b'Invalid\rName', b'ValidValue'),
            (b'Invalid\nName', b'ValidValue'),
            (b'\r\nInvalidName', b'ValidValue'),
            (b'\rInvalidName', b'ValidValue'),
            (b'\nInvalidName', b'ValidValue'),
            (b' InvalidName', b'ValidValue'),
            (b'\tInvalidName', b'ValidValue'),
            (b'Invalid:Name', b'ValidValue'),
            (b':InvalidName', b'ValidValue'),
            # invalid values
            (b'ValidName', b'Invalid\r\nValue'),
            (b'ValidName', b'Invalid\rValue'),
            (b'ValidName', b'Invalid\nValue'),
            (b'ValidName', b'InvalidValue\r\n'),
            (b'ValidName', b'InvalidValue\r'),
            (b'ValidName', b'InvalidValue\n'),
        )
        for case in cases:
            name, value = case
            with self.subTest(case), \
                    self.assertRaisesRegex(ValueError, 'Invalid header'):
                conn.putheader(name, value)

    def test_headers_debuglevel(self):
        raw = b''.join((
            b'HTTP/1.1 200 OK\r\n',
            b'First: val\r\n',
            b'Second: val1\r\n',
            b'Second: val2\r\n',
        ))
        resp = client.HTTPResponse(FakeSocket(raw), debuglevel=1)
        with support.captured_stdout() as output:
            resp.begin()
        lines = output.getvalue().splitlines()
        # With debuglevel=1 the status line and every header are printed.
        expected_lines = (
            "reply: 'HTTP/1.1 200 OK\\r\\n'",
            "header: First: val",
            "header: Second: val1",
            "header: Second: val2",
        )
        for index, expected_line in enumerate(expected_lines):
            self.assertEqual(lines[index], expected_line)
365
366
367class TransferEncodingTest(TestCase):
368    expected_body = b"It's just a flesh wound"
369
370    def test_endheaders_chunked(self):
371        conn = client.HTTPConnection('example.com')
372        conn.sock = FakeSocket(b'')
373        conn.putrequest('POST', '/')
374        conn.endheaders(self._make_body(), encode_chunked=True)
375
376        _, _, body = self._parse_request(conn.sock.data)
377        body = self._parse_chunked(body)
378        self.assertEqual(body, self.expected_body)
379
380    def test_explicit_headers(self):
381        # explicit chunked
382        conn = client.HTTPConnection('example.com')
383        conn.sock = FakeSocket(b'')
384        # this shouldn't actually be automatically chunk-encoded because the
385        # calling code has explicitly stated that it's taking care of it
386        conn.request(
387            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
388
389        _, headers, body = self._parse_request(conn.sock.data)
390        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
391        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
392        self.assertEqual(body, self.expected_body)
393
394        # explicit chunked, string body
395        conn = client.HTTPConnection('example.com')
396        conn.sock = FakeSocket(b'')
397        conn.request(
398            'POST', '/', self.expected_body.decode('latin-1'),
399            {'Transfer-Encoding': 'chunked'})
400
401        _, headers, body = self._parse_request(conn.sock.data)
402        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
403        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
404        self.assertEqual(body, self.expected_body)
405
406        # User-specified TE, but request() does the chunk encoding
407        conn = client.HTTPConnection('example.com')
408        conn.sock = FakeSocket(b'')
409        conn.request('POST', '/',
410            headers={'Transfer-Encoding': 'gzip, chunked'},
411            encode_chunked=True,
412            body=self._make_body())
413        _, headers, body = self._parse_request(conn.sock.data)
414        self.assertNotIn('content-length', [k.lower() for k in headers])
415        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
416        self.assertEqual(self._parse_chunked(body), self.expected_body)
417
418    def test_request(self):
419        for empty_lines in (False, True,):
420            conn = client.HTTPConnection('example.com')
421            conn.sock = FakeSocket(b'')
422            conn.request(
423                'POST', '/', self._make_body(empty_lines=empty_lines))
424
425            _, headers, body = self._parse_request(conn.sock.data)
426            body = self._parse_chunked(body)
427            self.assertEqual(body, self.expected_body)
428            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
429
430            # Content-Length and Transfer-Encoding SHOULD not be sent in the
431            # same request
432            self.assertNotIn('content-length', [k.lower() for k in headers])
433
434    def test_empty_body(self):
435        # Zero-length iterable should be treated like any other iterable
436        conn = client.HTTPConnection('example.com')
437        conn.sock = FakeSocket(b'')
438        conn.request('POST', '/', ())
439        _, headers, body = self._parse_request(conn.sock.data)
440        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
441        self.assertNotIn('content-length', [k.lower() for k in headers])
442        self.assertEqual(body, b"0\r\n\r\n")
443
444    def _make_body(self, empty_lines=False):
445        lines = self.expected_body.split(b' ')
446        for idx, line in enumerate(lines):
447            # for testing handling empty lines
448            if empty_lines and idx % 2:
449                yield b''
450            if idx < len(lines) - 1:
451                yield line + b' '
452            else:
453                yield line
454
455    def _parse_request(self, data):
456        lines = data.split(b'\r\n')
457        request = lines[0]
458        headers = {}
459        n = 1
460        while n < len(lines) and len(lines[n]) > 0:
461            key, val = lines[n].split(b':')
462            key = key.decode('latin-1').strip()
463            headers[key] = val.decode('latin-1').strip()
464            n += 1
465
466        return request, headers, b'\r\n'.join(lines[n + 1:])
467
468    def _parse_chunked(self, data):
469        body = []
470        trailers = {}
471        n = 0
472        lines = data.split(b'\r\n')
473        # parse body
474        while True:
475            size, chunk = lines[n:n+2]
476            size = int(size, 16)
477
478            if size == 0:
479                n += 1
480                break
481
482            self.assertEqual(size, len(chunk))
483            body.append(chunk)
484
485            n += 2
486            # we /should/ hit the end chunk, but check against the size of
487            # lines so we're not stuck in an infinite loop should we get
488            # malformed data
489            if n > len(lines):
490                break
491
492        return b''.join(body)
493
494
495class BasicTest(TestCase):
496    def test_status_lines(self):
497        # Test HTTP status lines
498
499        body = "HTTP/1.1 200 Ok\r\n\r\nText"
500        sock = FakeSocket(body)
501        resp = client.HTTPResponse(sock)
502        resp.begin()
503        self.assertEqual(resp.read(0), b'')  # Issue #20007
504        self.assertFalse(resp.isclosed())
505        self.assertFalse(resp.closed)
506        self.assertEqual(resp.read(), b"Text")
507        self.assertTrue(resp.isclosed())
508        self.assertFalse(resp.closed)
509        resp.close()
510        self.assertTrue(resp.closed)
511
512        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
513        sock = FakeSocket(body)
514        resp = client.HTTPResponse(sock)
515        self.assertRaises(client.BadStatusLine, resp.begin)
516
517    def test_bad_status_repr(self):
518        exc = client.BadStatusLine('')
519        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
520
521    def test_partial_reads(self):
522        # if we have Content-Length, HTTPResponse knows when to close itself,
523        # the same behaviour as when we read the whole thing with read()
524        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
525        sock = FakeSocket(body)
526        resp = client.HTTPResponse(sock)
527        resp.begin()
528        self.assertEqual(resp.read(2), b'Te')
529        self.assertFalse(resp.isclosed())
530        self.assertEqual(resp.read(2), b'xt')
531        self.assertTrue(resp.isclosed())
532        self.assertFalse(resp.closed)
533        resp.close()
534        self.assertTrue(resp.closed)
535
536    def test_mixed_reads(self):
537        # readline() should update the remaining length, so that read() knows
538        # how much data is left and does not raise IncompleteRead
539        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
540        sock = FakeSocket(body)
541        resp = client.HTTPResponse(sock)
542        resp.begin()
543        self.assertEqual(resp.readline(), b'Text\r\n')
544        self.assertFalse(resp.isclosed())
545        self.assertEqual(resp.read(), b'Another')
546        self.assertTrue(resp.isclosed())
547        self.assertFalse(resp.closed)
548        resp.close()
549        self.assertTrue(resp.closed)
550
551    def test_partial_readintos(self):
552        # if we have Content-Length, HTTPResponse knows when to close itself,
553        # the same behaviour as when we read the whole thing with read()
554        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
555        sock = FakeSocket(body)
556        resp = client.HTTPResponse(sock)
557        resp.begin()
558        b = bytearray(2)
559        n = resp.readinto(b)
560        self.assertEqual(n, 2)
561        self.assertEqual(bytes(b), b'Te')
562        self.assertFalse(resp.isclosed())
563        n = resp.readinto(b)
564        self.assertEqual(n, 2)
565        self.assertEqual(bytes(b), b'xt')
566        self.assertTrue(resp.isclosed())
567        self.assertFalse(resp.closed)
568        resp.close()
569        self.assertTrue(resp.closed)
570
571    def test_partial_reads_no_content_length(self):
572        # when no length is present, the socket should be gracefully closed when
573        # all data was read
574        body = "HTTP/1.1 200 Ok\r\n\r\nText"
575        sock = FakeSocket(body)
576        resp = client.HTTPResponse(sock)
577        resp.begin()
578        self.assertEqual(resp.read(2), b'Te')
579        self.assertFalse(resp.isclosed())
580        self.assertEqual(resp.read(2), b'xt')
581        self.assertEqual(resp.read(1), b'')
582        self.assertTrue(resp.isclosed())
583        self.assertFalse(resp.closed)
584        resp.close()
585        self.assertTrue(resp.closed)
586
587    def test_partial_readintos_no_content_length(self):
588        # when no length is present, the socket should be gracefully closed when
589        # all data was read
590        body = "HTTP/1.1 200 Ok\r\n\r\nText"
591        sock = FakeSocket(body)
592        resp = client.HTTPResponse(sock)
593        resp.begin()
594        b = bytearray(2)
595        n = resp.readinto(b)
596        self.assertEqual(n, 2)
597        self.assertEqual(bytes(b), b'Te')
598        self.assertFalse(resp.isclosed())
599        n = resp.readinto(b)
600        self.assertEqual(n, 2)
601        self.assertEqual(bytes(b), b'xt')
602        n = resp.readinto(b)
603        self.assertEqual(n, 0)
604        self.assertTrue(resp.isclosed())
605
606    def test_partial_reads_incomplete_body(self):
607        # if the server shuts down the connection before the whole
608        # content-length is delivered, the socket is gracefully closed
609        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
610        sock = FakeSocket(body)
611        resp = client.HTTPResponse(sock)
612        resp.begin()
613        self.assertEqual(resp.read(2), b'Te')
614        self.assertFalse(resp.isclosed())
615        self.assertEqual(resp.read(2), b'xt')
616        self.assertEqual(resp.read(1), b'')
617        self.assertTrue(resp.isclosed())
618
619    def test_partial_readintos_incomplete_body(self):
620        # if the server shuts down the connection before the whole
621        # content-length is delivered, the socket is gracefully closed
622        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
623        sock = FakeSocket(body)
624        resp = client.HTTPResponse(sock)
625        resp.begin()
626        b = bytearray(2)
627        n = resp.readinto(b)
628        self.assertEqual(n, 2)
629        self.assertEqual(bytes(b), b'Te')
630        self.assertFalse(resp.isclosed())
631        n = resp.readinto(b)
632        self.assertEqual(n, 2)
633        self.assertEqual(bytes(b), b'xt')
634        n = resp.readinto(b)
635        self.assertEqual(n, 0)
636        self.assertTrue(resp.isclosed())
637        self.assertFalse(resp.closed)
638        resp.close()
639        self.assertTrue(resp.closed)
640
641    def test_host_port(self):
642        # Check invalid host_port
643
644        for hp in ("www.python.org:abc", "user:password@www.python.org"):
645            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
646
647        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
648                          "fe80::207:e9ff:fe9b", 8000),
649                         ("www.python.org:80", "www.python.org", 80),
650                         ("www.python.org:", "www.python.org", 80),
651                         ("www.python.org", "www.python.org", 80),
652                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
653                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
654            c = client.HTTPConnection(hp)
655            self.assertEqual(h, c.host)
656            self.assertEqual(p, c.port)
657
658    def test_response_headers(self):
659        # test response with multiple message headers with the same field name.
660        text = ('HTTP/1.1 200 OK\r\n'
661                'Set-Cookie: Customer="WILE_E_COYOTE"; '
662                'Version="1"; Path="/acme"\r\n'
663                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
664                ' Path="/acme"\r\n'
665                '\r\n'
666                'No body\r\n')
667        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
668               ', '
669               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
670        s = FakeSocket(text)
671        r = client.HTTPResponse(s)
672        r.begin()
673        cookies = r.getheader("Set-Cookie")
674        self.assertEqual(cookies, hdr)
675
676    def test_read_head(self):
677        # Test that the library doesn't attempt to read any data
678        # from a HEAD request.  (Tickles SF bug #622042.)
679        sock = FakeSocket(
680            'HTTP/1.1 200 OK\r\n'
681            'Content-Length: 14432\r\n'
682            '\r\n',
683            NoEOFBytesIO)
684        resp = client.HTTPResponse(sock, method="HEAD")
685        resp.begin()
686        if resp.read():
687            self.fail("Did not expect response from HEAD request")
688
689    def test_readinto_head(self):
690        # Test that the library doesn't attempt to read any data
691        # from a HEAD request.  (Tickles SF bug #622042.)
692        sock = FakeSocket(
693            'HTTP/1.1 200 OK\r\n'
694            'Content-Length: 14432\r\n'
695            '\r\n',
696            NoEOFBytesIO)
697        resp = client.HTTPResponse(sock, method="HEAD")
698        resp.begin()
699        b = bytearray(5)
700        if resp.readinto(b) != 0:
701            self.fail("Did not expect response from HEAD request")
702        self.assertEqual(bytes(b), b'\x00'*5)
703
704    def test_too_many_headers(self):
705        headers = '\r\n'.join('Header%d: foo' % i
706                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
707        text = ('HTTP/1.1 200 OK\r\n' + headers)
708        s = FakeSocket(text)
709        r = client.HTTPResponse(s)
710        self.assertRaisesRegex(client.HTTPException,
711                               r"got more than \d+ headers", r.begin)
712
713    def test_send_file(self):
714        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
715                    b'Accept-Encoding: identity\r\n'
716                    b'Transfer-Encoding: chunked\r\n'
717                    b'\r\n')
718
719        with open(__file__, 'rb') as body:
720            conn = client.HTTPConnection('example.com')
721            sock = FakeSocket(body)
722            conn.sock = sock
723            conn.request('GET', '/foo', body)
724            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
725                    (sock.data[:len(expected)], expected))
726
727    def test_send(self):
728        expected = b'this is a test this is only a test'
729        conn = client.HTTPConnection('example.com')
730        sock = FakeSocket(None)
731        conn.sock = sock
732        conn.send(expected)
733        self.assertEqual(expected, sock.data)
734        sock.data = b''
735        conn.send(array.array('b', expected))
736        self.assertEqual(expected, sock.data)
737        sock.data = b''
738        conn.send(io.BytesIO(expected))
739        self.assertEqual(expected, sock.data)
740
741    def test_send_updating_file(self):
742        def data():
743            yield 'data'
744            yield None
745            yield 'data_two'
746
747        class UpdatingFile(io.TextIOBase):
748            mode = 'r'
749            d = data()
750            def read(self, blocksize=-1):
751                return next(self.d)
752
753        expected = b'data'
754
755        conn = client.HTTPConnection('example.com')
756        sock = FakeSocket("")
757        conn.sock = sock
758        conn.send(UpdatingFile())
759        self.assertEqual(sock.data, expected)
760
761
762    def test_send_iter(self):
763        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
764                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
765                   b'\r\nonetwothree'
766
767        def body():
768            yield b"one"
769            yield b"two"
770            yield b"three"
771
772        conn = client.HTTPConnection('example.com')
773        sock = FakeSocket("")
774        conn.sock = sock
775        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
776        self.assertEqual(sock.data, expected)
777
778    def test_blocksize_request(self):
779        """Check that request() respects the configured block size."""
780        blocksize = 8  # For easy debugging.
781        conn = client.HTTPConnection('example.com', blocksize=blocksize)
782        sock = FakeSocket(None)
783        conn.sock = sock
784        expected = b"a" * blocksize + b"b"
785        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
786        self.assertEqual(sock.sendall_calls, 3)
787        body = sock.data.split(b"\r\n\r\n", 1)[1]
788        self.assertEqual(body, expected)
789
790    def test_blocksize_send(self):
791        """Check that send() respects the configured block size."""
792        blocksize = 8  # For easy debugging.
793        conn = client.HTTPConnection('example.com', blocksize=blocksize)
794        sock = FakeSocket(None)
795        conn.sock = sock
796        expected = b"a" * blocksize + b"b"
797        conn.send(io.BytesIO(expected))
798        self.assertEqual(sock.sendall_calls, 2)
799        self.assertEqual(sock.data, expected)
800
801    def test_send_type_error(self):
802        # See: Issue #12676
803        conn = client.HTTPConnection('example.com')
804        conn.sock = FakeSocket('')
805        with self.assertRaises(TypeError):
806            conn.request('POST', 'test', conn)
807
808    def test_chunked(self):
809        expected = chunked_expected
810        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
811        resp = client.HTTPResponse(sock, method="GET")
812        resp.begin()
813        self.assertEqual(resp.read(), expected)
814        resp.close()
815
816        # Various read sizes
817        for n in range(1, 12):
818            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
819            resp = client.HTTPResponse(sock, method="GET")
820            resp.begin()
821            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
822            resp.close()
823
824        for x in ('', 'foo\r\n'):
825            sock = FakeSocket(chunked_start + x)
826            resp = client.HTTPResponse(sock, method="GET")
827            resp.begin()
828            try:
829                resp.read()
830            except client.IncompleteRead as i:
831                self.assertEqual(i.partial, expected)
832                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
833                self.assertEqual(repr(i), expected_message)
834                self.assertEqual(str(i), expected_message)
835            else:
836                self.fail('IncompleteRead expected')
837            finally:
838                resp.close()
839
840    def test_readinto_chunked(self):
841
842        expected = chunked_expected
843        nexpected = len(expected)
844        b = bytearray(128)
845
846        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
847        resp = client.HTTPResponse(sock, method="GET")
848        resp.begin()
849        n = resp.readinto(b)
850        self.assertEqual(b[:nexpected], expected)
851        self.assertEqual(n, nexpected)
852        resp.close()
853
854        # Various read sizes
855        for n in range(1, 12):
856            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
857            resp = client.HTTPResponse(sock, method="GET")
858            resp.begin()
859            m = memoryview(b)
860            i = resp.readinto(m[0:n])
861            i += resp.readinto(m[i:n + i])
862            i += resp.readinto(m[i:])
863            self.assertEqual(b[:nexpected], expected)
864            self.assertEqual(i, nexpected)
865            resp.close()
866
867        for x in ('', 'foo\r\n'):
868            sock = FakeSocket(chunked_start + x)
869            resp = client.HTTPResponse(sock, method="GET")
870            resp.begin()
871            try:
872                n = resp.readinto(b)
873            except client.IncompleteRead as i:
874                self.assertEqual(i.partial, expected)
875                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
876                self.assertEqual(repr(i), expected_message)
877                self.assertEqual(str(i), expected_message)
878            else:
879                self.fail('IncompleteRead expected')
880            finally:
881                resp.close()
882
883    def test_chunked_head(self):
884        chunked_start = (
885            'HTTP/1.1 200 OK\r\n'
886            'Transfer-Encoding: chunked\r\n\r\n'
887            'a\r\n'
888            'hello world\r\n'
889            '1\r\n'
890            'd\r\n'
891        )
892        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
893        resp = client.HTTPResponse(sock, method="HEAD")
894        resp.begin()
895        self.assertEqual(resp.read(), b'')
896        self.assertEqual(resp.status, 200)
897        self.assertEqual(resp.reason, 'OK')
898        self.assertTrue(resp.isclosed())
899        self.assertFalse(resp.closed)
900        resp.close()
901        self.assertTrue(resp.closed)
902
903    def test_readinto_chunked_head(self):
904        chunked_start = (
905            'HTTP/1.1 200 OK\r\n'
906            'Transfer-Encoding: chunked\r\n\r\n'
907            'a\r\n'
908            'hello world\r\n'
909            '1\r\n'
910            'd\r\n'
911        )
912        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
913        resp = client.HTTPResponse(sock, method="HEAD")
914        resp.begin()
915        b = bytearray(5)
916        n = resp.readinto(b)
917        self.assertEqual(n, 0)
918        self.assertEqual(bytes(b), b'\x00'*5)
919        self.assertEqual(resp.status, 200)
920        self.assertEqual(resp.reason, 'OK')
921        self.assertTrue(resp.isclosed())
922        self.assertFalse(resp.closed)
923        resp.close()
924        self.assertTrue(resp.closed)
925
926    def test_negative_content_length(self):
927        sock = FakeSocket(
928            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
929        resp = client.HTTPResponse(sock, method="GET")
930        resp.begin()
931        self.assertEqual(resp.read(), b'Hello\r\n')
932        self.assertTrue(resp.isclosed())
933
934    def test_incomplete_read(self):
935        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
936        resp = client.HTTPResponse(sock, method="GET")
937        resp.begin()
938        try:
939            resp.read()
940        except client.IncompleteRead as i:
941            self.assertEqual(i.partial, b'Hello\r\n')
942            self.assertEqual(repr(i),
943                             "IncompleteRead(7 bytes read, 3 more expected)")
944            self.assertEqual(str(i),
945                             "IncompleteRead(7 bytes read, 3 more expected)")
946            self.assertTrue(resp.isclosed())
947        else:
948            self.fail('IncompleteRead expected')
949
950    def test_epipe(self):
951        sock = EPipeSocket(
952            "HTTP/1.0 401 Authorization Required\r\n"
953            "Content-type: text/html\r\n"
954            "WWW-Authenticate: Basic realm=\"example\"\r\n",
955            b"Content-Length")
956        conn = client.HTTPConnection("example.com")
957        conn.sock = sock
958        self.assertRaises(OSError,
959                          lambda: conn.request("PUT", "/url", "body"))
960        resp = conn.getresponse()
961        self.assertEqual(401, resp.status)
962        self.assertEqual("Basic realm=\"example\"",
963                         resp.getheader("www-authenticate"))
964
965    # Test lines overflowing the max line size (_MAXLINE in http.client)
966
967    def test_overflowing_status_line(self):
968        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
969        resp = client.HTTPResponse(FakeSocket(body))
970        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
971
972    def test_overflowing_header_line(self):
973        body = (
974            'HTTP/1.1 200 OK\r\n'
975            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
976        )
977        resp = client.HTTPResponse(FakeSocket(body))
978        self.assertRaises(client.LineTooLong, resp.begin)
979
980    def test_overflowing_chunked_line(self):
981        body = (
982            'HTTP/1.1 200 OK\r\n'
983            'Transfer-Encoding: chunked\r\n\r\n'
984            + '0' * 65536 + 'a\r\n'
985            'hello world\r\n'
986            '0\r\n'
987            '\r\n'
988        )
989        resp = client.HTTPResponse(FakeSocket(body))
990        resp.begin()
991        self.assertRaises(client.LineTooLong, resp.read)
992
993    def test_early_eof(self):
994        # Test httpresponse with no \r\n termination,
995        body = "HTTP/1.1 200 Ok"
996        sock = FakeSocket(body)
997        resp = client.HTTPResponse(sock)
998        resp.begin()
999        self.assertEqual(resp.read(), b'')
1000        self.assertTrue(resp.isclosed())
1001        self.assertFalse(resp.closed)
1002        resp.close()
1003        self.assertTrue(resp.closed)
1004
1005    def test_error_leak(self):
1006        # Test that the socket is not leaked if getresponse() fails
1007        conn = client.HTTPConnection('example.com')
1008        response = None
1009        class Response(client.HTTPResponse):
1010            def __init__(self, *pos, **kw):
1011                nonlocal response
1012                response = self  # Avoid garbage collector closing the socket
1013                client.HTTPResponse.__init__(self, *pos, **kw)
1014        conn.response_class = Response
1015        conn.sock = FakeSocket('Invalid status line')
1016        conn.request('GET', '/')
1017        self.assertRaises(client.BadStatusLine, conn.getresponse)
1018        self.assertTrue(response.closed)
1019        self.assertTrue(conn.sock.file_closed)
1020
1021    def test_chunked_extension(self):
1022        extra = '3;foo=bar\r\n' + 'abc\r\n'
1023        expected = chunked_expected + b'abc'
1024
1025        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1026        resp = client.HTTPResponse(sock, method="GET")
1027        resp.begin()
1028        self.assertEqual(resp.read(), expected)
1029        resp.close()
1030
1031    def test_chunked_missing_end(self):
1032        """some servers may serve up a short chunked encoding stream"""
1033        expected = chunked_expected
1034        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1035        resp = client.HTTPResponse(sock, method="GET")
1036        resp.begin()
1037        self.assertEqual(resp.read(), expected)
1038        resp.close()
1039
1040    def test_chunked_trailers(self):
1041        """See that trailers are read and ignored"""
1042        expected = chunked_expected
1043        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1044        resp = client.HTTPResponse(sock, method="GET")
1045        resp.begin()
1046        self.assertEqual(resp.read(), expected)
1047        # we should have reached the end of the file
1048        self.assertEqual(sock.file.read(), b"") #we read to the end
1049        resp.close()
1050
1051    def test_chunked_sync(self):
1052        """Check that we don't read past the end of the chunked-encoding stream"""
1053        expected = chunked_expected
1054        extradata = "extradata"
1055        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1056        resp = client.HTTPResponse(sock, method="GET")
1057        resp.begin()
1058        self.assertEqual(resp.read(), expected)
1059        # the file should now have our extradata ready to be read
1060        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1061        resp.close()
1062
1063    def test_content_length_sync(self):
1064        """Check that we don't read past the end of the Content-Length stream"""
1065        extradata = b"extradata"
1066        expected = b"Hello123\r\n"
1067        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1068        resp = client.HTTPResponse(sock, method="GET")
1069        resp.begin()
1070        self.assertEqual(resp.read(), expected)
1071        # the file should now have our extradata ready to be read
1072        self.assertEqual(sock.file.read(), extradata) #we read to the end
1073        resp.close()
1074
1075    def test_readlines_content_length(self):
1076        extradata = b"extradata"
1077        expected = b"Hello123\r\n"
1078        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1079        resp = client.HTTPResponse(sock, method="GET")
1080        resp.begin()
1081        self.assertEqual(resp.readlines(2000), [expected])
1082        # the file should now have our extradata ready to be read
1083        self.assertEqual(sock.file.read(), extradata) #we read to the end
1084        resp.close()
1085
1086    def test_read1_content_length(self):
1087        extradata = b"extradata"
1088        expected = b"Hello123\r\n"
1089        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1090        resp = client.HTTPResponse(sock, method="GET")
1091        resp.begin()
1092        self.assertEqual(resp.read1(2000), expected)
1093        # the file should now have our extradata ready to be read
1094        self.assertEqual(sock.file.read(), extradata) #we read to the end
1095        resp.close()
1096
1097    def test_readline_bound_content_length(self):
1098        extradata = b"extradata"
1099        expected = b"Hello123\r\n"
1100        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1101        resp = client.HTTPResponse(sock, method="GET")
1102        resp.begin()
1103        self.assertEqual(resp.readline(10), expected)
1104        self.assertEqual(resp.readline(10), b"")
1105        # the file should now have our extradata ready to be read
1106        self.assertEqual(sock.file.read(), extradata) #we read to the end
1107        resp.close()
1108
1109    def test_read1_bound_content_length(self):
1110        extradata = b"extradata"
1111        expected = b"Hello123\r\n"
1112        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1113        resp = client.HTTPResponse(sock, method="GET")
1114        resp.begin()
1115        self.assertEqual(resp.read1(20), expected*2)
1116        self.assertEqual(resp.read(), expected)
1117        # the file should now have our extradata ready to be read
1118        self.assertEqual(sock.file.read(), extradata) #we read to the end
1119        resp.close()
1120
    def test_response_fileno(self):
        # Make sure fd returned by fileno is valid.
        # A real listening socket is used: a server thread answers a CONNECT
        # request and then records whatever the client writes afterwards.
        serv = socket.create_server((HOST, 0))
        self.addCleanup(serv.close)

        # Filled in by the server thread with the bytes received after the
        # request headers.
        result = None
        def run_server():
            [conn, address] = serv.accept()
            with conn, conn.makefile("rb") as reader:
                # Read the request header until a blank line
                while True:
                    line = reader.readline()
                    if not line.rstrip(b"\r\n"):
                        break
                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
                nonlocal result
                # read() with no size returns at EOF, i.e. once the client
                # side has closed the connection.
                result = reader.read()

        thread = threading.Thread(target=run_server)
        thread.start()
        # Safety net: wait up to one second for the thread during cleanup,
        # in case the test exits before the explicit join() below.
        self.addCleanup(thread.join, float(1))
        conn = client.HTTPConnection(*serv.getsockname())
        conn.request("CONNECT", "dummy:1234")
        response = conn.getresponse()
        try:
            self.assertEqual(response.status, client.OK)
            # Build a second socket object around the descriptor reported by
            # the response and write through it.
            s = socket.socket(fileno=response.fileno())
            try:
                s.sendall(b"proxied data\n")
            finally:
                # detach() instead of close(): this wrapper must not close a
                # descriptor that response/conn close themselves below.
                s.detach()
        finally:
            response.close()
            conn.close()
        thread.join()
        # The server must have received exactly what was written via the fd.
        self.assertEqual(result, b"proxied data\n")
1157
1158    def test_putrequest_override_validation(self):
1159        """
1160        It should be possible to override the default validation
1161        behavior in putrequest (bpo-38216).
1162        """
1163        class UnsafeHTTPConnection(client.HTTPConnection):
1164            def _validate_path(self, url):
1165                pass
1166
1167        conn = UnsafeHTTPConnection('example.com')
1168        conn.sock = FakeSocket('')
1169        conn.putrequest('GET', '/\x00')
1170
1171    def test_putrequest_override_encoding(self):
1172        """
1173        It should be possible to override the default encoding
1174        to transmit bytes in another encoding even if invalid
1175        (bpo-36274).
1176        """
1177        class UnsafeHTTPConnection(client.HTTPConnection):
1178            def _encode_request(self, str_url):
1179                return str_url.encode('utf-8')
1180
1181        conn = UnsafeHTTPConnection('example.com')
1182        conn.sock = FakeSocket('')
1183        conn.putrequest('GET', '/☃')
1184
1185
class ExtendedReadTest(TestCase):
    """
    Exercise peek(), read1() and readline() on a canned response
    """
    # Raw response fed to the fake socket; subclasses may override it.
    lines = (
        'HTTP/1.1 200 OK\r\n'
        '\r\n'
        'hello world!\n'
        'and now \n'
        'for something completely different\n'
        'foo'
        )
    # Body bytes only: everything from 'hello' onwards.
    lines_expected = lines[lines.find('hello'):].encode("ascii")
    # The same body in chunked transfer encoding.
    lines_chunked = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello worl\r\n'
        '3\r\n'
        'd!\n\r\n'
        '9\r\n'
        'and now \n\r\n'
        '23\r\n'
        'for something completely different\n\r\n'
        '3\r\n'
        'foo\r\n'
        '0\r\n' # terminating chunk
        '\r\n'  # end of trailers
    )

    def setUp(self):
        # Parse the headers of self.lines and wrap the response's file in a
        # BufferedReader before handing it to the tests.
        response = client.HTTPResponse(FakeSocket(self.lines), method="GET")
        response.begin()
        response.fp = io.BufferedReader(response.fp)
        self.resp = response

    def test_peek(self):
        resp = self.resp
        # patch up the buffered peek so that it returns not too much stuff
        raw_peek = resp.fp.peek

        def limited_peek(n=-1):
            peeked = raw_peek(n)
            return peeked[:n] if n >= 0 else peeked[:10]
        resp.fp.peek = limited_peek

        pieces = []
        while True:
            # try a short peek
            short = resp.peek(3)
            if short:
                self.assertGreater(len(short), 0)
                # then unbounded peek
                longer = resp.peek()
                self.assertGreaterEqual(len(longer), len(short))
                self.assertTrue(longer.startswith(short))
                # reading must return exactly what was peeked
                chunk = resp.read(len(longer))
                self.assertEqual(chunk, longer)
            else:
                chunk = resp.read()
                self.assertFalse(chunk)
            pieces.append(chunk)
            if not chunk:
                break
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_readline(self):
        self._verify_readline(self.resp.readline, self.lines_expected)

    def _verify_readline(self, readline, expected):
        # Pull lines with a limit of 5 until an empty result. Any piece
        # shorter than the limit, other than the final "foo", must end with
        # a newline.
        collected = []
        for line in iter(lambda: readline(5), b""):
            if line != b"foo" and len(line) < 5:
                self.assertTrue(line.endswith(b"\n"))
            collected.append(line)
        self.assertEqual(b"".join(collected), expected)

    def test_read1(self):
        resp = self.resp

        def read_some():
            data = resp.read1(4)
            self.assertLessEqual(len(data), 4)
            return data
        self._verify_readline(Readliner(read_some).readline,
                              self.lines_expected)

    def test_read1_unbounded(self):
        # read1() without a size, repeated until it returns nothing.
        chunks = list(iter(self.resp.read1, b""))
        self.assertEqual(b"".join(chunks), self.lines_expected)

    def test_read1_bounded(self):
        resp = self.resp
        chunks = []
        for data in iter(lambda: resp.read1(10), b""):
            self.assertLessEqual(len(data), 10)
            chunks.append(data)
        self.assertEqual(b"".join(chunks), self.lines_expected)

    def test_read1_0(self):
        self.assertEqual(self.resp.read1(0), b"")

    def test_peek_0(self):
        peeked = self.resp.peek(0)
        self.assertLessEqual(0, len(peeked))
1309
1310
class ExtendedReadTestChunked(ExtendedReadTest):
    """
    Test peek(), read1(), readline() in chunked mode
    """
    # The base class already carries the chunked rendering of the body as
    # `lines_chunked`; reuse it rather than keeping a second copy of the same
    # literal that could drift out of sync. `lines_expected` is inherited.
    lines = ExtendedReadTest.lines_chunked
1331
1332
class Readliner:
    """
    a simple readline class that uses an arbitrary read function and buffering

    `readfunc` is called without arguments and must return bytes; an empty
    result is treated as end of file. Data read past the end of a returned
    line is kept in `remainder` for the next call.
    """
    def __init__(self, readfunc):
        self.readfunc = readfunc
        self.remainder = b""

    def readline(self, limit):
        """Return the next line, including its newline, as bytes.

        At most `limit` bytes are returned; a negative or None limit means
        no bound. An empty result means end of file (or a limit of 0).
        If `readfunc` raises, the data gathered so far is pushed back into
        `remainder` and the exception propagates.
        """
        unbounded = limit is None or limit < 0
        data = []
        datalen = 0  # bytes already committed to `data`
        read = self.remainder
        try:
            while True:
                idx = read.find(b'\n')
                if idx != -1:
                    idx += 1  # keep the newline with the line
                    if not unbounded:
                        # the newline may lie beyond the allowed size
                        idx = min(idx, limit - datalen)
                    break
                if not unbounded and datalen + len(read) >= limit:
                    # limit reached before any newline: cut here
                    idx = limit - datalen
                    break
                # read more data
                data.append(read)
                datalen += len(read)
                read = self.readfunc()
                if not read:
                    idx = 0  # eof condition
                    break
            data.append(read[:idx])
            self.remainder = read[idx:]
            return b"".join(data)
        except BaseException:
            # Do not lose buffered data when readfunc() fails.
            self.remainder = b"".join(data)
            raise
1365
1366
class OfflineTest(TestCase):
    def test_all(self):
        # Documented objects defined in the module should be in __all__
        # HTTPMessage, parse_headers(), and the HTTP status code constants are
        # intentionally omitted for simplicity
        skipped = {"HTTPMessage", "parse_headers"}
        public = (
            name for name in dir(client)
            if not name.startswith("_") and name not in skipped
        )
        # Keep only objects that report http.client as their home module.
        expected = {
            name for name in public
            if getattr(getattr(client, name), "__module__", None)
            == "http.client"
        }
        expected.add("responses")  # White-list documented dict() object
        self.assertCountEqual(client.__all__, expected)

    def test_responses(self):
        reason = client.responses[client.NOT_FOUND]
        self.assertEqual(reason, "Not Found")

    def test_client_constants(self):
        # Make sure we don't break backward compatibility with 3.4
        names = [
            # 1xx
            'CONTINUE',
            'SWITCHING_PROTOCOLS',
            'PROCESSING',
            # 2xx
            'OK',
            'CREATED',
            'ACCEPTED',
            'NON_AUTHORITATIVE_INFORMATION',
            'NO_CONTENT',
            'RESET_CONTENT',
            'PARTIAL_CONTENT',
            'MULTI_STATUS',
            'IM_USED',
            # 3xx
            'MULTIPLE_CHOICES',
            'MOVED_PERMANENTLY',
            'FOUND',
            'SEE_OTHER',
            'NOT_MODIFIED',
            'USE_PROXY',
            'TEMPORARY_REDIRECT',
            # 4xx
            'BAD_REQUEST',
            'UNAUTHORIZED',
            'PAYMENT_REQUIRED',
            'FORBIDDEN',
            'NOT_FOUND',
            'METHOD_NOT_ALLOWED',
            'NOT_ACCEPTABLE',
            'PROXY_AUTHENTICATION_REQUIRED',
            'REQUEST_TIMEOUT',
            'CONFLICT',
            'GONE',
            'LENGTH_REQUIRED',
            'PRECONDITION_FAILED',
            'REQUEST_ENTITY_TOO_LARGE',
            'REQUEST_URI_TOO_LONG',
            'UNSUPPORTED_MEDIA_TYPE',
            'REQUESTED_RANGE_NOT_SATISFIABLE',
            'EXPECTATION_FAILED',
            'MISDIRECTED_REQUEST',
            'UNPROCESSABLE_ENTITY',
            'LOCKED',
            'FAILED_DEPENDENCY',
            'UPGRADE_REQUIRED',
            'PRECONDITION_REQUIRED',
            'TOO_MANY_REQUESTS',
            'REQUEST_HEADER_FIELDS_TOO_LARGE',
            'UNAVAILABLE_FOR_LEGAL_REASONS',
            # 5xx
            'INTERNAL_SERVER_ERROR',
            'NOT_IMPLEMENTED',
            'BAD_GATEWAY',
            'SERVICE_UNAVAILABLE',
            'GATEWAY_TIMEOUT',
            'HTTP_VERSION_NOT_SUPPORTED',
            'INSUFFICIENT_STORAGE',
            'NOT_EXTENDED',
            'NETWORK_AUTHENTICATION_REQUIRED',
        ]
        for const in names:
            with self.subTest(constant=const):
                present = hasattr(client, const)
                self.assertTrue(present)
1447
1448
class SourceAddressTest(TestCase):
    def setUp(self):
        # A listening server socket, plus a second unused port that the
        # client side will be asked to use as its source port.
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = server
        self.port = support.bind_port(server)
        self.source_port = support.find_unused_port()
        server.listen()
        self.conn = None

    def tearDown(self):
        # Close the client connection first (if a test created one), then
        # the server socket.
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # We don't test anything here other than the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
1478
1479
class TimeoutTest(TestCase):
    # Port of the listening socket opened in setUp(). It is stored on the
    # class itself, so it is reachable as TimeoutTest.PORT.
    PORT = None

    def setUp(self):
        # Open a listening IPv4 TCP socket and publish its port on the class.
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        # Release the listening socket created in setUp().
        self.serv.close()
        self.serv = None

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the process-wide default, even if connect()
            # fails, so other tests are not affected.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            # An explicit timeout=None must win over the global default of 30.
            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), None)
        httpConn.close()

        # a value
        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
1524
1525
class PersistenceTest(TestCase):
    # Connection reuse and reconnection, driven through
    # FakeSocketHTTPConnection. NOTE(review): its `connections` attribute
    # presumably counts how many times the fake connection connected --
    # confirm against the helper's definition.

    def test_reuse_reconnect(self):
        # Should reuse or reconnect depending on header from server
        # Each entry: (HTTP version, extra header line, reuse expected?)
        tests = (
            ('1.0', '', False),
            ('1.0', 'Connection: keep-alive\r\n', True),
            ('1.1', '', True),
            ('1.1', 'Connection: close\r\n', False),
            ('1.0', 'Connection: keep-ALIVE\r\n', True),
            ('1.1', 'Connection: cloSE\r\n', False),
        )
        for version, header, reuse in tests:
            with self.subTest(version=version, header=header):
                msg = (
                    'HTTP/{} 200 OK\r\n'
                    '{}'
                    'Content-Length: 12\r\n'
                    '\r\n'
                    'Dummy body\r\n'
                ).format(version, header)
                conn = FakeSocketHTTPConnection(msg)
                # No socket exists until the first request is made.
                self.assertIsNone(conn.sock)
                conn.request('GET', '/open-connection')
                with conn.getresponse() as response:
                    # The socket is dropped (sock is None) exactly when the
                    # connection is not expected to be reused; checked both
                    # before and after the body has been read.
                    self.assertEqual(conn.sock is None, not reuse)
                    response.read()
                self.assertEqual(conn.sock is None, not reuse)
                self.assertEqual(conn.connections, 1)
                conn.request('GET', '/subsequent-request')
                # A second connection is only expected when the first one
                # could not be reused.
                self.assertEqual(conn.connections, 1 if reuse else 2)

    def test_disconnected(self):
        # An empty reply must surface as RemoteDisconnected (plain EOF) or as
        # ConnectionResetError (reader raising at EOF); in both cases the
        # socket is dropped and the next request leads to a second connection.

        def make_reset_reader(text):
            """Return BufferedReader that raises ECONNRESET at EOF"""
            stream = io.BytesIO(text)
            def readinto(buffer):
                # Delegate to the real BytesIO.readinto and turn a
                # zero-byte result (EOF) into a connection reset.
                size = io.BytesIO.readinto(stream, buffer)
                if size == 0:
                    raise ConnectionResetError()
                return size
            # Replace readinto on this one instance only.
            stream.readinto = readinto
            return io.BufferedReader(stream)

        # Each entry: (factory for the response stream, expected exception)
        tests = (
            (io.BytesIO, client.RemoteDisconnected),
            (make_reset_reader, ConnectionResetError),
        )
        for stream_factory, exception in tests:
            with self.subTest(exception=exception):
                conn = FakeSocketHTTPConnection(b'', stream_factory)
                conn.request('GET', '/eof-response')
                self.assertRaises(exception, conn.getresponse)
                self.assertIsNone(conn.sock)
                # HTTPConnection.connect() should be automatically invoked
                conn.request('GET', '/reconnect')
                self.assertEqual(conn.connections, 2)

    def test_100_close(self):
        # The canned reply holds only a "100 Continue" interim response and
        # then ends; getresponse() must raise RemoteDisconnected and the
        # connection must be dropped.
        conn = FakeSocketHTTPConnection(
            b'HTTP/1.1 100 Continue\r\n'
            b'\r\n'
            # Missing final response
        )
        conn.request('GET', '/', headers={'Expect': '100-continue'})
        self.assertRaises(client.RemoteDisconnected, conn.getresponse)
        self.assertIsNone(conn.sock)
        conn.request('GET', '/reconnect')
        self.assertEqual(conn.connections, 2)
1596
1597
class HTTPSTest(TestCase):
    """Exercise client.HTTPSConnection.

    Covers certificate verification against remote and local servers,
    hostname checking, host/port parsing and the post-handshake-auth flag
    of the connection's SSL context.
    """

    def setUp(self):
        # Nothing in this class can run without HTTPSConnection.
        https_available = hasattr(client, 'HTTPSConnection')
        if not https_available:
            self.skipTest('ssl support required')

    def make_server(self, certfile):
        """Return the server built by make_https_server() for *certfile*."""
        # Imported at call time rather than at module level.
        from test import ssl_servers
        return ssl_servers.make_https_server(self, certfile=certfile)

    def test_attributes(self):
        """The timeout given to the constructor is stored on the instance."""
        conn = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(conn.timeout, 30)

    def test_networked(self):
        """Default settings require a valid cert from a trusted CA."""
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        """With cert verification switched off the request succeeds."""
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            unverified = ssl._create_unverified_context()
            conn = client.HTTPSConnection(hostname, 443, context=unverified)
            conn.request('GET', '/')
            resp = conn.getresponse()
            conn.close()
            # The headers are still queried after the connection is closed.
            self.assertIn('nginx', resp.getheader('server'))
            resp.close()

    @support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        """Default settings accept a valid cert from a trusted CA."""
        support.requires('network')
        hostname = 'www.python.org'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            conn.request('GET', '/')
            resp = conn.getresponse()
            content_type = resp.getheader('content-type')
            resp.close()
            conn.close()
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        """The server's own cert, fed as a validating cert, is accepted."""
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.check_hostname, True)
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            try:
                conn = client.HTTPSConnection(hostname, 443, context=context)
                conn.request('GET', '/')
                resp = conn.getresponse()
            except ssl.SSLError as ssl_err:
                message = str(ssl_err)
                # On modern Linux distros (Debian Buster, etc) the default
                # OpenSSL configuration makes [SSL: CERTIFICATE_VERIFY_FAILED]
                # report "key too weak" until
                # https://bugs.python.org/issue36816 is addressed by using a
                # proper key size on self-signed.pythontest.net.  Skip in
                # that case; any other SSL error is a real failure.
                key_too_weak = re.search(r'(?i)key.too.weak', message)
                if key_too_weak:
                    raise unittest.SkipTest(
                        f'Got {message} trying to connect '
                        f'to {hostname}. '
                        'See https://bugs.python.org/issue36816.')
                raise
            server_string = resp.getheader('server')
            resp.close()
            conn.close()
            self.assertIn('nginx', server_string)

    def test_networked_bad_cert(self):
        """A "CA" cert unrelated to the server's cert fails verification."""
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(CERT_localhost)
            conn = client.HTTPSConnection(hostname, 443, context=context)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        """A custom cert unknown to the default trust bundle is rejected."""
        import ssl
        server = self.make_server(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as caught:
            conn.request('GET', '/')
        self.assertEqual(caught.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        """The (valid) cert validates the HTTP hostname."""
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        self.addCleanup(conn.close)
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        self.addCleanup(resp.close)
        self.assertEqual(resp.status, 404)

    def test_local_bad_hostname(self):
        """A hostname mismatch is rejected unless checking is disabled."""
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_fakehostname)

        def connect(**options):
            # Every connection in this test targets the same local server
            # and shares the same context.
            return client.HTTPSConnection('localhost', server.port,
                                          context=context, **options)

        def connect_with_flag(check_hostname):
            # Passing check_hostname explicitly is expected to emit a
            # DeprecationWarning.
            with support.check_warnings(('', DeprecationWarning)):
                return connect(check_hostname=check_hostname)

        # The (valid) cert doesn't validate the HTTP hostname
        conn = connect()
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # Same with explicit check_hostname=True
        conn = connect_with_flag(True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # With check_hostname=False, the mismatching is ignored
        context.check_hostname = False
        conn = connect_with_flag(False)
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        resp.close()
        conn.close()
        self.assertEqual(resp.status, 404)

        # The context's check_hostname setting is used if one isn't passed to
        # HTTPSConnection.
        context.check_hostname = False
        conn = connect()
        conn.request('GET', '/nonexistent')
        resp = conn.getresponse()
        self.assertEqual(resp.status, 404)
        resp.close()
        conn.close()

        # Passing check_hostname to HTTPSConnection should override the
        # context's setting.
        conn = connect_with_flag(True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        """Host/port strings are split correctly; invalid ones are refused."""
        invalid = ("www.python.org:abc", "user:password@www.python.org")
        for hp in invalid:
            with self.assertRaises(client.InvalidURL):
                client.HTTPSConnection(hp)

        # (argument, expected host, expected port); 443 is expected whenever
        # the argument carries no explicit port number.
        valid = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:443", "www.python.org", 443),
            ("www.python.org:", "www.python.org", 443),
            ("www.python.org", "www.python.org", 443),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
            ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 443),
        )
        for hp, expected_host, expected_port in valid:
            conn = client.HTTPSConnection(hp)
            self.assertEqual(expected_host, conn.host)
            self.assertEqual(expected_port, conn.port)

    def test_tls13_pha(self):
        """Check the post_handshake_auth flag of the connection's context."""
        import ssl
        if not ssl.HAS_TLSv1_3:
            self.skipTest('TLS 1.3 support required')

        # No context supplied: the flag is expected to be on.
        conn = client.HTTPSConnection('localhost', 443)
        self.assertTrue(conn._context.post_handshake_auth)

        # A caller-supplied context is used as is, flag left off.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertFalse(context.post_handshake_auth)
        conn = client.HTTPSConnection('localhost', 443, context=context)
        self.assertIs(conn._context, context)
        self.assertFalse(conn._context.post_handshake_auth)

        # Supplying cert_file alongside the context is expected to turn the
        # flag on; the deprecation warning it triggers is silenced here.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                'key_file, cert_file and check_hostname are deprecated',
                DeprecationWarning)
            conn = client.HTTPSConnection('localhost', 443, context=context,
                                          cert_file=CERT_localhost)
        self.assertTrue(conn._context.post_handshake_auth)
1799
1800
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        self._new_connection()

    def _new_connection(self):
        """(Re)create self.conn, wired to a fresh FakeSocket in self.sock."""
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        """Parse what was sent so far.

        Returns a ``(message, fp)`` pair: the parsed request headers and a
        BytesIO positioned at the first byte after them (the body).
        """
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_list_body(self):
        """A list or tuple body is sent with chunked transfer encoding."""
        # Note that no content-length is automatically calculated for
        # an iterable.  The request will fall back to send chunked
        # transfer encoding.
        cases = (
            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
        )
        for body, expected in cases:
            with self.subTest(body):
                # Each case needs its own connection and empty socket.
                self._new_connection()

                self.conn.request('PUT', '/url', body)
                msg, f = self.get_headers_and_fp()
                self.assertNotIn('Content-Type', msg)
                self.assertNotIn('Content-Length', msg)
                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
                self.assertEqual(expected, f.read())

    def test_manual_content_length(self):
        """A caller-supplied Content-Length is sent untouched."""
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        """An ASCII str body is sent as is, with a matching Content-Length."""
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        """A str body with a Latin-1 character is sent one byte per char."""
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        """A bytes body is sent verbatim with a matching Content-Length."""
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_text_file_body(self):
        """A text-mode file body is sent with chunked transfer encoding."""
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as body_file:
            body_file.write("body")
        # Only the request needs the file open; a distinct name keeps the
        # file handle from being rebound inside its own ``with`` block.
        with open(support.TESTFN) as body_file:
            self.conn.request("PUT", "/url", body_file)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        # No content-length will be determined for files; the body
        # will be sent using chunked transfer encoding instead.
        self.assertIsNone(message.get("content-length"))
        self.assertEqual("chunked", message.get("transfer-encoding"))
        self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', fp.read())

    def test_binary_file_body(self):
        """A binary-mode file body is sent with chunked transfer encoding."""
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as body_file:
            body_file.write(b"body\xc1")
        with open(support.TESTFN, "rb") as body_file:
            self.conn.request("PUT", "/url", body_file)
        message, fp = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("chunked", message.get("Transfer-Encoding"))
        self.assertNotIn("Content-Length", message)
        self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', fp.read())
1895
1896
class HTTPResponseTest(TestCase):
    """Tests for HTTPResponse.getheader() on a response that repeats a
    header."""

    def setUp(self):
        # The second My-Header value carries 16 leading blanks, which
        # test_getting_header expects not to show up in the returned value.
        # They are spelled out explicitly (rather than arising from a
        # backslash continuation inside the literal) so that re-indenting
        # this method cannot silently change the test data.
        padding = " " * 16
        body = (
            "HTTP/1.1 200 Ok\r\n"
            "My-Header: first-value\r\n"
            f"My-Header: {padding}second-value\r\n"
            "\r\n"
            "Text"
        )
        sock = FakeSocket(body)
        self.resp = client.HTTPResponse(sock)
        self.resp.begin()

    def test_getting_header(self):
        """Repeated headers are joined with ', '; the default is ignored."""
        header = self.resp.getheader('My-Header')
        self.assertEqual(header, 'first-value, second-value')

        header = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(header, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        """A string default is returned unchanged for a missing header."""
        header = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(header, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        """A list or tuple default is joined with ', '."""
        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(header, 'default, values')

        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(header, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        """Without a default, a missing header yields None."""
        header = self.resp.getheader('No-Such-Header')
        self.assertIsNone(header)

    def test_getting_header_defaultint(self):
        """A non-string, non-iterable default is returned unchanged."""
        header = self.resp.getheader('No-Such-Header', default=42)
        self.assertEqual(header, 42)
1931
class TunnelTests(TestCase):
    """Tests for HTTPConnection.set_tunnel(), run against a FakeSocket that
    replays a canned proxy conversation."""

    def setUp(self):
        canned_reply = (
            'HTTP/1.0 200 OK\r\n\r\n'  # Reply to CONNECT
            'HTTP/1.1 200 OK\r\n'  # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )
        self.host = 'proxy.com'
        self.conn = client.HTTPConnection(self.host)
        self.conn._create_connection = self._create_connection(canned_reply)

    def tearDown(self):
        self.conn.close()

    def _create_connection(self, response_text):
        """Return a stand-in for the connection factory.

        The returned callable builds a FakeSocket that replays
        *response_text* and records the host and port it was asked for.
        """
        def create_connection(address, timeout=None, source_address=None):
            requested_host = address[0]
            requested_port = address[1]
            return FakeSocket(response_text,
                              host=requested_host, port=requested_port)
        return create_connection

    def _assert_socket_targets_proxy(self):
        """Check that the socket was opened to the proxy, not the target."""
        self.assertEqual(self.conn.sock.host, self.host)
        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)

    def test_set_tunnel_host_port_headers(self):
        """set_tunnel() records the tunnel host, port and headers."""
        tunnel_host = 'destination.com'
        tunnel_port = 8888
        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
                             headers=tunnel_headers)
        self.conn.request('HEAD', '/', '')
        self._assert_socket_targets_proxy()
        self.assertEqual(self.conn._tunnel_host, tunnel_host)
        self.assertEqual(self.conn._tunnel_port, tunnel_port)
        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)

    def test_disallow_set_tunnel_after_connect(self):
        """set_tunnel() raises RuntimeError on a connected connection."""
        # Once connected, we shouldn't be able to tunnel anymore
        self.conn.connect()
        with self.assertRaises(RuntimeError):
            self.conn.set_tunnel('destination.com')

    def test_connect_with_tunnel(self):
        """A tunnelled HEAD sends CONNECT and a Host for the destination."""
        self.conn.set_tunnel('destination.com')
        self.conn.request('HEAD', '/', '')
        self._assert_socket_targets_proxy()
        sent = self.conn.sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        # issue22095
        self.assertNotIn(b'Host: destination.com:None', sent)
        self.assertIn(b'Host: destination.com', sent)

        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
        self.assertNotIn(b'Host: proxy.com', sent)

    def test_connect_put_request(self):
        """A tunnelled PUT sends CONNECT and a Host for the destination."""
        self.conn.set_tunnel('destination.com')
        self.conn.request('PUT', '/', '')
        self._assert_socket_targets_proxy()
        sent = self.conn.sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        self.assertIn(b'Host: destination.com', sent)

    def test_tunnel_debuglog(self):
        """With debuglevel 1, the tunnel reply's headers are printed."""
        expected_header = 'X-Dummy: 1'
        response_text = f'HTTP/1.0 200 OK\r\n{expected_header}\r\n\r\n'

        self.conn.set_debuglevel(1)
        self.conn._create_connection = self._create_connection(response_text)
        self.conn.set_tunnel('destination.com')

        with support.captured_stdout() as output:
            self.conn.request('PUT', '/', '')
        printed_lines = output.getvalue().splitlines()
        self.assertIn(f'header: {expected_header}', printed_lines)
2003
2004
# Running this file directly executes the whole suite with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
2007