• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client
3import io
4import itertools
5import os
6import array
7import socket
8
9import unittest
10TestCase = unittest.TestCase
11
12from test import support
13
14here = os.path.dirname(__file__)
15# Self-signed cert file for 'localhost'
16CERT_localhost = os.path.join(here, 'keycert.pem')
17# Self-signed cert file for 'fakehostname'
18CERT_fakehostname = os.path.join(here, 'keycert2.pem')
19# Self-signed cert file for self-signed.pythontest.net
20CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
21
22# constants for testing chunked encoding
23chunked_start = (
24    'HTTP/1.1 200 OK\r\n'
25    'Transfer-Encoding: chunked\r\n\r\n'
26    'a\r\n'
27    'hello worl\r\n'
28    '3\r\n'
29    'd! \r\n'
30    '8\r\n'
31    'and now \r\n'
32    '22\r\n'
33    'for something completely different\r\n'
34)
35chunked_expected = b'hello world! and now for something completely different'
36chunk_extension = ";foo=bar"
37last_chunk = "0\r\n"
38last_chunk_extended = "0" + chunk_extension + "\r\n"
39trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
40chunked_end = "\r\n"
41
42HOST = support.HOST
43
44class FakeSocket:
45    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
46        if isinstance(text, str):
47            text = text.encode("ascii")
48        self.text = text
49        self.fileclass = fileclass
50        self.data = b''
51        self.sendall_calls = 0
52        self.file_closed = False
53        self.host = host
54        self.port = port
55
56    def sendall(self, data):
57        self.sendall_calls += 1
58        self.data += data
59
60    def makefile(self, mode, bufsize=None):
61        if mode != 'r' and mode != 'rb':
62            raise client.UnimplementedFileMode()
63        # keep the file around so we can check how much was read from it
64        self.file = self.fileclass(self.text)
65        self.file.close = self.file_close #nerf close ()
66        return self.file
67
68    def file_close(self):
69        self.file_closed = True
70
71    def close(self):
72        pass
73
74    def setsockopt(self, level, optname, value):
75        pass
76
77class EPipeSocket(FakeSocket):
78
79    def __init__(self, text, pipe_trigger):
80        # When sendall() is called with pipe_trigger, raise EPIPE.
81        FakeSocket.__init__(self, text)
82        self.pipe_trigger = pipe_trigger
83
84    def sendall(self, data):
85        if self.pipe_trigger in data:
86            raise OSError(errno.EPIPE, "gotcha")
87        self.data += data
88
89    def close(self):
90        pass
91
92class NoEOFBytesIO(io.BytesIO):
93    """Like BytesIO, but raises AssertionError on EOF.
94
95    This is used below to test that http.client doesn't try to read
96    more from the underlying file than it should.
97    """
98    def read(self, n=-1):
99        data = io.BytesIO.read(self, n)
100        if data == b'':
101            raise AssertionError('caller tried to read past EOF')
102        return data
103
104    def readline(self, length=None):
105        data = io.BytesIO.readline(self, length)
106        if data == b'':
107            raise AssertionError('caller tried to read past EOF')
108        return data
109
110class FakeSocketHTTPConnection(client.HTTPConnection):
111    """HTTPConnection subclass using FakeSocket; counts connect() calls"""
112
113    def __init__(self, *args):
114        self.connections = 0
115        super().__init__('example.com')
116        self.fake_socket_args = args
117        self._create_connection = self.create_connection
118
119    def connect(self):
120        """Count the number of times connect() is invoked"""
121        self.connections += 1
122        return super().connect()
123
124    def create_connection(self, *pos, **kw):
125        return FakeSocket(*self.fake_socket_args)
126
127class HeaderTests(TestCase):
128    def test_auto_headers(self):
129        # Some headers are added automatically, but should not be added by
130        # .request() if they are explicitly set.
131
132        class HeaderCountingBuffer(list):
133            def __init__(self):
134                self.count = {}
135            def append(self, item):
136                kv = item.split(b':')
137                if len(kv) > 1:
138                    # item is a 'Key: Value' header string
139                    lcKey = kv[0].decode('ascii').lower()
140                    self.count.setdefault(lcKey, 0)
141                    self.count[lcKey] += 1
142                list.append(self, item)
143
144        for explicit_header in True, False:
145            for header in 'Content-length', 'Host', 'Accept-encoding':
146                conn = client.HTTPConnection('example.com')
147                conn.sock = FakeSocket('blahblahblah')
148                conn._buffer = HeaderCountingBuffer()
149
150                body = 'spamspamspam'
151                headers = {}
152                if explicit_header:
153                    headers[header] = str(len(body))
154                conn.request('POST', '/', body, headers)
155                self.assertEqual(conn._buffer.count[header.lower()], 1)
156
157    def test_content_length_0(self):
158
159        class ContentLengthChecker(list):
160            def __init__(self):
161                list.__init__(self)
162                self.content_length = None
163            def append(self, item):
164                kv = item.split(b':', 1)
165                if len(kv) > 1 and kv[0].lower() == b'content-length':
166                    self.content_length = kv[1].strip()
167                list.append(self, item)
168
169        # Here, we're testing that methods expecting a body get a
170        # content-length set to zero if the body is empty (either None or '')
171        bodies = (None, '')
172        methods_with_body = ('PUT', 'POST', 'PATCH')
173        for method, body in itertools.product(methods_with_body, bodies):
174            conn = client.HTTPConnection('example.com')
175            conn.sock = FakeSocket(None)
176            conn._buffer = ContentLengthChecker()
177            conn.request(method, '/', body)
178            self.assertEqual(
179                conn._buffer.content_length, b'0',
180                'Header Content-Length incorrect on {}'.format(method)
181            )
182
183        # For these methods, we make sure that content-length is not set when
184        # the body is None because it might cause unexpected behaviour on the
185        # server.
186        methods_without_body = (
187             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
188        )
189        for method in methods_without_body:
190            conn = client.HTTPConnection('example.com')
191            conn.sock = FakeSocket(None)
192            conn._buffer = ContentLengthChecker()
193            conn.request(method, '/', None)
194            self.assertEqual(
195                conn._buffer.content_length, None,
196                'Header Content-Length set for empty body on {}'.format(method)
197            )
198
199        # If the body is set to '', that's considered to be "present but
200        # empty" rather than "missing", so content length would be set, even
201        # for methods that don't expect a body.
202        for method in methods_without_body:
203            conn = client.HTTPConnection('example.com')
204            conn.sock = FakeSocket(None)
205            conn._buffer = ContentLengthChecker()
206            conn.request(method, '/', '')
207            self.assertEqual(
208                conn._buffer.content_length, b'0',
209                'Header Content-Length incorrect on {}'.format(method)
210            )
211
212        # If the body is set, make sure Content-Length is set.
213        for method in itertools.chain(methods_without_body, methods_with_body):
214            conn = client.HTTPConnection('example.com')
215            conn.sock = FakeSocket(None)
216            conn._buffer = ContentLengthChecker()
217            conn.request(method, '/', ' ')
218            self.assertEqual(
219                conn._buffer.content_length, b'1',
220                'Header Content-Length incorrect on {}'.format(method)
221            )
222
223    def test_putheader(self):
224        conn = client.HTTPConnection('example.com')
225        conn.sock = FakeSocket(None)
226        conn.putrequest('GET','/')
227        conn.putheader('Content-length', 42)
228        self.assertIn(b'Content-length: 42', conn._buffer)
229
230        conn.putheader('Foo', ' bar ')
231        self.assertIn(b'Foo:  bar ', conn._buffer)
232        conn.putheader('Bar', '\tbaz\t')
233        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
234        conn.putheader('Authorization', 'Bearer mytoken')
235        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
236        conn.putheader('IterHeader', 'IterA', 'IterB')
237        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
238        conn.putheader('LatinHeader', b'\xFF')
239        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
240        conn.putheader('Utf8Header', b'\xc3\x80')
241        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
242        conn.putheader('C1-Control', b'next\x85line')
243        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
244        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
245        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
246        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
247        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
248        conn.putheader('Key Space', 'value')
249        self.assertIn(b'Key Space: value', conn._buffer)
250        conn.putheader('KeySpace ', 'value')
251        self.assertIn(b'KeySpace : value', conn._buffer)
252        conn.putheader(b'Nonbreak\xa0Space', 'value')
253        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
254        conn.putheader(b'\xa0NonbreakSpace', 'value')
255        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
256
257    def test_ipv6host_header(self):
258        # Default host header on IPv6 transaction should be wrapped by [] if
259        # it is an IPv6 address
260        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
261                   b'Accept-Encoding: identity\r\n\r\n'
262        conn = client.HTTPConnection('[2001::]:81')
263        sock = FakeSocket('')
264        conn.sock = sock
265        conn.request('GET', '/foo')
266        self.assertTrue(sock.data.startswith(expected))
267
268        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
269                   b'Accept-Encoding: identity\r\n\r\n'
270        conn = client.HTTPConnection('[2001:102A::]')
271        sock = FakeSocket('')
272        conn.sock = sock
273        conn.request('GET', '/foo')
274        self.assertTrue(sock.data.startswith(expected))
275
276    def test_malformed_headers_coped_with(self):
277        # Issue 19996
278        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
279        sock = FakeSocket(body)
280        resp = client.HTTPResponse(sock)
281        resp.begin()
282
283        self.assertEqual(resp.getheader('First'), 'val')
284        self.assertEqual(resp.getheader('Second'), 'val')
285
286    def test_parse_all_octets(self):
287        # Ensure no valid header field octet breaks the parser
288        body = (
289            b'HTTP/1.1 200 OK\r\n'
290            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
291            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
292            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
293            b'obs-fold: text\r\n'
294            b' folded with space\r\n'
295            b'\tfolded with tab\r\n'
296            b'Content-Length: 0\r\n'
297            b'\r\n'
298        )
299        sock = FakeSocket(body)
300        resp = client.HTTPResponse(sock)
301        resp.begin()
302        self.assertEqual(resp.getheader('Content-Length'), '0')
303        self.assertEqual(resp.msg['Content-Length'], '0')
304        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
305        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
306        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
307        self.assertEqual(resp.getheader('VCHAR'), vchar)
308        self.assertEqual(resp.msg['VCHAR'], vchar)
309        self.assertIsNotNone(resp.getheader('obs-text'))
310        self.assertIn('obs-text', resp.msg)
311        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
312            self.assertTrue(folded.startswith('text'))
313            self.assertIn(' folded with space', folded)
314            self.assertTrue(folded.endswith('folded with tab'))
315
316    def test_invalid_headers(self):
317        conn = client.HTTPConnection('example.com')
318        conn.sock = FakeSocket('')
319        conn.putrequest('GET', '/')
320
321        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
322        # longer allowed in header names
323        cases = (
324            (b'Invalid\r\nName', b'ValidValue'),
325            (b'Invalid\rName', b'ValidValue'),
326            (b'Invalid\nName', b'ValidValue'),
327            (b'\r\nInvalidName', b'ValidValue'),
328            (b'\rInvalidName', b'ValidValue'),
329            (b'\nInvalidName', b'ValidValue'),
330            (b' InvalidName', b'ValidValue'),
331            (b'\tInvalidName', b'ValidValue'),
332            (b'Invalid:Name', b'ValidValue'),
333            (b':InvalidName', b'ValidValue'),
334            (b'ValidName', b'Invalid\r\nValue'),
335            (b'ValidName', b'Invalid\rValue'),
336            (b'ValidName', b'Invalid\nValue'),
337            (b'ValidName', b'InvalidValue\r\n'),
338            (b'ValidName', b'InvalidValue\r'),
339            (b'ValidName', b'InvalidValue\n'),
340        )
341        for name, value in cases:
342            with self.subTest((name, value)):
343                with self.assertRaisesRegex(ValueError, 'Invalid header'):
344                    conn.putheader(name, value)
345
346
347class TransferEncodingTest(TestCase):
348    expected_body = b"It's just a flesh wound"
349
350    def test_endheaders_chunked(self):
351        conn = client.HTTPConnection('example.com')
352        conn.sock = FakeSocket(b'')
353        conn.putrequest('POST', '/')
354        conn.endheaders(self._make_body(), encode_chunked=True)
355
356        _, _, body = self._parse_request(conn.sock.data)
357        body = self._parse_chunked(body)
358        self.assertEqual(body, self.expected_body)
359
360    def test_explicit_headers(self):
361        # explicit chunked
362        conn = client.HTTPConnection('example.com')
363        conn.sock = FakeSocket(b'')
364        # this shouldn't actually be automatically chunk-encoded because the
365        # calling code has explicitly stated that it's taking care of it
366        conn.request(
367            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
368
369        _, headers, body = self._parse_request(conn.sock.data)
370        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
371        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
372        self.assertEqual(body, self.expected_body)
373
374        # explicit chunked, string body
375        conn = client.HTTPConnection('example.com')
376        conn.sock = FakeSocket(b'')
377        conn.request(
378            'POST', '/', self.expected_body.decode('latin-1'),
379            {'Transfer-Encoding': 'chunked'})
380
381        _, headers, body = self._parse_request(conn.sock.data)
382        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
383        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
384        self.assertEqual(body, self.expected_body)
385
386        # User-specified TE, but request() does the chunk encoding
387        conn = client.HTTPConnection('example.com')
388        conn.sock = FakeSocket(b'')
389        conn.request('POST', '/',
390            headers={'Transfer-Encoding': 'gzip, chunked'},
391            encode_chunked=True,
392            body=self._make_body())
393        _, headers, body = self._parse_request(conn.sock.data)
394        self.assertNotIn('content-length', [k.lower() for k in headers])
395        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
396        self.assertEqual(self._parse_chunked(body), self.expected_body)
397
398    def test_request(self):
399        for empty_lines in (False, True,):
400            conn = client.HTTPConnection('example.com')
401            conn.sock = FakeSocket(b'')
402            conn.request(
403                'POST', '/', self._make_body(empty_lines=empty_lines))
404
405            _, headers, body = self._parse_request(conn.sock.data)
406            body = self._parse_chunked(body)
407            self.assertEqual(body, self.expected_body)
408            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
409
410            # Content-Length and Transfer-Encoding SHOULD not be sent in the
411            # same request
412            self.assertNotIn('content-length', [k.lower() for k in headers])
413
414    def test_empty_body(self):
415        # Zero-length iterable should be treated like any other iterable
416        conn = client.HTTPConnection('example.com')
417        conn.sock = FakeSocket(b'')
418        conn.request('POST', '/', ())
419        _, headers, body = self._parse_request(conn.sock.data)
420        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
421        self.assertNotIn('content-length', [k.lower() for k in headers])
422        self.assertEqual(body, b"0\r\n\r\n")
423
424    def _make_body(self, empty_lines=False):
425        lines = self.expected_body.split(b' ')
426        for idx, line in enumerate(lines):
427            # for testing handling empty lines
428            if empty_lines and idx % 2:
429                yield b''
430            if idx < len(lines) - 1:
431                yield line + b' '
432            else:
433                yield line
434
435    def _parse_request(self, data):
436        lines = data.split(b'\r\n')
437        request = lines[0]
438        headers = {}
439        n = 1
440        while n < len(lines) and len(lines[n]) > 0:
441            key, val = lines[n].split(b':')
442            key = key.decode('latin-1').strip()
443            headers[key] = val.decode('latin-1').strip()
444            n += 1
445
446        return request, headers, b'\r\n'.join(lines[n + 1:])
447
448    def _parse_chunked(self, data):
449        body = []
450        trailers = {}
451        n = 0
452        lines = data.split(b'\r\n')
453        # parse body
454        while True:
455            size, chunk = lines[n:n+2]
456            size = int(size, 16)
457
458            if size == 0:
459                n += 1
460                break
461
462            self.assertEqual(size, len(chunk))
463            body.append(chunk)
464
465            n += 2
466            # we /should/ hit the end chunk, but check against the size of
467            # lines so we're not stuck in an infinite loop should we get
468            # malformed data
469            if n > len(lines):
470                break
471
472        return b''.join(body)
473
474
475class BasicTest(TestCase):
476    def test_status_lines(self):
477        # Test HTTP status lines
478
479        body = "HTTP/1.1 200 Ok\r\n\r\nText"
480        sock = FakeSocket(body)
481        resp = client.HTTPResponse(sock)
482        resp.begin()
483        self.assertEqual(resp.read(0), b'')  # Issue #20007
484        self.assertFalse(resp.isclosed())
485        self.assertFalse(resp.closed)
486        self.assertEqual(resp.read(), b"Text")
487        self.assertTrue(resp.isclosed())
488        self.assertFalse(resp.closed)
489        resp.close()
490        self.assertTrue(resp.closed)
491
492        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
493        sock = FakeSocket(body)
494        resp = client.HTTPResponse(sock)
495        self.assertRaises(client.BadStatusLine, resp.begin)
496
497    def test_bad_status_repr(self):
498        exc = client.BadStatusLine('')
499        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
500
501    def test_partial_reads(self):
502        # if we have Content-Length, HTTPResponse knows when to close itself,
503        # the same behaviour as when we read the whole thing with read()
504        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
505        sock = FakeSocket(body)
506        resp = client.HTTPResponse(sock)
507        resp.begin()
508        self.assertEqual(resp.read(2), b'Te')
509        self.assertFalse(resp.isclosed())
510        self.assertEqual(resp.read(2), b'xt')
511        self.assertTrue(resp.isclosed())
512        self.assertFalse(resp.closed)
513        resp.close()
514        self.assertTrue(resp.closed)
515
516    def test_mixed_reads(self):
517        # readline() should update the remaining length, so that read() knows
518        # how much data is left and does not raise IncompleteRead
519        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
520        sock = FakeSocket(body)
521        resp = client.HTTPResponse(sock)
522        resp.begin()
523        self.assertEqual(resp.readline(), b'Text\r\n')
524        self.assertFalse(resp.isclosed())
525        self.assertEqual(resp.read(), b'Another')
526        self.assertTrue(resp.isclosed())
527        self.assertFalse(resp.closed)
528        resp.close()
529        self.assertTrue(resp.closed)
530
531    def test_partial_readintos(self):
532        # if we have Content-Length, HTTPResponse knows when to close itself,
533        # the same behaviour as when we read the whole thing with read()
534        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
535        sock = FakeSocket(body)
536        resp = client.HTTPResponse(sock)
537        resp.begin()
538        b = bytearray(2)
539        n = resp.readinto(b)
540        self.assertEqual(n, 2)
541        self.assertEqual(bytes(b), b'Te')
542        self.assertFalse(resp.isclosed())
543        n = resp.readinto(b)
544        self.assertEqual(n, 2)
545        self.assertEqual(bytes(b), b'xt')
546        self.assertTrue(resp.isclosed())
547        self.assertFalse(resp.closed)
548        resp.close()
549        self.assertTrue(resp.closed)
550
551    def test_partial_reads_no_content_length(self):
552        # when no length is present, the socket should be gracefully closed when
553        # all data was read
554        body = "HTTP/1.1 200 Ok\r\n\r\nText"
555        sock = FakeSocket(body)
556        resp = client.HTTPResponse(sock)
557        resp.begin()
558        self.assertEqual(resp.read(2), b'Te')
559        self.assertFalse(resp.isclosed())
560        self.assertEqual(resp.read(2), b'xt')
561        self.assertEqual(resp.read(1), b'')
562        self.assertTrue(resp.isclosed())
563        self.assertFalse(resp.closed)
564        resp.close()
565        self.assertTrue(resp.closed)
566
567    def test_partial_readintos_no_content_length(self):
568        # when no length is present, the socket should be gracefully closed when
569        # all data was read
570        body = "HTTP/1.1 200 Ok\r\n\r\nText"
571        sock = FakeSocket(body)
572        resp = client.HTTPResponse(sock)
573        resp.begin()
574        b = bytearray(2)
575        n = resp.readinto(b)
576        self.assertEqual(n, 2)
577        self.assertEqual(bytes(b), b'Te')
578        self.assertFalse(resp.isclosed())
579        n = resp.readinto(b)
580        self.assertEqual(n, 2)
581        self.assertEqual(bytes(b), b'xt')
582        n = resp.readinto(b)
583        self.assertEqual(n, 0)
584        self.assertTrue(resp.isclosed())
585
586    def test_partial_reads_incomplete_body(self):
587        # if the server shuts down the connection before the whole
588        # content-length is delivered, the socket is gracefully closed
589        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
590        sock = FakeSocket(body)
591        resp = client.HTTPResponse(sock)
592        resp.begin()
593        self.assertEqual(resp.read(2), b'Te')
594        self.assertFalse(resp.isclosed())
595        self.assertEqual(resp.read(2), b'xt')
596        self.assertEqual(resp.read(1), b'')
597        self.assertTrue(resp.isclosed())
598
599    def test_partial_readintos_incomplete_body(self):
600        # if the server shuts down the connection before the whole
601        # content-length is delivered, the socket is gracefully closed
602        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
603        sock = FakeSocket(body)
604        resp = client.HTTPResponse(sock)
605        resp.begin()
606        b = bytearray(2)
607        n = resp.readinto(b)
608        self.assertEqual(n, 2)
609        self.assertEqual(bytes(b), b'Te')
610        self.assertFalse(resp.isclosed())
611        n = resp.readinto(b)
612        self.assertEqual(n, 2)
613        self.assertEqual(bytes(b), b'xt')
614        n = resp.readinto(b)
615        self.assertEqual(n, 0)
616        self.assertTrue(resp.isclosed())
617        self.assertFalse(resp.closed)
618        resp.close()
619        self.assertTrue(resp.closed)
620
621    def test_host_port(self):
622        # Check invalid host_port
623
624        for hp in ("www.python.org:abc", "user:password@www.python.org"):
625            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
626
627        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
628                          "fe80::207:e9ff:fe9b", 8000),
629                         ("www.python.org:80", "www.python.org", 80),
630                         ("www.python.org:", "www.python.org", 80),
631                         ("www.python.org", "www.python.org", 80),
632                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
633                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
634            c = client.HTTPConnection(hp)
635            self.assertEqual(h, c.host)
636            self.assertEqual(p, c.port)
637
638    def test_response_headers(self):
639        # test response with multiple message headers with the same field name.
640        text = ('HTTP/1.1 200 OK\r\n'
641                'Set-Cookie: Customer="WILE_E_COYOTE"; '
642                'Version="1"; Path="/acme"\r\n'
643                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
644                ' Path="/acme"\r\n'
645                '\r\n'
646                'No body\r\n')
647        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
648               ', '
649               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
650        s = FakeSocket(text)
651        r = client.HTTPResponse(s)
652        r.begin()
653        cookies = r.getheader("Set-Cookie")
654        self.assertEqual(cookies, hdr)
655
656    def test_read_head(self):
657        # Test that the library doesn't attempt to read any data
658        # from a HEAD request.  (Tickles SF bug #622042.)
659        sock = FakeSocket(
660            'HTTP/1.1 200 OK\r\n'
661            'Content-Length: 14432\r\n'
662            '\r\n',
663            NoEOFBytesIO)
664        resp = client.HTTPResponse(sock, method="HEAD")
665        resp.begin()
666        if resp.read():
667            self.fail("Did not expect response from HEAD request")
668
669    def test_readinto_head(self):
670        # Test that the library doesn't attempt to read any data
671        # from a HEAD request.  (Tickles SF bug #622042.)
672        sock = FakeSocket(
673            'HTTP/1.1 200 OK\r\n'
674            'Content-Length: 14432\r\n'
675            '\r\n',
676            NoEOFBytesIO)
677        resp = client.HTTPResponse(sock, method="HEAD")
678        resp.begin()
679        b = bytearray(5)
680        if resp.readinto(b) != 0:
681            self.fail("Did not expect response from HEAD request")
682        self.assertEqual(bytes(b), b'\x00'*5)
683
684    def test_too_many_headers(self):
685        headers = '\r\n'.join('Header%d: foo' % i
686                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
687        text = ('HTTP/1.1 200 OK\r\n' + headers)
688        s = FakeSocket(text)
689        r = client.HTTPResponse(s)
690        self.assertRaisesRegex(client.HTTPException,
691                               r"got more than \d+ headers", r.begin)
692
693    def test_send_file(self):
694        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
695                    b'Accept-Encoding: identity\r\n'
696                    b'Transfer-Encoding: chunked\r\n'
697                    b'\r\n')
698
699        with open(__file__, 'rb') as body:
700            conn = client.HTTPConnection('example.com')
701            sock = FakeSocket(body)
702            conn.sock = sock
703            conn.request('GET', '/foo', body)
704            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
705                    (sock.data[:len(expected)], expected))
706
707    def test_send(self):
708        expected = b'this is a test this is only a test'
709        conn = client.HTTPConnection('example.com')
710        sock = FakeSocket(None)
711        conn.sock = sock
712        conn.send(expected)
713        self.assertEqual(expected, sock.data)
714        sock.data = b''
715        conn.send(array.array('b', expected))
716        self.assertEqual(expected, sock.data)
717        sock.data = b''
718        conn.send(io.BytesIO(expected))
719        self.assertEqual(expected, sock.data)
720
721    def test_send_updating_file(self):
722        def data():
723            yield 'data'
724            yield None
725            yield 'data_two'
726
727        class UpdatingFile(io.TextIOBase):
728            mode = 'r'
729            d = data()
730            def read(self, blocksize=-1):
731                return next(self.d)
732
733        expected = b'data'
734
735        conn = client.HTTPConnection('example.com')
736        sock = FakeSocket("")
737        conn.sock = sock
738        conn.send(UpdatingFile())
739        self.assertEqual(sock.data, expected)
740
741
742    def test_send_iter(self):
743        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
744                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
745                   b'\r\nonetwothree'
746
747        def body():
748            yield b"one"
749            yield b"two"
750            yield b"three"
751
752        conn = client.HTTPConnection('example.com')
753        sock = FakeSocket("")
754        conn.sock = sock
755        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
756        self.assertEqual(sock.data, expected)
757
758    def test_send_type_error(self):
759        # See: Issue #12676
760        conn = client.HTTPConnection('example.com')
761        conn.sock = FakeSocket('')
762        with self.assertRaises(TypeError):
763            conn.request('POST', 'test', conn)
764
765    def test_chunked(self):
766        expected = chunked_expected
767        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
768        resp = client.HTTPResponse(sock, method="GET")
769        resp.begin()
770        self.assertEqual(resp.read(), expected)
771        resp.close()
772
773        # Various read sizes
774        for n in range(1, 12):
775            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
776            resp = client.HTTPResponse(sock, method="GET")
777            resp.begin()
778            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
779            resp.close()
780
781        for x in ('', 'foo\r\n'):
782            sock = FakeSocket(chunked_start + x)
783            resp = client.HTTPResponse(sock, method="GET")
784            resp.begin()
785            try:
786                resp.read()
787            except client.IncompleteRead as i:
788                self.assertEqual(i.partial, expected)
789                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
790                self.assertEqual(repr(i), expected_message)
791                self.assertEqual(str(i), expected_message)
792            else:
793                self.fail('IncompleteRead expected')
794            finally:
795                resp.close()
796
797    def test_readinto_chunked(self):
798
799        expected = chunked_expected
800        nexpected = len(expected)
801        b = bytearray(128)
802
803        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
804        resp = client.HTTPResponse(sock, method="GET")
805        resp.begin()
806        n = resp.readinto(b)
807        self.assertEqual(b[:nexpected], expected)
808        self.assertEqual(n, nexpected)
809        resp.close()
810
811        # Various read sizes
812        for n in range(1, 12):
813            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
814            resp = client.HTTPResponse(sock, method="GET")
815            resp.begin()
816            m = memoryview(b)
817            i = resp.readinto(m[0:n])
818            i += resp.readinto(m[i:n + i])
819            i += resp.readinto(m[i:])
820            self.assertEqual(b[:nexpected], expected)
821            self.assertEqual(i, nexpected)
822            resp.close()
823
824        for x in ('', 'foo\r\n'):
825            sock = FakeSocket(chunked_start + x)
826            resp = client.HTTPResponse(sock, method="GET")
827            resp.begin()
828            try:
829                n = resp.readinto(b)
830            except client.IncompleteRead as i:
831                self.assertEqual(i.partial, expected)
832                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
833                self.assertEqual(repr(i), expected_message)
834                self.assertEqual(str(i), expected_message)
835            else:
836                self.fail('IncompleteRead expected')
837            finally:
838                resp.close()
839
840    def test_chunked_head(self):
841        chunked_start = (
842            'HTTP/1.1 200 OK\r\n'
843            'Transfer-Encoding: chunked\r\n\r\n'
844            'a\r\n'
845            'hello world\r\n'
846            '1\r\n'
847            'd\r\n'
848        )
849        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
850        resp = client.HTTPResponse(sock, method="HEAD")
851        resp.begin()
852        self.assertEqual(resp.read(), b'')
853        self.assertEqual(resp.status, 200)
854        self.assertEqual(resp.reason, 'OK')
855        self.assertTrue(resp.isclosed())
856        self.assertFalse(resp.closed)
857        resp.close()
858        self.assertTrue(resp.closed)
859
860    def test_readinto_chunked_head(self):
861        chunked_start = (
862            'HTTP/1.1 200 OK\r\n'
863            'Transfer-Encoding: chunked\r\n\r\n'
864            'a\r\n'
865            'hello world\r\n'
866            '1\r\n'
867            'd\r\n'
868        )
869        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
870        resp = client.HTTPResponse(sock, method="HEAD")
871        resp.begin()
872        b = bytearray(5)
873        n = resp.readinto(b)
874        self.assertEqual(n, 0)
875        self.assertEqual(bytes(b), b'\x00'*5)
876        self.assertEqual(resp.status, 200)
877        self.assertEqual(resp.reason, 'OK')
878        self.assertTrue(resp.isclosed())
879        self.assertFalse(resp.closed)
880        resp.close()
881        self.assertTrue(resp.closed)
882
883    def test_negative_content_length(self):
884        sock = FakeSocket(
885            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
886        resp = client.HTTPResponse(sock, method="GET")
887        resp.begin()
888        self.assertEqual(resp.read(), b'Hello\r\n')
889        self.assertTrue(resp.isclosed())
890
891    def test_incomplete_read(self):
892        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
893        resp = client.HTTPResponse(sock, method="GET")
894        resp.begin()
895        try:
896            resp.read()
897        except client.IncompleteRead as i:
898            self.assertEqual(i.partial, b'Hello\r\n')
899            self.assertEqual(repr(i),
900                             "IncompleteRead(7 bytes read, 3 more expected)")
901            self.assertEqual(str(i),
902                             "IncompleteRead(7 bytes read, 3 more expected)")
903            self.assertTrue(resp.isclosed())
904        else:
905            self.fail('IncompleteRead expected')
906
907    def test_epipe(self):
908        sock = EPipeSocket(
909            "HTTP/1.0 401 Authorization Required\r\n"
910            "Content-type: text/html\r\n"
911            "WWW-Authenticate: Basic realm=\"example\"\r\n",
912            b"Content-Length")
913        conn = client.HTTPConnection("example.com")
914        conn.sock = sock
915        self.assertRaises(OSError,
916                          lambda: conn.request("PUT", "/url", "body"))
917        resp = conn.getresponse()
918        self.assertEqual(401, resp.status)
919        self.assertEqual("Basic realm=\"example\"",
920                         resp.getheader("www-authenticate"))
921
922    # Test lines overflowing the max line size (_MAXLINE in http.client)
923
924    def test_overflowing_status_line(self):
925        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
926        resp = client.HTTPResponse(FakeSocket(body))
927        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
928
929    def test_overflowing_header_line(self):
930        body = (
931            'HTTP/1.1 200 OK\r\n'
932            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
933        )
934        resp = client.HTTPResponse(FakeSocket(body))
935        self.assertRaises(client.LineTooLong, resp.begin)
936
937    def test_overflowing_chunked_line(self):
938        body = (
939            'HTTP/1.1 200 OK\r\n'
940            'Transfer-Encoding: chunked\r\n\r\n'
941            + '0' * 65536 + 'a\r\n'
942            'hello world\r\n'
943            '0\r\n'
944            '\r\n'
945        )
946        resp = client.HTTPResponse(FakeSocket(body))
947        resp.begin()
948        self.assertRaises(client.LineTooLong, resp.read)
949
950    def test_early_eof(self):
951        # Test httpresponse with no \r\n termination,
952        body = "HTTP/1.1 200 Ok"
953        sock = FakeSocket(body)
954        resp = client.HTTPResponse(sock)
955        resp.begin()
956        self.assertEqual(resp.read(), b'')
957        self.assertTrue(resp.isclosed())
958        self.assertFalse(resp.closed)
959        resp.close()
960        self.assertTrue(resp.closed)
961
962    def test_error_leak(self):
963        # Test that the socket is not leaked if getresponse() fails
964        conn = client.HTTPConnection('example.com')
965        response = None
966        class Response(client.HTTPResponse):
967            def __init__(self, *pos, **kw):
968                nonlocal response
969                response = self  # Avoid garbage collector closing the socket
970                client.HTTPResponse.__init__(self, *pos, **kw)
971        conn.response_class = Response
972        conn.sock = FakeSocket('Invalid status line')
973        conn.request('GET', '/')
974        self.assertRaises(client.BadStatusLine, conn.getresponse)
975        self.assertTrue(response.closed)
976        self.assertTrue(conn.sock.file_closed)
977
978    def test_chunked_extension(self):
979        extra = '3;foo=bar\r\n' + 'abc\r\n'
980        expected = chunked_expected + b'abc'
981
982        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
983        resp = client.HTTPResponse(sock, method="GET")
984        resp.begin()
985        self.assertEqual(resp.read(), expected)
986        resp.close()
987
988    def test_chunked_missing_end(self):
989        """some servers may serve up a short chunked encoding stream"""
990        expected = chunked_expected
991        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
992        resp = client.HTTPResponse(sock, method="GET")
993        resp.begin()
994        self.assertEqual(resp.read(), expected)
995        resp.close()
996
997    def test_chunked_trailers(self):
998        """See that trailers are read and ignored"""
999        expected = chunked_expected
1000        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1001        resp = client.HTTPResponse(sock, method="GET")
1002        resp.begin()
1003        self.assertEqual(resp.read(), expected)
1004        # we should have reached the end of the file
1005        self.assertEqual(sock.file.read(), b"") #we read to the end
1006        resp.close()
1007
1008    def test_chunked_sync(self):
1009        """Check that we don't read past the end of the chunked-encoding stream"""
1010        expected = chunked_expected
1011        extradata = "extradata"
1012        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1013        resp = client.HTTPResponse(sock, method="GET")
1014        resp.begin()
1015        self.assertEqual(resp.read(), expected)
1016        # the file should now have our extradata ready to be read
1017        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1018        resp.close()
1019
1020    def test_content_length_sync(self):
1021        """Check that we don't read past the end of the Content-Length stream"""
1022        extradata = b"extradata"
1023        expected = b"Hello123\r\n"
1024        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1025        resp = client.HTTPResponse(sock, method="GET")
1026        resp.begin()
1027        self.assertEqual(resp.read(), expected)
1028        # the file should now have our extradata ready to be read
1029        self.assertEqual(sock.file.read(), extradata) #we read to the end
1030        resp.close()
1031
1032    def test_readlines_content_length(self):
1033        extradata = b"extradata"
1034        expected = b"Hello123\r\n"
1035        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1036        resp = client.HTTPResponse(sock, method="GET")
1037        resp.begin()
1038        self.assertEqual(resp.readlines(2000), [expected])
1039        # the file should now have our extradata ready to be read
1040        self.assertEqual(sock.file.read(), extradata) #we read to the end
1041        resp.close()
1042
1043    def test_read1_content_length(self):
1044        extradata = b"extradata"
1045        expected = b"Hello123\r\n"
1046        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1047        resp = client.HTTPResponse(sock, method="GET")
1048        resp.begin()
1049        self.assertEqual(resp.read1(2000), expected)
1050        # the file should now have our extradata ready to be read
1051        self.assertEqual(sock.file.read(), extradata) #we read to the end
1052        resp.close()
1053
1054    def test_readline_bound_content_length(self):
1055        extradata = b"extradata"
1056        expected = b"Hello123\r\n"
1057        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1058        resp = client.HTTPResponse(sock, method="GET")
1059        resp.begin()
1060        self.assertEqual(resp.readline(10), expected)
1061        self.assertEqual(resp.readline(10), b"")
1062        # the file should now have our extradata ready to be read
1063        self.assertEqual(sock.file.read(), extradata) #we read to the end
1064        resp.close()
1065
1066    def test_read1_bound_content_length(self):
1067        extradata = b"extradata"
1068        expected = b"Hello123\r\n"
1069        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1070        resp = client.HTTPResponse(sock, method="GET")
1071        resp.begin()
1072        self.assertEqual(resp.read1(20), expected*2)
1073        self.assertEqual(resp.read(), expected)
1074        # the file should now have our extradata ready to be read
1075        self.assertEqual(sock.file.read(), extradata) #we read to the end
1076        resp.close()
1077
1078    def test_response_fileno(self):
1079        # Make sure fd returned by fileno is valid.
1080        threading = support.import_module("threading")
1081
1082        serv = socket.socket(
1083            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
1084        self.addCleanup(serv.close)
1085        serv.bind((HOST, 0))
1086        serv.listen()
1087
1088        result = None
1089        def run_server():
1090            [conn, address] = serv.accept()
1091            with conn, conn.makefile("rb") as reader:
1092                # Read the request header until a blank line
1093                while True:
1094                    line = reader.readline()
1095                    if not line.rstrip(b"\r\n"):
1096                        break
1097                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1098                nonlocal result
1099                result = reader.read()
1100
1101        thread = threading.Thread(target=run_server)
1102        thread.start()
1103        self.addCleanup(thread.join, float(1))
1104        conn = client.HTTPConnection(*serv.getsockname())
1105        conn.request("CONNECT", "dummy:1234")
1106        response = conn.getresponse()
1107        try:
1108            self.assertEqual(response.status, client.OK)
1109            s = socket.socket(fileno=response.fileno())
1110            try:
1111                s.sendall(b"proxied data\n")
1112            finally:
1113                s.detach()
1114        finally:
1115            response.close()
1116            conn.close()
1117        thread.join()
1118        self.assertEqual(result, b"proxied data\n")
1119
1120class ExtendedReadTest(TestCase):
1121    """
1122    Test peek(), read1(), readline()
1123    """
1124    lines = (
1125        'HTTP/1.1 200 OK\r\n'
1126        '\r\n'
1127        'hello world!\n'
1128        'and now \n'
1129        'for something completely different\n'
1130        'foo'
1131        )
1132    lines_expected = lines[lines.find('hello'):].encode("ascii")
1133    lines_chunked = (
1134        'HTTP/1.1 200 OK\r\n'
1135        'Transfer-Encoding: chunked\r\n\r\n'
1136        'a\r\n'
1137        'hello worl\r\n'
1138        '3\r\n'
1139        'd!\n\r\n'
1140        '9\r\n'
1141        'and now \n\r\n'
1142        '23\r\n'
1143        'for something completely different\n\r\n'
1144        '3\r\n'
1145        'foo\r\n'
1146        '0\r\n' # terminating chunk
1147        '\r\n'  # end of trailers
1148    )
1149
1150    def setUp(self):
1151        sock = FakeSocket(self.lines)
1152        resp = client.HTTPResponse(sock, method="GET")
1153        resp.begin()
1154        resp.fp = io.BufferedReader(resp.fp)
1155        self.resp = resp
1156
1157
1158
1159    def test_peek(self):
1160        resp = self.resp
1161        # patch up the buffered peek so that it returns not too much stuff
1162        oldpeek = resp.fp.peek
1163        def mypeek(n=-1):
1164            p = oldpeek(n)
1165            if n >= 0:
1166                return p[:n]
1167            return p[:10]
1168        resp.fp.peek = mypeek
1169
1170        all = []
1171        while True:
1172            # try a short peek
1173            p = resp.peek(3)
1174            if p:
1175                self.assertGreater(len(p), 0)
1176                # then unbounded peek
1177                p2 = resp.peek()
1178                self.assertGreaterEqual(len(p2), len(p))
1179                self.assertTrue(p2.startswith(p))
1180                next = resp.read(len(p2))
1181                self.assertEqual(next, p2)
1182            else:
1183                next = resp.read()
1184                self.assertFalse(next)
1185            all.append(next)
1186            if not next:
1187                break
1188        self.assertEqual(b"".join(all), self.lines_expected)
1189
1190    def test_readline(self):
1191        resp = self.resp
1192        self._verify_readline(self.resp.readline, self.lines_expected)
1193
1194    def _verify_readline(self, readline, expected):
1195        all = []
1196        while True:
1197            # short readlines
1198            line = readline(5)
1199            if line and line != b"foo":
1200                if len(line) < 5:
1201                    self.assertTrue(line.endswith(b"\n"))
1202            all.append(line)
1203            if not line:
1204                break
1205        self.assertEqual(b"".join(all), expected)
1206
1207    def test_read1(self):
1208        resp = self.resp
1209        def r():
1210            res = resp.read1(4)
1211            self.assertLessEqual(len(res), 4)
1212            return res
1213        readliner = Readliner(r)
1214        self._verify_readline(readliner.readline, self.lines_expected)
1215
1216    def test_read1_unbounded(self):
1217        resp = self.resp
1218        all = []
1219        while True:
1220            data = resp.read1()
1221            if not data:
1222                break
1223            all.append(data)
1224        self.assertEqual(b"".join(all), self.lines_expected)
1225
1226    def test_read1_bounded(self):
1227        resp = self.resp
1228        all = []
1229        while True:
1230            data = resp.read1(10)
1231            if not data:
1232                break
1233            self.assertLessEqual(len(data), 10)
1234            all.append(data)
1235        self.assertEqual(b"".join(all), self.lines_expected)
1236
1237    def test_read1_0(self):
1238        self.assertEqual(self.resp.read1(0), b"")
1239
1240    def test_peek_0(self):
1241        p = self.resp.peek(0)
1242        self.assertLessEqual(0, len(p))
1243
1244class ExtendedReadTestChunked(ExtendedReadTest):
1245    """
1246    Test peek(), read1(), readline() in chunked mode
1247    """
1248    lines = (
1249        'HTTP/1.1 200 OK\r\n'
1250        'Transfer-Encoding: chunked\r\n\r\n'
1251        'a\r\n'
1252        'hello worl\r\n'
1253        '3\r\n'
1254        'd!\n\r\n'
1255        '9\r\n'
1256        'and now \n\r\n'
1257        '23\r\n'
1258        'for something completely different\n\r\n'
1259        '3\r\n'
1260        'foo\r\n'
1261        '0\r\n' # terminating chunk
1262        '\r\n'  # end of trailers
1263    )
1264
1265
1266class Readliner:
1267    """
1268    a simple readline class that uses an arbitrary read function and buffering
1269    """
1270    def __init__(self, readfunc):
1271        self.readfunc = readfunc
1272        self.remainder = b""
1273
1274    def readline(self, limit):
1275        data = []
1276        datalen = 0
1277        read = self.remainder
1278        try:
1279            while True:
1280                idx = read.find(b'\n')
1281                if idx != -1:
1282                    break
1283                if datalen + len(read) >= limit:
1284                    idx = limit - datalen - 1
1285                # read more data
1286                data.append(read)
1287                read = self.readfunc()
1288                if not read:
1289                    idx = 0 #eof condition
1290                    break
1291            idx += 1
1292            data.append(read[:idx])
1293            self.remainder = read[idx:]
1294            return b"".join(data)
1295        except:
1296            self.remainder = b"".join(data)
1297            raise
1298
1299
1300class OfflineTest(TestCase):
1301    def test_all(self):
1302        # Documented objects defined in the module should be in __all__
1303        expected = {"responses"}  # White-list documented dict() object
1304        # HTTPMessage, parse_headers(), and the HTTP status code constants are
1305        # intentionally omitted for simplicity
1306        blacklist = {"HTTPMessage", "parse_headers"}
1307        for name in dir(client):
1308            if name.startswith("_") or name in blacklist:
1309                continue
1310            module_object = getattr(client, name)
1311            if getattr(module_object, "__module__", None) == "http.client":
1312                expected.add(name)
1313        self.assertCountEqual(client.__all__, expected)
1314
1315    def test_responses(self):
1316        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
1317
1318    def test_client_constants(self):
1319        # Make sure we don't break backward compatibility with 3.4
1320        expected = [
1321            'CONTINUE',
1322            'SWITCHING_PROTOCOLS',
1323            'PROCESSING',
1324            'OK',
1325            'CREATED',
1326            'ACCEPTED',
1327            'NON_AUTHORITATIVE_INFORMATION',
1328            'NO_CONTENT',
1329            'RESET_CONTENT',
1330            'PARTIAL_CONTENT',
1331            'MULTI_STATUS',
1332            'IM_USED',
1333            'MULTIPLE_CHOICES',
1334            'MOVED_PERMANENTLY',
1335            'FOUND',
1336            'SEE_OTHER',
1337            'NOT_MODIFIED',
1338            'USE_PROXY',
1339            'TEMPORARY_REDIRECT',
1340            'BAD_REQUEST',
1341            'UNAUTHORIZED',
1342            'PAYMENT_REQUIRED',
1343            'FORBIDDEN',
1344            'NOT_FOUND',
1345            'METHOD_NOT_ALLOWED',
1346            'NOT_ACCEPTABLE',
1347            'PROXY_AUTHENTICATION_REQUIRED',
1348            'REQUEST_TIMEOUT',
1349            'CONFLICT',
1350            'GONE',
1351            'LENGTH_REQUIRED',
1352            'PRECONDITION_FAILED',
1353            'REQUEST_ENTITY_TOO_LARGE',
1354            'REQUEST_URI_TOO_LONG',
1355            'UNSUPPORTED_MEDIA_TYPE',
1356            'REQUESTED_RANGE_NOT_SATISFIABLE',
1357            'EXPECTATION_FAILED',
1358            'UNPROCESSABLE_ENTITY',
1359            'LOCKED',
1360            'FAILED_DEPENDENCY',
1361            'UPGRADE_REQUIRED',
1362            'PRECONDITION_REQUIRED',
1363            'TOO_MANY_REQUESTS',
1364            'REQUEST_HEADER_FIELDS_TOO_LARGE',
1365            'INTERNAL_SERVER_ERROR',
1366            'NOT_IMPLEMENTED',
1367            'BAD_GATEWAY',
1368            'SERVICE_UNAVAILABLE',
1369            'GATEWAY_TIMEOUT',
1370            'HTTP_VERSION_NOT_SUPPORTED',
1371            'INSUFFICIENT_STORAGE',
1372            'NOT_EXTENDED',
1373            'NETWORK_AUTHENTICATION_REQUIRED',
1374        ]
1375        for const in expected:
1376            with self.subTest(constant=const):
1377                self.assertTrue(hasattr(client, const))
1378
1379
1380class SourceAddressTest(TestCase):
1381    def setUp(self):
1382        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1383        self.port = support.bind_port(self.serv)
1384        self.source_port = support.find_unused_port()
1385        self.serv.listen()
1386        self.conn = None
1387
1388    def tearDown(self):
1389        if self.conn:
1390            self.conn.close()
1391            self.conn = None
1392        self.serv.close()
1393        self.serv = None
1394
1395    def testHTTPConnectionSourceAddress(self):
1396        self.conn = client.HTTPConnection(HOST, self.port,
1397                source_address=('', self.source_port))
1398        self.conn.connect()
1399        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
1400
1401    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1402                     'http.client.HTTPSConnection not defined')
1403    def testHTTPSConnectionSourceAddress(self):
1404        self.conn = client.HTTPSConnection(HOST, self.port,
1405                source_address=('', self.source_port))
1406        # We don't test anything here other than the constructor not barfing as
1407        # this code doesn't deal with setting up an active running SSL server
1408        # for an ssl_wrapped connect() to actually return from.
1409
1410
1411class TimeoutTest(TestCase):
1412    PORT = None
1413
1414    def setUp(self):
1415        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1416        TimeoutTest.PORT = support.bind_port(self.serv)
1417        self.serv.listen()
1418
1419    def tearDown(self):
1420        self.serv.close()
1421        self.serv = None
1422
1423    def testTimeoutAttribute(self):
1424        # This will prove that the timeout gets through HTTPConnection
1425        # and into the socket.
1426
1427        # default -- use global socket timeout
1428        self.assertIsNone(socket.getdefaulttimeout())
1429        socket.setdefaulttimeout(30)
1430        try:
1431            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
1432            httpConn.connect()
1433        finally:
1434            socket.setdefaulttimeout(None)
1435        self.assertEqual(httpConn.sock.gettimeout(), 30)
1436        httpConn.close()
1437
1438        # no timeout -- do not use global socket default
1439        self.assertIsNone(socket.getdefaulttimeout())
1440        socket.setdefaulttimeout(30)
1441        try:
1442            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
1443                                              timeout=None)
1444            httpConn.connect()
1445        finally:
1446            socket.setdefaulttimeout(None)
1447        self.assertEqual(httpConn.sock.gettimeout(), None)
1448        httpConn.close()
1449
1450        # a value
1451        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
1452        httpConn.connect()
1453        self.assertEqual(httpConn.sock.gettimeout(), 30)
1454        httpConn.close()
1455
1456
1457class PersistenceTest(TestCase):
1458
1459    def test_reuse_reconnect(self):
1460        # Should reuse or reconnect depending on header from server
1461        tests = (
1462            ('1.0', '', False),
1463            ('1.0', 'Connection: keep-alive\r\n', True),
1464            ('1.1', '', True),
1465            ('1.1', 'Connection: close\r\n', False),
1466            ('1.0', 'Connection: keep-ALIVE\r\n', True),
1467            ('1.1', 'Connection: cloSE\r\n', False),
1468        )
1469        for version, header, reuse in tests:
1470            with self.subTest(version=version, header=header):
1471                msg = (
1472                    'HTTP/{} 200 OK\r\n'
1473                    '{}'
1474                    'Content-Length: 12\r\n'
1475                    '\r\n'
1476                    'Dummy body\r\n'
1477                ).format(version, header)
1478                conn = FakeSocketHTTPConnection(msg)
1479                self.assertIsNone(conn.sock)
1480                conn.request('GET', '/open-connection')
1481                with conn.getresponse() as response:
1482                    self.assertEqual(conn.sock is None, not reuse)
1483                    response.read()
1484                self.assertEqual(conn.sock is None, not reuse)
1485                self.assertEqual(conn.connections, 1)
1486                conn.request('GET', '/subsequent-request')
1487                self.assertEqual(conn.connections, 1 if reuse else 2)
1488
1489    def test_disconnected(self):
1490
1491        def make_reset_reader(text):
1492            """Return BufferedReader that raises ECONNRESET at EOF"""
1493            stream = io.BytesIO(text)
1494            def readinto(buffer):
1495                size = io.BytesIO.readinto(stream, buffer)
1496                if size == 0:
1497                    raise ConnectionResetError()
1498                return size
1499            stream.readinto = readinto
1500            return io.BufferedReader(stream)
1501
1502        tests = (
1503            (io.BytesIO, client.RemoteDisconnected),
1504            (make_reset_reader, ConnectionResetError),
1505        )
1506        for stream_factory, exception in tests:
1507            with self.subTest(exception=exception):
1508                conn = FakeSocketHTTPConnection(b'', stream_factory)
1509                conn.request('GET', '/eof-response')
1510                self.assertRaises(exception, conn.getresponse)
1511                self.assertIsNone(conn.sock)
1512                # HTTPConnection.connect() should be automatically invoked
1513                conn.request('GET', '/reconnect')
1514                self.assertEqual(conn.connections, 2)
1515
1516    def test_100_close(self):
1517        conn = FakeSocketHTTPConnection(
1518            b'HTTP/1.1 100 Continue\r\n'
1519            b'\r\n'
1520            # Missing final response
1521        )
1522        conn.request('GET', '/', headers={'Expect': '100-continue'})
1523        self.assertRaises(client.RemoteDisconnected, conn.getresponse)
1524        self.assertIsNone(conn.sock)
1525        conn.request('GET', '/reconnect')
1526        self.assertEqual(conn.connections, 2)
1527
1528
1529class HTTPSTest(TestCase):
1530
1531    def setUp(self):
1532        if not hasattr(client, 'HTTPSConnection'):
1533            self.skipTest('ssl support required')
1534
1535    def make_server(self, certfile):
1536        from test.ssl_servers import make_https_server
1537        return make_https_server(self, certfile=certfile)
1538
1539    def test_attributes(self):
1540        # simple test to check it's storing the timeout
1541        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
1542        self.assertEqual(h.timeout, 30)
1543
1544    def test_networked(self):
1545        # Default settings: requires a valid cert from a trusted CA
1546        import ssl
1547        support.requires('network')
1548        with support.transient_internet('self-signed.pythontest.net'):
1549            h = client.HTTPSConnection('self-signed.pythontest.net', 443)
1550            with self.assertRaises(ssl.SSLError) as exc_info:
1551                h.request('GET', '/')
1552            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1553
1554    def test_networked_noverification(self):
1555        # Switch off cert verification
1556        import ssl
1557        support.requires('network')
1558        with support.transient_internet('self-signed.pythontest.net'):
1559            context = ssl._create_unverified_context()
1560            h = client.HTTPSConnection('self-signed.pythontest.net', 443,
1561                                       context=context)
1562            h.request('GET', '/')
1563            resp = h.getresponse()
1564            h.close()
1565            self.assertIn('nginx', resp.getheader('server'))
1566            resp.close()
1567
1568    @support.system_must_validate_cert
1569    def test_networked_trusted_by_default_cert(self):
1570        # Default settings: requires a valid cert from a trusted CA
1571        support.requires('network')
1572        with support.transient_internet('www.python.org'):
1573            h = client.HTTPSConnection('www.python.org', 443)
1574            h.request('GET', '/')
1575            resp = h.getresponse()
1576            content_type = resp.getheader('content-type')
1577            resp.close()
1578            h.close()
1579            self.assertIn('text/html', content_type)
1580
1581    def test_networked_good_cert(self):
1582        # We feed the server's cert as a validating cert
1583        import ssl
1584        support.requires('network')
1585        with support.transient_internet('self-signed.pythontest.net'):
1586            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1587            context.verify_mode = ssl.CERT_REQUIRED
1588            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
1589            h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
1590            h.request('GET', '/')
1591            resp = h.getresponse()
1592            server_string = resp.getheader('server')
1593            resp.close()
1594            h.close()
1595            self.assertIn('nginx', server_string)
1596
1597    def test_networked_bad_cert(self):
1598        # We feed a "CA" cert that is unrelated to the server's cert
1599        import ssl
1600        support.requires('network')
1601        with support.transient_internet('self-signed.pythontest.net'):
1602            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1603            context.verify_mode = ssl.CERT_REQUIRED
1604            context.load_verify_locations(CERT_localhost)
1605            h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
1606            with self.assertRaises(ssl.SSLError) as exc_info:
1607                h.request('GET', '/')
1608            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1609
1610    def test_local_unknown_cert(self):
1611        # The custom cert isn't known to the default trust bundle
1612        import ssl
1613        server = self.make_server(CERT_localhost)
1614        h = client.HTTPSConnection('localhost', server.port)
1615        with self.assertRaises(ssl.SSLError) as exc_info:
1616            h.request('GET', '/')
1617        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1618
1619    def test_local_good_hostname(self):
1620        # The (valid) cert validates the HTTP hostname
1621        import ssl
1622        server = self.make_server(CERT_localhost)
1623        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1624        context.verify_mode = ssl.CERT_REQUIRED
1625        context.load_verify_locations(CERT_localhost)
1626        h = client.HTTPSConnection('localhost', server.port, context=context)
1627        self.addCleanup(h.close)
1628        h.request('GET', '/nonexistent')
1629        resp = h.getresponse()
1630        self.addCleanup(resp.close)
1631        self.assertEqual(resp.status, 404)
1632
1633    def test_local_bad_hostname(self):
1634        # The (valid) cert doesn't validate the HTTP hostname
1635        import ssl
1636        server = self.make_server(CERT_fakehostname)
1637        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1638        context.verify_mode = ssl.CERT_REQUIRED
1639        context.check_hostname = True
1640        context.load_verify_locations(CERT_fakehostname)
1641        h = client.HTTPSConnection('localhost', server.port, context=context)
1642        with self.assertRaises(ssl.CertificateError):
1643            h.request('GET', '/')
1644        # Same with explicit check_hostname=True
1645        with support.check_warnings(('', DeprecationWarning)):
1646            h = client.HTTPSConnection('localhost', server.port,
1647                                       context=context, check_hostname=True)
1648        with self.assertRaises(ssl.CertificateError):
1649            h.request('GET', '/')
1650        # With check_hostname=False, the mismatching is ignored
1651        context.check_hostname = False
1652        with support.check_warnings(('', DeprecationWarning)):
1653            h = client.HTTPSConnection('localhost', server.port,
1654                                       context=context, check_hostname=False)
1655        h.request('GET', '/nonexistent')
1656        resp = h.getresponse()
1657        resp.close()
1658        h.close()
1659        self.assertEqual(resp.status, 404)
1660        # The context's check_hostname setting is used if one isn't passed to
1661        # HTTPSConnection.
1662        context.check_hostname = False
1663        h = client.HTTPSConnection('localhost', server.port, context=context)
1664        h.request('GET', '/nonexistent')
1665        resp = h.getresponse()
1666        self.assertEqual(resp.status, 404)
1667        resp.close()
1668        h.close()
1669        # Passing check_hostname to HTTPSConnection should override the
1670        # context's setting.
1671        with support.check_warnings(('', DeprecationWarning)):
1672            h = client.HTTPSConnection('localhost', server.port,
1673                                       context=context, check_hostname=True)
1674        with self.assertRaises(ssl.CertificateError):
1675            h.request('GET', '/')
1676
1677    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1678                     'http.client.HTTPSConnection not available')
1679    def test_host_port(self):
1680        # Check invalid host_port
1681
1682        for hp in ("www.python.org:abc", "user:password@www.python.org"):
1683            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
1684
1685        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
1686                          "fe80::207:e9ff:fe9b", 8000),
1687                         ("www.python.org:443", "www.python.org", 443),
1688                         ("www.python.org:", "www.python.org", 443),
1689                         ("www.python.org", "www.python.org", 443),
1690                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
1691                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
1692                             443)):
1693            c = client.HTTPSConnection(hp)
1694            self.assertEqual(h, c.host)
1695            self.assertEqual(p, c.port)
1696
1697
1698class RequestBodyTest(TestCase):
1699    """Test cases where a request includes a message body."""
1700
1701    def setUp(self):
1702        self.conn = client.HTTPConnection('example.com')
1703        self.conn.sock = self.sock = FakeSocket("")
1704        self.conn.sock = self.sock
1705
1706    def get_headers_and_fp(self):
1707        f = io.BytesIO(self.sock.data)
1708        f.readline()  # read the request line
1709        message = client.parse_headers(f)
1710        return message, f
1711
1712    def test_list_body(self):
1713        # Note that no content-length is automatically calculated for
1714        # an iterable.  The request will fall back to send chunked
1715        # transfer encoding.
1716        cases = (
1717            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1718            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1719        )
1720        for body, expected in cases:
1721            with self.subTest(body):
1722                self.conn = client.HTTPConnection('example.com')
1723                self.conn.sock = self.sock = FakeSocket('')
1724
1725                self.conn.request('PUT', '/url', body)
1726                msg, f = self.get_headers_and_fp()
1727                self.assertNotIn('Content-Type', msg)
1728                self.assertNotIn('Content-Length', msg)
1729                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
1730                self.assertEqual(expected, f.read())
1731
1732    def test_manual_content_length(self):
1733        # Set an incorrect content-length so that we can verify that
1734        # it will not be over-ridden by the library.
1735        self.conn.request("PUT", "/url", "body",
1736                          {"Content-Length": "42"})
1737        message, f = self.get_headers_and_fp()
1738        self.assertEqual("42", message.get("content-length"))
1739        self.assertEqual(4, len(f.read()))
1740
1741    def test_ascii_body(self):
1742        self.conn.request("PUT", "/url", "body")
1743        message, f = self.get_headers_and_fp()
1744        self.assertEqual("text/plain", message.get_content_type())
1745        self.assertIsNone(message.get_charset())
1746        self.assertEqual("4", message.get("content-length"))
1747        self.assertEqual(b'body', f.read())
1748
1749    def test_latin1_body(self):
1750        self.conn.request("PUT", "/url", "body\xc1")
1751        message, f = self.get_headers_and_fp()
1752        self.assertEqual("text/plain", message.get_content_type())
1753        self.assertIsNone(message.get_charset())
1754        self.assertEqual("5", message.get("content-length"))
1755        self.assertEqual(b'body\xc1', f.read())
1756
1757    def test_bytes_body(self):
1758        self.conn.request("PUT", "/url", b"body\xc1")
1759        message, f = self.get_headers_and_fp()
1760        self.assertEqual("text/plain", message.get_content_type())
1761        self.assertIsNone(message.get_charset())
1762        self.assertEqual("5", message.get("content-length"))
1763        self.assertEqual(b'body\xc1', f.read())
1764
1765    def test_text_file_body(self):
1766        self.addCleanup(support.unlink, support.TESTFN)
1767        with open(support.TESTFN, "w") as f:
1768            f.write("body")
1769        with open(support.TESTFN) as f:
1770            self.conn.request("PUT", "/url", f)
1771            message, f = self.get_headers_and_fp()
1772            self.assertEqual("text/plain", message.get_content_type())
1773            self.assertIsNone(message.get_charset())
1774            # No content-length will be determined for files; the body
1775            # will be sent using chunked transfer encoding instead.
1776            self.assertIsNone(message.get("content-length"))
1777            self.assertEqual("chunked", message.get("transfer-encoding"))
1778            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
1779
1780    def test_binary_file_body(self):
1781        self.addCleanup(support.unlink, support.TESTFN)
1782        with open(support.TESTFN, "wb") as f:
1783            f.write(b"body\xc1")
1784        with open(support.TESTFN, "rb") as f:
1785            self.conn.request("PUT", "/url", f)
1786            message, f = self.get_headers_and_fp()
1787            self.assertEqual("text/plain", message.get_content_type())
1788            self.assertIsNone(message.get_charset())
1789            self.assertEqual("chunked", message.get("Transfer-Encoding"))
1790            self.assertNotIn("Content-Length", message)
1791            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
1792
1793
1794class HTTPResponseTest(TestCase):
1795
1796    def setUp(self):
1797        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
1798                second-value\r\n\r\nText"
1799        sock = FakeSocket(body)
1800        self.resp = client.HTTPResponse(sock)
1801        self.resp.begin()
1802
1803    def test_getting_header(self):
1804        header = self.resp.getheader('My-Header')
1805        self.assertEqual(header, 'first-value, second-value')
1806
1807        header = self.resp.getheader('My-Header', 'some default')
1808        self.assertEqual(header, 'first-value, second-value')
1809
1810    def test_getting_nonexistent_header_with_string_default(self):
1811        header = self.resp.getheader('No-Such-Header', 'default-value')
1812        self.assertEqual(header, 'default-value')
1813
1814    def test_getting_nonexistent_header_with_iterable_default(self):
1815        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
1816        self.assertEqual(header, 'default, values')
1817
1818        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
1819        self.assertEqual(header, 'default, values')
1820
1821    def test_getting_nonexistent_header_without_default(self):
1822        header = self.resp.getheader('No-Such-Header')
1823        self.assertEqual(header, None)
1824
1825    def test_getting_header_defaultint(self):
1826        header = self.resp.getheader('No-Such-Header',default=42)
1827        self.assertEqual(header, 42)
1828
1829class TunnelTests(TestCase):
1830    def setUp(self):
1831        response_text = (
1832            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
1833            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
1834            'Content-Length: 42\r\n\r\n'
1835        )
1836        self.host = 'proxy.com'
1837        self.conn = client.HTTPConnection(self.host)
1838        self.conn._create_connection = self._create_connection(response_text)
1839
1840    def tearDown(self):
1841        self.conn.close()
1842
1843    def _create_connection(self, response_text):
1844        def create_connection(address, timeout=None, source_address=None):
1845            return FakeSocket(response_text, host=address[0], port=address[1])
1846        return create_connection
1847
1848    def test_set_tunnel_host_port_headers(self):
1849        tunnel_host = 'destination.com'
1850        tunnel_port = 8888
1851        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
1852        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
1853                             headers=tunnel_headers)
1854        self.conn.request('HEAD', '/', '')
1855        self.assertEqual(self.conn.sock.host, self.host)
1856        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
1857        self.assertEqual(self.conn._tunnel_host, tunnel_host)
1858        self.assertEqual(self.conn._tunnel_port, tunnel_port)
1859        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
1860
1861    def test_disallow_set_tunnel_after_connect(self):
1862        # Once connected, we shouldn't be able to tunnel anymore
1863        self.conn.connect()
1864        self.assertRaises(RuntimeError, self.conn.set_tunnel,
1865                          'destination.com')
1866
1867    def test_connect_with_tunnel(self):
1868        self.conn.set_tunnel('destination.com')
1869        self.conn.request('HEAD', '/', '')
1870        self.assertEqual(self.conn.sock.host, self.host)
1871        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
1872        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
1873        # issue22095
1874        self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
1875        self.assertIn(b'Host: destination.com', self.conn.sock.data)
1876
1877        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
1878        self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
1879
1880    def test_connect_put_request(self):
1881        self.conn.set_tunnel('destination.com')
1882        self.conn.request('PUT', '/', '')
1883        self.assertEqual(self.conn.sock.host, self.host)
1884        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
1885        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
1886        self.assertIn(b'Host: destination.com', self.conn.sock.data)
1887
1888    def test_tunnel_debuglog(self):
1889        expected_header = 'X-Dummy: 1'
1890        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
1891
1892        self.conn.set_debuglevel(1)
1893        self.conn._create_connection = self._create_connection(response_text)
1894        self.conn.set_tunnel('destination.com')
1895
1896        with support.captured_stdout() as output:
1897            self.conn.request('PUT', '/', '')
1898        lines = output.getvalue().splitlines()
1899        self.assertIn('header: {}'.format(expected_header), lines)
1900
1901
1902if __name__ == '__main__':
1903    unittest.main(verbosity=2)
1904