• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2from http import client
3import io
4import itertools
5import os
6import array
7import socket
8import threading
9
10import unittest
11TestCase = unittest.TestCase
12
13from test import support
14
# Directory containing this module; the certificate files are looked up
# relative to it.
here = os.path.dirname(__file__)
# In order: self-signed cert file for 'localhost', for 'fakehostname', and
# for self-signed.pythontest.net.
CERT_localhost, CERT_fakehostname, CERT_selfsigned_pythontestdotnet = (
    os.path.join(here, pem_name)
    for pem_name in (
        'keycert.pem',
        'keycert2.pem',
        'selfsigned_pythontestdotnet.pem',
    )
)
22
# constants for testing chunked encoding
# Response head followed by four chunks, each written as "<hex size>\r\n"
# plus "<payload>\r\n"; the terminating chunk is appended by the tests.
chunked_start = (
    'HTTP/1.1 200 OK\r\n'
    'Transfer-Encoding: chunked\r\n'
    '\r\n'
    'a\r\nhello worl\r\n'
    '3\r\nd! \r\n'
    '8\r\nand now \r\n'
    '22\r\nfor something completely different\r\n'
)
# The four chunk payloads above, concatenated.
chunked_expected = (
    b'hello worl'
    b'd! '
    b'and now '
    b'for something completely different'
)
chunk_extension = ";foo=bar"
last_chunk = "0\r\n"
last_chunk_extended = "0{}\r\n".format(chunk_extension)
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
chunked_end = "\r\n"
42
# Module-level alias for test.support.HOST.
HOST = support.HOST
44
class FakeSocket:
    """In-memory stand-in for a socket.

    Everything passed to sendall() is accumulated in ``data`` (and counted
    in ``sendall_calls``); makefile() serves the canned ``text`` through
    ``fileclass``.  close() and setsockopt() do nothing.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # str input is stored as ASCII bytes; any other value is kept as-is.
        self.text = text.encode("ascii") if isinstance(text, str) else text
        self.fileclass = fileclass
        self.data = b''
        self.sendall_calls = 0
        self.file_closed = False
        self.host, self.port = host, port

    def sendall(self, data):
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        if mode not in ('r', 'rb'):
            raise client.UnimplementedFileMode()
        # The file object stays reachable as self.file so a test can check
        # how much was read from it.
        self.file = stream = self.fileclass(self.text)
        # Replace close() so the file stays usable; only note the call.
        stream.close = self.file_close
        return stream

    def file_close(self):
        self.file_closed = True

    def close(self):
        pass

    def setsockopt(self, level, optname, value):
        pass
77
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() raises EPIPE for a chosen payload.

    close() is inherited from FakeSocket and remains a no-op.
    """

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        super().__init__(text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        """Record *data* unless it contains the trigger.

        Raises OSError(errno.EPIPE) when ``pipe_trigger`` occurs in *data*;
        nothing is recorded for that call.
        """
        if self.pipe_trigger in data:
            raise OSError(errno.EPIPE, "gotcha")
        # Delegate to the base class so successful sends are also counted
        # in sendall_calls, not just appended to data.
        super().sendall(data)
92
class NoEOFBytesIO(io.BytesIO):
    """BytesIO variant that raises AssertionError instead of signalling EOF.

    Used to check that http.client does not try to read more from the
    underlying file than it should.
    """

    @staticmethod
    def _reject_eof(chunk):
        # An empty result from read()/readline() means EOF was reached.
        if chunk == b'':
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def read(self, n=-1):
        return self._reject_eof(super().read(n))

    def readline(self, length=None):
        return self._reject_eof(super().readline(length))
110
class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection subclass using FakeSocket; counts connect() calls"""

    def __init__(self, *args):
        # *args are kept and later handed to FakeSocket by
        # create_connection(); the connection itself always targets
        # 'example.com'.
        self.fake_socket_args = args
        self.connections = 0
        super().__init__('example.com')
        # Installed after the base initialiser so it is not overwritten.
        self._create_connection = self.create_connection

    def connect(self):
        """Count the number of times connect() is invoked"""
        self.connections = self.connections + 1
        return super().connect()

    def create_connection(self, *pos, **kw):
        # The arguments the caller passes are ignored; every call builds a
        # new FakeSocket from the constructor arguments.
        fake = FakeSocket(*self.fake_socket_args)
        return fake
127
class HeaderTests(TestCase):
    """Tests for request-header emission and response-header parsing."""

    def test_auto_headers(self):
        # request() supplies some headers by itself; a header the caller
        # passes explicitly must still appear exactly once in the output.

        class HeaderCountingBuffer(list):
            def __init__(self):
                self.count = {}

            def append(self, item):
                name, *value_parts = item.split(b':')
                if value_parts:
                    # item is a 'Key: Value' header string
                    lowered = name.decode('ascii').lower()
                    self.count[lowered] = self.count.get(lowered, 0) + 1
                list.append(self, item)

        body = 'spamspamspam'
        header_names = ('Content-length', 'Host', 'Accept-encoding')
        for explicit_header, header in itertools.product((True, False),
                                                         header_names):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket('blahblahblah')
            conn._buffer = HeaderCountingBuffer()

            headers = {header: str(len(body))} if explicit_header else {}
            conn.request('POST', '/', body, headers)
            self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            def __init__(self):
                list.__init__(self)
                self.content_length = None

            def append(self, item):
                name, colon, value = item.partition(b':')
                if colon and name.lower() == b'content-length':
                    self.content_length = value.strip()
                list.append(self, item)

        def content_length_sent(method, body):
            # Issue one request and report the Content-Length value that
            # reached the output buffer (None when no such header was added).
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            return conn._buffer.content_length

        methods_with_body = ('PUT', 'POST', 'PATCH')
        methods_without_body = (
            'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )

        # Methods that expect a body get a content-length of zero when the
        # body is empty, whether that is None or ''.
        for method, body in itertools.product(methods_with_body, (None, '')):
            self.assertEqual(
                content_length_sent(method, body), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # For the other methods content-length must not be set when the body
        # is None, because it might cause unexpected behaviour on the server.
        for method in methods_without_body:
            self.assertEqual(
                content_length_sent(method, None), None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # A body of '' counts as "present but empty" rather than "missing",
        # so content length is set even for methods that expect no body.
        for method in methods_without_body:
            self.assertEqual(
                content_length_sent(method, ''), b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # Whenever a non-empty body is given, Content-Length must be set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            self.assertEqual(
                content_length_sent(method, ' '), b'1',
                'Header Content-Length incorrect on {}'.format(method)
            )

    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET', '/')

        # (putheader() arguments, line expected in the output buffer),
        # applied in this order on the same connection.
        cases = (
            (('Content-length', 42), b'Content-length: 42'),
            (('Foo', ' bar '), b'Foo:  bar '),
            (('Bar', '\tbaz\t'), b'Bar: \tbaz\t'),
            (('Authorization', 'Bearer mytoken'),
             b'Authorization: Bearer mytoken'),
            (('IterHeader', 'IterA', 'IterB'),
             b'IterHeader: IterA\r\n\tIterB'),
            (('LatinHeader', b'\xFF'), b'LatinHeader: \xFF'),
            (('Utf8Header', b'\xc3\x80'), b'Utf8Header: \xc3\x80'),
            (('C1-Control', b'next\x85line'), b'C1-Control: next\x85line'),
            (('Embedded-Fold-Space', 'is\r\n allowed'),
             b'Embedded-Fold-Space: is\r\n allowed'),
            (('Embedded-Fold-Tab', 'is\r\n\tallowed'),
             b'Embedded-Fold-Tab: is\r\n\tallowed'),
            (('Key Space', 'value'), b'Key Space: value'),
            (('KeySpace ', 'value'), b'KeySpace : value'),
            ((b'Nonbreak\xa0Space', 'value'), b'Nonbreak\xa0Space: value'),
            ((b'\xa0NonbreakSpace', 'value'), b'\xa0NonbreakSpace: value'),
        )
        for putheader_args, wire_line in cases:
            conn.putheader(*putheader_args)
            self.assertIn(wire_line, conn._buffer)

    def test_ipv6host_header(self):
        # The default Host header for an IPv6 address must keep the
        # enclosing [] (with the port appended when one was given).
        for netloc, host_line in (
            ('[2001::]:81', b'Host: [2001::]:81\r\n'),
            ('[2001:102A::]', b'Host: [2001:102A::]\r\n'),
        ):
            expected = (b'GET /foo HTTP/1.1\r\n' + host_line +
                        b'Accept-Encoding: identity\r\n\r\n')
            conn = client.HTTPConnection(netloc)
            sock = FakeSocket('')
            conn.sock = sock
            conn.request('GET', '/foo')
            self.assertTrue(sock.data.startswith(expected))

    def test_malformed_headers_coped_with(self):
        # Issue 19996
        # A header line with an empty name sits between two valid headers;
        # both valid headers must still be readable.
        raw = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        resp = client.HTTPResponse(FakeSocket(raw))
        resp.begin()

        for name in ('First', 'Second'):
            self.assertEqual(resp.getheader(name), 'val')

    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        token_name = "!#$%&'*+-.^_`|~"
        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
        raw = b''.join((
            b'HTTP/1.1 200 OK\r\n',
            b"!#$%&'*+-.^_`|~: value\r\n",  # Special token characters
            b'VCHAR: ', bytes(range(0x21, 0x7E + 1)), b'\r\n',
            b'obs-text: ', bytes(range(0x80, 0xFF + 1)), b'\r\n',
            b'obs-fold: text\r\n',
            b' folded with space\r\n',
            b'\tfolded with tab\r\n',
            b'Content-Length: 0\r\n',
            b'\r\n',
        ))
        resp = client.HTTPResponse(FakeSocket(raw))
        resp.begin()

        # Each header is checked through both getheader() and resp.msg.
        for name, value in (
            ('Content-Length', '0'),
            (token_name, 'value'),
            ('VCHAR', vchar),
        ):
            self.assertEqual(resp.getheader(name), value)
            self.assertEqual(resp.msg[name], value)
        self.assertIsNotNone(resp.getheader('obs-text'))
        self.assertIn('obs-text', resp.msg)
        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
            self.assertTrue(folded.startswith('text'))
            self.assertIn(' folded with space', folded)
            self.assertTrue(folded.endswith('folded with tab'))

    def test_invalid_headers(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # See http://tools.ietf.org/html/rfc7230#section-3.2.4.  Names with
        # CR/LF, leading whitespace or a colon, and values with CR/LF, must
        # all be refused by putheader().
        bad_names = (
            b'Invalid\r\nName',
            b'Invalid\rName',
            b'Invalid\nName',
            b'\r\nInvalidName',
            b'\rInvalidName',
            b'\nInvalidName',
            b' InvalidName',
            b'\tInvalidName',
            b'Invalid:Name',
            b':InvalidName',
        )
        bad_values = (
            b'Invalid\r\nValue',
            b'Invalid\rValue',
            b'Invalid\nValue',
            b'InvalidValue\r\n',
            b'InvalidValue\r',
            b'InvalidValue\n',
        )
        cases = [(name, b'ValidValue') for name in bad_names]
        cases += [(b'ValidName', value) for value in bad_values]
        for name, value in cases:
            with self.subTest((name, value)), \
                    self.assertRaisesRegex(ValueError, 'Invalid header'):
                conn.putheader(name, value)

    def test_headers_debuglevel(self):
        # With debuglevel=1, begin() prints the status line and each header
        # to stdout.
        raw = (
            b'HTTP/1.1 200 OK\r\n'
            b'First: val\r\n'
            b'Second: val\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(raw), debuglevel=1)
        with support.captured_stdout() as output:
            resp.begin()
        lines = output.getvalue().splitlines()
        expected_lines = (
            "reply: 'HTTP/1.1 200 OK\\r\\n'",
            "header: First: val",
            "header: Second: val",
        )
        for index, expected_line in enumerate(expected_lines):
            self.assertEqual(lines[index], expected_line)
361
362
class TransferEncodingTest(TestCase):
    """Request-side tests for chunked transfer encoding.

    The helper methods at the bottom build an iterable body, split a
    captured request into request line / headers / body, and decode a
    chunk-encoded body.
    """
    expected_body = b"It's just a flesh wound"

    def test_endheaders_chunked(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.putrequest('POST', '/')
        conn.endheaders(self._make_body(), encode_chunked=True)

        _, _, body = self._parse_request(conn.sock.data)
        body = self._parse_chunked(body)
        self.assertEqual(body, self.expected_body)

    def test_explicit_headers(self):
        # explicit chunked
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        # this shouldn't actually be automatically chunk-encoded because the
        # calling code has explicitly stated that it's taking care of it
        conn.request(
            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # explicit chunked, string body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request(
            'POST', '/', self.expected_body.decode('latin-1'),
            {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # User-specified TE, but request() does the chunk encoding
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/',
            headers={'Transfer-Encoding': 'gzip, chunked'},
            encode_chunked=True,
            body=self._make_body())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
        self.assertEqual(self._parse_chunked(body), self.expected_body)

    def test_request(self):
        for empty_lines in (False, True,):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(b'')
            conn.request(
                'POST', '/', self._make_body(empty_lines=empty_lines))

            _, headers, body = self._parse_request(conn.sock.data)
            body = self._parse_chunked(body)
            self.assertEqual(body, self.expected_body)
            self.assertEqual(headers['Transfer-Encoding'], 'chunked')

            # Content-Length and Transfer-Encoding SHOULD not be sent in the
            # same request
            self.assertNotIn('content-length', [k.lower() for k in headers])

    def test_empty_body(self):
        # Zero-length iterable should be treated like any other iterable
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/', ())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(body, b"0\r\n\r\n")

    def _make_body(self, empty_lines=False):
        """Yield expected_body piece by piece, split on single spaces.

        Every piece except the last keeps its trailing space, so joining the
        pieces gives expected_body back.  With empty_lines=True an empty
        bytes object is yielded before each odd-indexed piece.
        """
        words = self.expected_body.split(b' ')
        last = len(words) - 1
        for idx, word in enumerate(words):
            # for testing handling empty lines
            if empty_lines and idx % 2:
                yield b''
            yield word if idx == last else word + b' '

    def _parse_request(self, data):
        """Split raw request bytes into (request_line, headers, body).

        headers maps each header name to its value, both decoded as latin-1
        and stripped; body is everything after the blank line that ends the
        header section.  Raises ValueError for a header line with no colon.
        """
        lines = data.split(b'\r\n')
        request = lines[0]
        headers = {}
        n = 1
        while n < len(lines) and len(lines[n]) > 0:
            # Split on the first colon only: a value may contain colons
            # itself (for example a host with a port).
            key, val = lines[n].split(b':', 1)
            key = key.decode('latin-1').strip()
            headers[key] = val.decode('latin-1').strip()
            n += 1

        return request, headers, b'\r\n'.join(lines[n + 1:])

    def _parse_chunked(self, data):
        """Decode a chunk-encoded body and return the joined payload.

        Each declared chunk size is asserted to match the chunk's length.
        The test fails if the data ends before a zero-size chunk is seen.
        Chunk payloads that contain CRLF are not supported, because the
        data is split on CRLF.
        """
        body = []
        lines = data.split(b'\r\n')
        n = 0
        # Every chunk takes two entries of `lines`: the hexadecimal size and
        # then the payload.
        while True:
            if n + 1 >= len(lines):
                # Malformed input: report it as a test failure rather than
                # letting an unpacking error surface.
                self.fail('chunked body is truncated: '
                          'no terminating zero-size chunk')
            size = int(lines[n], 16)
            if size == 0:
                break

            chunk = lines[n + 1]
            self.assertEqual(size, len(chunk))
            body.append(chunk)
            n += 2

        return b''.join(body)
489
490
491class BasicTest(TestCase):
492    def test_status_lines(self):
493        # Test HTTP status lines
494
495        body = "HTTP/1.1 200 Ok\r\n\r\nText"
496        sock = FakeSocket(body)
497        resp = client.HTTPResponse(sock)
498        resp.begin()
499        self.assertEqual(resp.read(0), b'')  # Issue #20007
500        self.assertFalse(resp.isclosed())
501        self.assertFalse(resp.closed)
502        self.assertEqual(resp.read(), b"Text")
503        self.assertTrue(resp.isclosed())
504        self.assertFalse(resp.closed)
505        resp.close()
506        self.assertTrue(resp.closed)
507
508        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
509        sock = FakeSocket(body)
510        resp = client.HTTPResponse(sock)
511        self.assertRaises(client.BadStatusLine, resp.begin)
512
513    def test_bad_status_repr(self):
514        exc = client.BadStatusLine('')
515        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
516
517    def test_partial_reads(self):
518        # if we have Content-Length, HTTPResponse knows when to close itself,
519        # the same behaviour as when we read the whole thing with read()
520        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
521        sock = FakeSocket(body)
522        resp = client.HTTPResponse(sock)
523        resp.begin()
524        self.assertEqual(resp.read(2), b'Te')
525        self.assertFalse(resp.isclosed())
526        self.assertEqual(resp.read(2), b'xt')
527        self.assertTrue(resp.isclosed())
528        self.assertFalse(resp.closed)
529        resp.close()
530        self.assertTrue(resp.closed)
531
532    def test_mixed_reads(self):
533        # readline() should update the remaining length, so that read() knows
534        # how much data is left and does not raise IncompleteRead
535        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
536        sock = FakeSocket(body)
537        resp = client.HTTPResponse(sock)
538        resp.begin()
539        self.assertEqual(resp.readline(), b'Text\r\n')
540        self.assertFalse(resp.isclosed())
541        self.assertEqual(resp.read(), b'Another')
542        self.assertTrue(resp.isclosed())
543        self.assertFalse(resp.closed)
544        resp.close()
545        self.assertTrue(resp.closed)
546
547    def test_partial_readintos(self):
548        # if we have Content-Length, HTTPResponse knows when to close itself,
549        # the same behaviour as when we read the whole thing with read()
550        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
551        sock = FakeSocket(body)
552        resp = client.HTTPResponse(sock)
553        resp.begin()
554        b = bytearray(2)
555        n = resp.readinto(b)
556        self.assertEqual(n, 2)
557        self.assertEqual(bytes(b), b'Te')
558        self.assertFalse(resp.isclosed())
559        n = resp.readinto(b)
560        self.assertEqual(n, 2)
561        self.assertEqual(bytes(b), b'xt')
562        self.assertTrue(resp.isclosed())
563        self.assertFalse(resp.closed)
564        resp.close()
565        self.assertTrue(resp.closed)
566
567    def test_partial_reads_no_content_length(self):
568        # when no length is present, the socket should be gracefully closed when
569        # all data was read
570        body = "HTTP/1.1 200 Ok\r\n\r\nText"
571        sock = FakeSocket(body)
572        resp = client.HTTPResponse(sock)
573        resp.begin()
574        self.assertEqual(resp.read(2), b'Te')
575        self.assertFalse(resp.isclosed())
576        self.assertEqual(resp.read(2), b'xt')
577        self.assertEqual(resp.read(1), b'')
578        self.assertTrue(resp.isclosed())
579        self.assertFalse(resp.closed)
580        resp.close()
581        self.assertTrue(resp.closed)
582
583    def test_partial_readintos_no_content_length(self):
584        # when no length is present, the socket should be gracefully closed when
585        # all data was read
586        body = "HTTP/1.1 200 Ok\r\n\r\nText"
587        sock = FakeSocket(body)
588        resp = client.HTTPResponse(sock)
589        resp.begin()
590        b = bytearray(2)
591        n = resp.readinto(b)
592        self.assertEqual(n, 2)
593        self.assertEqual(bytes(b), b'Te')
594        self.assertFalse(resp.isclosed())
595        n = resp.readinto(b)
596        self.assertEqual(n, 2)
597        self.assertEqual(bytes(b), b'xt')
598        n = resp.readinto(b)
599        self.assertEqual(n, 0)
600        self.assertTrue(resp.isclosed())
601
602    def test_partial_reads_incomplete_body(self):
603        # if the server shuts down the connection before the whole
604        # content-length is delivered, the socket is gracefully closed
605        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
606        sock = FakeSocket(body)
607        resp = client.HTTPResponse(sock)
608        resp.begin()
609        self.assertEqual(resp.read(2), b'Te')
610        self.assertFalse(resp.isclosed())
611        self.assertEqual(resp.read(2), b'xt')
612        self.assertEqual(resp.read(1), b'')
613        self.assertTrue(resp.isclosed())
614
615    def test_partial_readintos_incomplete_body(self):
616        # if the server shuts down the connection before the whole
617        # content-length is delivered, the socket is gracefully closed
618        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
619        sock = FakeSocket(body)
620        resp = client.HTTPResponse(sock)
621        resp.begin()
622        b = bytearray(2)
623        n = resp.readinto(b)
624        self.assertEqual(n, 2)
625        self.assertEqual(bytes(b), b'Te')
626        self.assertFalse(resp.isclosed())
627        n = resp.readinto(b)
628        self.assertEqual(n, 2)
629        self.assertEqual(bytes(b), b'xt')
630        n = resp.readinto(b)
631        self.assertEqual(n, 0)
632        self.assertTrue(resp.isclosed())
633        self.assertFalse(resp.closed)
634        resp.close()
635        self.assertTrue(resp.closed)
636
637    def test_host_port(self):
638        # Check invalid host_port
639
640        for hp in ("www.python.org:abc", "user:password@www.python.org"):
641            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
642
643        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
644                          "fe80::207:e9ff:fe9b", 8000),
645                         ("www.python.org:80", "www.python.org", 80),
646                         ("www.python.org:", "www.python.org", 80),
647                         ("www.python.org", "www.python.org", 80),
648                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
649                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
650            c = client.HTTPConnection(hp)
651            self.assertEqual(h, c.host)
652            self.assertEqual(p, c.port)
653
654    def test_response_headers(self):
655        # test response with multiple message headers with the same field name.
656        text = ('HTTP/1.1 200 OK\r\n'
657                'Set-Cookie: Customer="WILE_E_COYOTE"; '
658                'Version="1"; Path="/acme"\r\n'
659                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
660                ' Path="/acme"\r\n'
661                '\r\n'
662                'No body\r\n')
663        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
664               ', '
665               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
666        s = FakeSocket(text)
667        r = client.HTTPResponse(s)
668        r.begin()
669        cookies = r.getheader("Set-Cookie")
670        self.assertEqual(cookies, hdr)
671
672    def test_read_head(self):
673        # Test that the library doesn't attempt to read any data
674        # from a HEAD request.  (Tickles SF bug #622042.)
675        sock = FakeSocket(
676            'HTTP/1.1 200 OK\r\n'
677            'Content-Length: 14432\r\n'
678            '\r\n',
679            NoEOFBytesIO)
680        resp = client.HTTPResponse(sock, method="HEAD")
681        resp.begin()
682        if resp.read():
683            self.fail("Did not expect response from HEAD request")
684
685    def test_readinto_head(self):
686        # Test that the library doesn't attempt to read any data
687        # from a HEAD request.  (Tickles SF bug #622042.)
688        sock = FakeSocket(
689            'HTTP/1.1 200 OK\r\n'
690            'Content-Length: 14432\r\n'
691            '\r\n',
692            NoEOFBytesIO)
693        resp = client.HTTPResponse(sock, method="HEAD")
694        resp.begin()
695        b = bytearray(5)
696        if resp.readinto(b) != 0:
697            self.fail("Did not expect response from HEAD request")
698        self.assertEqual(bytes(b), b'\x00'*5)
699
700    def test_too_many_headers(self):
701        headers = '\r\n'.join('Header%d: foo' % i
702                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
703        text = ('HTTP/1.1 200 OK\r\n' + headers)
704        s = FakeSocket(text)
705        r = client.HTTPResponse(s)
706        self.assertRaisesRegex(client.HTTPException,
707                               r"got more than \d+ headers", r.begin)
708
709    def test_send_file(self):
710        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
711                    b'Accept-Encoding: identity\r\n'
712                    b'Transfer-Encoding: chunked\r\n'
713                    b'\r\n')
714
715        with open(__file__, 'rb') as body:
716            conn = client.HTTPConnection('example.com')
717            sock = FakeSocket(body)
718            conn.sock = sock
719            conn.request('GET', '/foo', body)
720            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
721                    (sock.data[:len(expected)], expected))
722
723    def test_send(self):
724        expected = b'this is a test this is only a test'
725        conn = client.HTTPConnection('example.com')
726        sock = FakeSocket(None)
727        conn.sock = sock
728        conn.send(expected)
729        self.assertEqual(expected, sock.data)
730        sock.data = b''
731        conn.send(array.array('b', expected))
732        self.assertEqual(expected, sock.data)
733        sock.data = b''
734        conn.send(io.BytesIO(expected))
735        self.assertEqual(expected, sock.data)
736
737    def test_send_updating_file(self):
738        def data():
739            yield 'data'
740            yield None
741            yield 'data_two'
742
743        class UpdatingFile(io.TextIOBase):
744            mode = 'r'
745            d = data()
746            def read(self, blocksize=-1):
747                return next(self.d)
748
749        expected = b'data'
750
751        conn = client.HTTPConnection('example.com')
752        sock = FakeSocket("")
753        conn.sock = sock
754        conn.send(UpdatingFile())
755        self.assertEqual(sock.data, expected)
756
757
758    def test_send_iter(self):
759        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
760                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
761                   b'\r\nonetwothree'
762
763        def body():
764            yield b"one"
765            yield b"two"
766            yield b"three"
767
768        conn = client.HTTPConnection('example.com')
769        sock = FakeSocket("")
770        conn.sock = sock
771        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
772        self.assertEqual(sock.data, expected)
773
774    def test_blocksize_request(self):
775        """Check that request() respects the configured block size."""
776        blocksize = 8  # For easy debugging.
777        conn = client.HTTPConnection('example.com', blocksize=blocksize)
778        sock = FakeSocket(None)
779        conn.sock = sock
780        expected = b"a" * blocksize + b"b"
781        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
782        self.assertEqual(sock.sendall_calls, 3)
783        body = sock.data.split(b"\r\n\r\n", 1)[1]
784        self.assertEqual(body, expected)
785
786    def test_blocksize_send(self):
787        """Check that send() respects the configured block size."""
788        blocksize = 8  # For easy debugging.
789        conn = client.HTTPConnection('example.com', blocksize=blocksize)
790        sock = FakeSocket(None)
791        conn.sock = sock
792        expected = b"a" * blocksize + b"b"
793        conn.send(io.BytesIO(expected))
794        self.assertEqual(sock.sendall_calls, 2)
795        self.assertEqual(sock.data, expected)
796
797    def test_send_type_error(self):
798        # See: Issue #12676
799        conn = client.HTTPConnection('example.com')
800        conn.sock = FakeSocket('')
801        with self.assertRaises(TypeError):
802            conn.request('POST', 'test', conn)
803
804    def test_chunked(self):
805        expected = chunked_expected
806        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
807        resp = client.HTTPResponse(sock, method="GET")
808        resp.begin()
809        self.assertEqual(resp.read(), expected)
810        resp.close()
811
812        # Various read sizes
813        for n in range(1, 12):
814            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
815            resp = client.HTTPResponse(sock, method="GET")
816            resp.begin()
817            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
818            resp.close()
819
820        for x in ('', 'foo\r\n'):
821            sock = FakeSocket(chunked_start + x)
822            resp = client.HTTPResponse(sock, method="GET")
823            resp.begin()
824            try:
825                resp.read()
826            except client.IncompleteRead as i:
827                self.assertEqual(i.partial, expected)
828                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
829                self.assertEqual(repr(i), expected_message)
830                self.assertEqual(str(i), expected_message)
831            else:
832                self.fail('IncompleteRead expected')
833            finally:
834                resp.close()
835
836    def test_readinto_chunked(self):
837
838        expected = chunked_expected
839        nexpected = len(expected)
840        b = bytearray(128)
841
842        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
843        resp = client.HTTPResponse(sock, method="GET")
844        resp.begin()
845        n = resp.readinto(b)
846        self.assertEqual(b[:nexpected], expected)
847        self.assertEqual(n, nexpected)
848        resp.close()
849
850        # Various read sizes
851        for n in range(1, 12):
852            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
853            resp = client.HTTPResponse(sock, method="GET")
854            resp.begin()
855            m = memoryview(b)
856            i = resp.readinto(m[0:n])
857            i += resp.readinto(m[i:n + i])
858            i += resp.readinto(m[i:])
859            self.assertEqual(b[:nexpected], expected)
860            self.assertEqual(i, nexpected)
861            resp.close()
862
863        for x in ('', 'foo\r\n'):
864            sock = FakeSocket(chunked_start + x)
865            resp = client.HTTPResponse(sock, method="GET")
866            resp.begin()
867            try:
868                n = resp.readinto(b)
869            except client.IncompleteRead as i:
870                self.assertEqual(i.partial, expected)
871                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
872                self.assertEqual(repr(i), expected_message)
873                self.assertEqual(str(i), expected_message)
874            else:
875                self.fail('IncompleteRead expected')
876            finally:
877                resp.close()
878
879    def test_chunked_head(self):
880        chunked_start = (
881            'HTTP/1.1 200 OK\r\n'
882            'Transfer-Encoding: chunked\r\n\r\n'
883            'a\r\n'
884            'hello world\r\n'
885            '1\r\n'
886            'd\r\n'
887        )
888        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
889        resp = client.HTTPResponse(sock, method="HEAD")
890        resp.begin()
891        self.assertEqual(resp.read(), b'')
892        self.assertEqual(resp.status, 200)
893        self.assertEqual(resp.reason, 'OK')
894        self.assertTrue(resp.isclosed())
895        self.assertFalse(resp.closed)
896        resp.close()
897        self.assertTrue(resp.closed)
898
899    def test_readinto_chunked_head(self):
900        chunked_start = (
901            'HTTP/1.1 200 OK\r\n'
902            'Transfer-Encoding: chunked\r\n\r\n'
903            'a\r\n'
904            'hello world\r\n'
905            '1\r\n'
906            'd\r\n'
907        )
908        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
909        resp = client.HTTPResponse(sock, method="HEAD")
910        resp.begin()
911        b = bytearray(5)
912        n = resp.readinto(b)
913        self.assertEqual(n, 0)
914        self.assertEqual(bytes(b), b'\x00'*5)
915        self.assertEqual(resp.status, 200)
916        self.assertEqual(resp.reason, 'OK')
917        self.assertTrue(resp.isclosed())
918        self.assertFalse(resp.closed)
919        resp.close()
920        self.assertTrue(resp.closed)
921
922    def test_negative_content_length(self):
923        sock = FakeSocket(
924            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
925        resp = client.HTTPResponse(sock, method="GET")
926        resp.begin()
927        self.assertEqual(resp.read(), b'Hello\r\n')
928        self.assertTrue(resp.isclosed())
929
930    def test_incomplete_read(self):
931        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
932        resp = client.HTTPResponse(sock, method="GET")
933        resp.begin()
934        try:
935            resp.read()
936        except client.IncompleteRead as i:
937            self.assertEqual(i.partial, b'Hello\r\n')
938            self.assertEqual(repr(i),
939                             "IncompleteRead(7 bytes read, 3 more expected)")
940            self.assertEqual(str(i),
941                             "IncompleteRead(7 bytes read, 3 more expected)")
942            self.assertTrue(resp.isclosed())
943        else:
944            self.fail('IncompleteRead expected')
945
946    def test_epipe(self):
947        sock = EPipeSocket(
948            "HTTP/1.0 401 Authorization Required\r\n"
949            "Content-type: text/html\r\n"
950            "WWW-Authenticate: Basic realm=\"example\"\r\n",
951            b"Content-Length")
952        conn = client.HTTPConnection("example.com")
953        conn.sock = sock
954        self.assertRaises(OSError,
955                          lambda: conn.request("PUT", "/url", "body"))
956        resp = conn.getresponse()
957        self.assertEqual(401, resp.status)
958        self.assertEqual("Basic realm=\"example\"",
959                         resp.getheader("www-authenticate"))
960
961    # Test lines overflowing the max line size (_MAXLINE in http.client)
962
963    def test_overflowing_status_line(self):
964        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
965        resp = client.HTTPResponse(FakeSocket(body))
966        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
967
968    def test_overflowing_header_line(self):
969        body = (
970            'HTTP/1.1 200 OK\r\n'
971            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
972        )
973        resp = client.HTTPResponse(FakeSocket(body))
974        self.assertRaises(client.LineTooLong, resp.begin)
975
976    def test_overflowing_chunked_line(self):
977        body = (
978            'HTTP/1.1 200 OK\r\n'
979            'Transfer-Encoding: chunked\r\n\r\n'
980            + '0' * 65536 + 'a\r\n'
981            'hello world\r\n'
982            '0\r\n'
983            '\r\n'
984        )
985        resp = client.HTTPResponse(FakeSocket(body))
986        resp.begin()
987        self.assertRaises(client.LineTooLong, resp.read)
988
989    def test_early_eof(self):
990        # Test httpresponse with no \r\n termination,
991        body = "HTTP/1.1 200 Ok"
992        sock = FakeSocket(body)
993        resp = client.HTTPResponse(sock)
994        resp.begin()
995        self.assertEqual(resp.read(), b'')
996        self.assertTrue(resp.isclosed())
997        self.assertFalse(resp.closed)
998        resp.close()
999        self.assertTrue(resp.closed)
1000
1001    def test_error_leak(self):
1002        # Test that the socket is not leaked if getresponse() fails
1003        conn = client.HTTPConnection('example.com')
1004        response = None
1005        class Response(client.HTTPResponse):
1006            def __init__(self, *pos, **kw):
1007                nonlocal response
1008                response = self  # Avoid garbage collector closing the socket
1009                client.HTTPResponse.__init__(self, *pos, **kw)
1010        conn.response_class = Response
1011        conn.sock = FakeSocket('Invalid status line')
1012        conn.request('GET', '/')
1013        self.assertRaises(client.BadStatusLine, conn.getresponse)
1014        self.assertTrue(response.closed)
1015        self.assertTrue(conn.sock.file_closed)
1016
1017    def test_chunked_extension(self):
1018        extra = '3;foo=bar\r\n' + 'abc\r\n'
1019        expected = chunked_expected + b'abc'
1020
1021        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1022        resp = client.HTTPResponse(sock, method="GET")
1023        resp.begin()
1024        self.assertEqual(resp.read(), expected)
1025        resp.close()
1026
1027    def test_chunked_missing_end(self):
1028        """some servers may serve up a short chunked encoding stream"""
1029        expected = chunked_expected
1030        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1031        resp = client.HTTPResponse(sock, method="GET")
1032        resp.begin()
1033        self.assertEqual(resp.read(), expected)
1034        resp.close()
1035
1036    def test_chunked_trailers(self):
1037        """See that trailers are read and ignored"""
1038        expected = chunked_expected
1039        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1040        resp = client.HTTPResponse(sock, method="GET")
1041        resp.begin()
1042        self.assertEqual(resp.read(), expected)
1043        # we should have reached the end of the file
1044        self.assertEqual(sock.file.read(), b"") #we read to the end
1045        resp.close()
1046
1047    def test_chunked_sync(self):
1048        """Check that we don't read past the end of the chunked-encoding stream"""
1049        expected = chunked_expected
1050        extradata = "extradata"
1051        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1052        resp = client.HTTPResponse(sock, method="GET")
1053        resp.begin()
1054        self.assertEqual(resp.read(), expected)
1055        # the file should now have our extradata ready to be read
1056        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1057        resp.close()
1058
1059    def test_content_length_sync(self):
1060        """Check that we don't read past the end of the Content-Length stream"""
1061        extradata = b"extradata"
1062        expected = b"Hello123\r\n"
1063        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1064        resp = client.HTTPResponse(sock, method="GET")
1065        resp.begin()
1066        self.assertEqual(resp.read(), expected)
1067        # the file should now have our extradata ready to be read
1068        self.assertEqual(sock.file.read(), extradata) #we read to the end
1069        resp.close()
1070
1071    def test_readlines_content_length(self):
1072        extradata = b"extradata"
1073        expected = b"Hello123\r\n"
1074        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1075        resp = client.HTTPResponse(sock, method="GET")
1076        resp.begin()
1077        self.assertEqual(resp.readlines(2000), [expected])
1078        # the file should now have our extradata ready to be read
1079        self.assertEqual(sock.file.read(), extradata) #we read to the end
1080        resp.close()
1081
1082    def test_read1_content_length(self):
1083        extradata = b"extradata"
1084        expected = b"Hello123\r\n"
1085        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1086        resp = client.HTTPResponse(sock, method="GET")
1087        resp.begin()
1088        self.assertEqual(resp.read1(2000), expected)
1089        # the file should now have our extradata ready to be read
1090        self.assertEqual(sock.file.read(), extradata) #we read to the end
1091        resp.close()
1092
1093    def test_readline_bound_content_length(self):
1094        extradata = b"extradata"
1095        expected = b"Hello123\r\n"
1096        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1097        resp = client.HTTPResponse(sock, method="GET")
1098        resp.begin()
1099        self.assertEqual(resp.readline(10), expected)
1100        self.assertEqual(resp.readline(10), b"")
1101        # the file should now have our extradata ready to be read
1102        self.assertEqual(sock.file.read(), extradata) #we read to the end
1103        resp.close()
1104
1105    def test_read1_bound_content_length(self):
1106        extradata = b"extradata"
1107        expected = b"Hello123\r\n"
1108        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1109        resp = client.HTTPResponse(sock, method="GET")
1110        resp.begin()
1111        self.assertEqual(resp.read1(20), expected*2)
1112        self.assertEqual(resp.read(), expected)
1113        # the file should now have our extradata ready to be read
1114        self.assertEqual(sock.file.read(), extradata) #we read to the end
1115        resp.close()
1116
    def test_response_fileno(self):
        # Make sure fd returned by fileno is valid.
        #
        # A one-shot server thread answers a CONNECT request; the client then
        # writes through a socket object built from response.fileno() and the
        # server records whatever arrives.
        serv = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.addCleanup(serv.close)
        # Port 0: let the OS choose; the real address is read back below via
        # getsockname().
        serv.bind((HOST, 0))
        serv.listen()

        # Filled in by the server thread with everything received after the
        # request header.
        result = None
        def run_server():
            [conn, address] = serv.accept()
            with conn, conn.makefile("rb") as reader:
                # Read the request header until a blank line
                while True:
                    line = reader.readline()
                    if not line.rstrip(b"\r\n"):
                        break
                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
                nonlocal result
                # Blocks until the client side closes the connection.
                result = reader.read()

        thread = threading.Thread(target=run_server)
        thread.start()
        # Safety net: bounded join in case the test fails before the explicit
        # join at the end.
        self.addCleanup(thread.join, float(1))
        conn = client.HTTPConnection(*serv.getsockname())
        conn.request("CONNECT", "dummy:1234")
        response = conn.getresponse()
        try:
            self.assertEqual(response.status, client.OK)
            # Wrap the response's file descriptor in a new socket object.
            s = socket.socket(fileno=response.fileno())
            try:
                s.sendall(b"proxied data\n")
            finally:
                # detach() rather than close(): the descriptor is only
                # borrowed here and is released by the close() calls below.
                s.detach()
        finally:
            response.close()
            conn.close()
        # The server thread finishes once it sees EOF; only then is `result`
        # safe to inspect.
        thread.join()
        self.assertEqual(result, b"proxied data\n")
1156
class ExtendedReadTest(TestCase):
    """
    Test peek(), read1(), readline()
    """
    # Raw response fed to the fake socket: status line, blank line, body.
    lines = (
        'HTTP/1.1 200 OK\r\n'
        '\r\n'
        'hello world!\n'
        'and now \n'
        'for something completely different\n'
        'foo'
        )
    # The body every test must reassemble: everything from 'hello' onwards.
    lines_expected = lines[lines.find('hello'):].encode("ascii")
    # The same body in chunked transfer encoding.  Not referenced by this
    # class's own tests; kept as a class attribute for external users.
    lines_chunked = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello worl\r\n'
        '3\r\n'
        'd!\n\r\n'
        '9\r\n'
        'and now \n\r\n'
        '23\r\n'
        'for something completely different\n\r\n'
        '3\r\n'
        'foo\r\n'
        '0\r\n' # terminating chunk
        '\r\n'  # end of trailers
    )

    def setUp(self):
        # Parse the headers of self.lines and put a BufferedReader under the
        # response so that peek() is available on resp.fp.
        sock = FakeSocket(self.lines)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        resp.fp = io.BufferedReader(resp.fp)
        self.resp = resp

    def test_peek(self):
        resp = self.resp
        # patch up the buffered peek so that it returns not too much stuff
        oldpeek = resp.fp.peek
        def mypeek(n=-1):
            p = oldpeek(n)
            if n >= 0:
                return p[:n]
            return p[:10]
        resp.fp.peek = mypeek

        pieces = []
        while True:
            # try a short peek
            p = resp.peek(3)
            if p:
                self.assertGreater(len(p), 0)
                # then unbounded peek
                p2 = resp.peek()
                self.assertGreaterEqual(len(p2), len(p))
                self.assertTrue(p2.startswith(p))
                # reading must return exactly what was peeked
                chunk = resp.read(len(p2))
                self.assertEqual(chunk, p2)
            else:
                chunk = resp.read()
                self.assertFalse(chunk)
            pieces.append(chunk)
            if not chunk:
                break
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_readline(self):
        self._verify_readline(self.resp.readline, self.lines_expected)

    def _verify_readline(self, readline, expected):
        """Drain *readline* with a limit of 5 and compare against *expected*.

        Any piece shorter than the limit must end in a newline, except for
        the final b"foo" which has none.
        """
        pieces = []
        while True:
            # short readlines
            line = readline(5)
            if line and line != b"foo":
                if len(line) < 5:
                    self.assertTrue(line.endswith(b"\n"))
            pieces.append(line)
            if not line:
                break
        self.assertEqual(b"".join(pieces), expected)

    def test_read1(self):
        resp = self.resp
        def r():
            # read1(4) must never hand back more than four bytes
            res = resp.read1(4)
            self.assertLessEqual(len(res), 4)
            return res
        readliner = Readliner(r)
        self._verify_readline(readliner.readline, self.lines_expected)

    def test_read1_unbounded(self):
        resp = self.resp
        pieces = []
        while True:
            data = resp.read1()
            if not data:
                break
            pieces.append(data)
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_bounded(self):
        resp = self.resp
        pieces = []
        while True:
            data = resp.read1(10)
            if not data:
                break
            self.assertLessEqual(len(data), 10)
            pieces.append(data)
        self.assertEqual(b"".join(pieces), self.lines_expected)

    def test_read1_0(self):
        self.assertEqual(self.resp.read1(0), b"")

    def test_peek_0(self):
        # Only checks that peek(0) works and returns something sized; the
        # amount returned is deliberately not constrained.
        p = self.resp.peek(0)
        self.assertLessEqual(0, len(p))
1280
class ExtendedReadTestChunked(ExtendedReadTest):
    """
    Test peek(), read1(), readline() in chunked mode
    """
    # Reuse the chunked rendition already defined on the base class instead
    # of repeating the literal, so the two cannot drift apart.  The decoded
    # body is the inherited lines_expected.
    lines = ExtendedReadTest.lines_chunked
1301
1302
class Readliner:
    """
    a simple readline class that uses an arbitrary read function and buffering
    """
    def __init__(self, readfunc):
        # readfunc: zero-argument callable returning the next bytes, or an
        # empty/falsy value at end of input.
        self.readfunc = readfunc
        # Bytes already fetched from readfunc but not yet handed out.
        self.remainder = b""

    def readline(self, limit=-1):
        """Return the next line, including its trailing newline if present.

        At most *limit* bytes are returned when *limit* is non-negative; a
        negative *limit* or None means no bound.  An empty bytes object is
        returned once the input is exhausted.  If the read function raises,
        everything fetched so far is kept in ``self.remainder`` and the
        exception propagates.
        """
        bounded = limit is not None and limit >= 0
        consumed = []      # pieces that belong to the line being built
        consumed_len = 0   # total length of the complete chunks in `consumed`
        chunk = self.remainder
        try:
            while True:
                # Only look at the part of the chunk that still fits the limit.
                window = chunk[:limit - consumed_len] if bounded else chunk
                newline = window.find(b'\n')
                if newline != -1:
                    cut = newline + 1
                    break
                if bounded and consumed_len + len(window) >= limit:
                    # Limit reached before any newline showed up.
                    cut = len(window)
                    break
                # read more data
                consumed.append(chunk)
                consumed_len += len(chunk)
                chunk = self.readfunc()
                if not chunk:
                    # eof condition
                    chunk = b""
                    cut = 0
                    break
            consumed.append(chunk[:cut])
            self.remainder = chunk[cut:]
            return b"".join(consumed)
        except BaseException:
            # Keep what was fetched so a later call can still return it.
            self.remainder = b"".join(consumed)
            raise
1335
1336
class OfflineTest(TestCase):
    def test_all(self):
        # Documented objects defined in the module should be in __all__
        # HTTPMessage, parse_headers(), and the HTTP status code constants are
        # intentionally omitted for simplicity
        blacklist = {"HTTPMessage", "parse_headers"}
        expected = {"responses"}  # White-list documented dict() object
        expected.update(
            name for name in dir(client)
            if not (name.startswith("_") or name in blacklist)
            and getattr(getattr(client, name), "__module__", None)
                == "http.client"
        )
        self.assertCountEqual(client.__all__, expected)

    def test_responses(self):
        not_found_text = client.responses[client.NOT_FOUND]
        self.assertEqual(not_found_text, "Not Found")

    def test_client_constants(self):
        # Make sure we don't break backward compatibility with 3.4
        names = (
            'CONTINUE',
            'SWITCHING_PROTOCOLS',
            'PROCESSING',
            'OK',
            'CREATED',
            'ACCEPTED',
            'NON_AUTHORITATIVE_INFORMATION',
            'NO_CONTENT',
            'RESET_CONTENT',
            'PARTIAL_CONTENT',
            'MULTI_STATUS',
            'IM_USED',
            'MULTIPLE_CHOICES',
            'MOVED_PERMANENTLY',
            'FOUND',
            'SEE_OTHER',
            'NOT_MODIFIED',
            'USE_PROXY',
            'TEMPORARY_REDIRECT',
            'BAD_REQUEST',
            'UNAUTHORIZED',
            'PAYMENT_REQUIRED',
            'FORBIDDEN',
            'NOT_FOUND',
            'METHOD_NOT_ALLOWED',
            'NOT_ACCEPTABLE',
            'PROXY_AUTHENTICATION_REQUIRED',
            'REQUEST_TIMEOUT',
            'CONFLICT',
            'GONE',
            'LENGTH_REQUIRED',
            'PRECONDITION_FAILED',
            'REQUEST_ENTITY_TOO_LARGE',
            'REQUEST_URI_TOO_LONG',
            'UNSUPPORTED_MEDIA_TYPE',
            'REQUESTED_RANGE_NOT_SATISFIABLE',
            'EXPECTATION_FAILED',
            'MISDIRECTED_REQUEST',
            'UNPROCESSABLE_ENTITY',
            'LOCKED',
            'FAILED_DEPENDENCY',
            'UPGRADE_REQUIRED',
            'PRECONDITION_REQUIRED',
            'TOO_MANY_REQUESTS',
            'REQUEST_HEADER_FIELDS_TOO_LARGE',
            'INTERNAL_SERVER_ERROR',
            'NOT_IMPLEMENTED',
            'BAD_GATEWAY',
            'SERVICE_UNAVAILABLE',
            'GATEWAY_TIMEOUT',
            'HTTP_VERSION_NOT_SUPPORTED',
            'INSUFFICIENT_STORAGE',
            'NOT_EXTENDED',
            'NETWORK_AUTHENTICATION_REQUIRED',
        )
        # One sub-test per name so a missing constant is reported by name.
        for const in names:
            with self.subTest(constant=const):
                self.assertTrue(hasattr(client, const))
1416
1417
class SourceAddressTest(TestCase):
    # Checks that the source_address argument is accepted by the connection
    # classes and, for plain HTTP, actually used when connecting.

    def setUp(self):
        # Listening server socket for the client to connect to, plus a
        # separate unused port to bind the client side to.
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = server
        self.port = support.bind_port(server)
        self.source_port = support.find_unused_port()
        server.listen()
        self.conn = None

    def tearDown(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        self.serv.close()
        self.serv = None

    def testHTTPConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPConnection(HOST, self.port,
                                          source_address=source)
        self.conn.connect()
        # The local end of the connection must use the requested port.
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(client, 'HTTPSConnection'),
                         'http.client.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = client.HTTPSConnection(HOST, self.port,
                                           source_address=source)
        # We don't test anything here other than the constructor not barfing as
        # this code doesn't deal with setting up an active running SSL server
        # for an ssl_wrapped connect() to actually return from.
1447
1448
class TimeoutTest(TestCase):
    # Port of the most recently created listening socket; assigned on the
    # class (not the instance) by setUp().
    PORT = None

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = support.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def _connect(self, **kwargs):
        # Open a connection to the test server and make sure it gets closed
        # even if a later assertion fails.
        conn = client.HTTPConnection(HOST, TimeoutTest.PORT, **kwargs)
        self.addCleanup(conn.close)
        conn.connect()
        return conn

    def testTimeoutAttribute(self):
        # This will prove that the timeout gets through HTTPConnection
        # and into the socket.

        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = self._connect()
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(None)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()

        # no timeout -- do not use global socket default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            httpConn = self._connect(timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(httpConn.sock.gettimeout())
        httpConn.close()

        # a value
        httpConn = self._connect(timeout=30)
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
1493
1494
class PersistenceTest(TestCase):
    # NOTE(review): FakeSocketHTTPConnection is defined elsewhere in this
    # file; its `connections` attribute presumably counts connect() calls.

    def test_reuse_reconnect(self):
        # Should reuse or reconnect depending on header from server
        template = (
            'HTTP/{} 200 OK\r\n'
            '{}'
            'Content-Length: 12\r\n'
            '\r\n'
            'Dummy body\r\n'
        )
        # (HTTP version, extra header line, connection expected to be reused)
        cases = (
            ('1.0', '', False),
            ('1.0', 'Connection: keep-alive\r\n', True),
            ('1.1', '', True),
            ('1.1', 'Connection: close\r\n', False),
            ('1.0', 'Connection: keep-ALIVE\r\n', True),
            ('1.1', 'Connection: cloSE\r\n', False),
        )
        for version, header, reuse in cases:
            with self.subTest(version=version, header=header):
                conn = FakeSocketHTTPConnection(
                    template.format(version, header))
                self.assertIsNone(conn.sock)
                conn.request('GET', '/open-connection')
                with conn.getresponse() as response:
                    self.assertEqual(conn.sock is None, not reuse)
                    response.read()
                self.assertEqual(conn.sock is None, not reuse)
                self.assertEqual(conn.connections, 1)
                conn.request('GET', '/subsequent-request')
                expected_connections = 1 if reuse else 2
                self.assertEqual(conn.connections, expected_connections)

    def test_disconnected(self):

        def make_reset_reader(text):
            """Return BufferedReader that raises ECONNRESET at EOF"""
            raw = io.BytesIO(text)

            def readinto(buffer):
                count = io.BytesIO.readinto(raw, buffer)
                if count == 0:
                    raise ConnectionResetError()
                return count

            raw.readinto = readinto
            return io.BufferedReader(raw)

        # (stream factory, exception getresponse() is expected to raise)
        cases = (
            (io.BytesIO, client.RemoteDisconnected),
            (make_reset_reader, ConnectionResetError),
        )
        for stream_factory, exception in cases:
            with self.subTest(exception=exception):
                conn = FakeSocketHTTPConnection(b'', stream_factory)
                conn.request('GET', '/eof-response')
                with self.assertRaises(exception):
                    conn.getresponse()
                self.assertIsNone(conn.sock)
                # HTTPConnection.connect() should be automatically invoked
                conn.request('GET', '/reconnect')
                self.assertEqual(conn.connections, 2)

    def test_100_close(self):
        # Only an interim "100 Continue" arrives; the final response is
        # missing, so getresponse() must report a remote disconnect.
        interim_only = (
            b'HTTP/1.1 100 Continue\r\n'
            b'\r\n'
        )
        conn = FakeSocketHTTPConnection(interim_only)
        conn.request('GET', '/', headers={'Expect': '100-continue'})
        with self.assertRaises(client.RemoteDisconnected):
            conn.getresponse()
        self.assertIsNone(conn.sock)
        conn.request('GET', '/reconnect')
        self.assertEqual(conn.connections, 2)
1565
1566
class HTTPSTest(TestCase):
    # Exercises client.HTTPSConnection: certificate and hostname checks
    # against remote hosts and a local HTTPS server, plus host:port parsing.

    def setUp(self):
        # Without HTTPSConnection there is nothing in this class to run.
        if hasattr(client, 'HTTPSConnection'):
            return
        self.skipTest('ssl support required')

    def make_server(self, certfile):
        # Hand back whatever make_https_server() builds for this test case
        # and certificate file; the helper is imported at call time.
        from test.ssl_servers import make_https_server
        https_server = make_https_server(self, certfile=certfile)
        return https_server

    def test_attributes(self):
        # the constructor has to keep the timeout it was given
        timeout = 30
        conn = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=timeout)
        self.assertEqual(conn.timeout, timeout)

    def test_networked(self):
        # With default settings a self-signed certificate is rejected.
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        # An unverified context accepts the self-signed certificate.
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            unverified = ssl._create_unverified_context()
            conn = client.HTTPSConnection(hostname, 443, context=unverified)
            conn.request('GET', '/')
            response = conn.getresponse()
            conn.close()
            self.assertIn('nginx', response.getheader('server'))
            response.close()

    @support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        # With default settings a certificate from a trusted CA is accepted.
        support.requires('network')
        hostname = 'www.python.org'
        with support.transient_internet(hostname):
            conn = client.HTTPSConnection(hostname, 443)
            conn.request('GET', '/')
            response = conn.getresponse()
            content_type = response.getheader('content-type')
            response.close()
            conn.close()
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        # The server's own certificate is loaded as the trust anchor.
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # sanity-check the defaults this test relies on
            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(context.check_hostname, True)
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            conn = client.HTTPSConnection(hostname, 443, context=context)
            conn.request('GET', '/')
            response = conn.getresponse()
            server_string = response.getheader('server')
            response.close()
            conn.close()
            self.assertIn('nginx', server_string)

    def test_networked_bad_cert(self):
        # The trust anchor loaded here has nothing to do with the server's
        # certificate, so verification has to fail.
        import ssl
        support.requires('network')
        hostname = 'self-signed.pythontest.net'
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.load_verify_locations(CERT_localhost)
            conn = client.HTTPSConnection(hostname, 443, context=context)
            with self.assertRaises(ssl.SSLError) as caught:
                conn.request('GET', '/')
            self.assertEqual(caught.exception.reason,
                             'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        # The default trust bundle does not know the local server's cert.
        import ssl
        server = self.make_server(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as caught:
            conn.request('GET', '/')
        self.assertEqual(caught.exception.reason,
                         'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        # A trusted certificate that matches the requested hostname.
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_localhost)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        self.addCleanup(conn.close)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.addCleanup(response.close)
        self.assertEqual(response.status, 404)

    def test_local_bad_hostname(self):
        # A trusted certificate that does not match the requested hostname.
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(CERT_fakehostname)
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # Spelling out check_hostname=True gives the same outcome.
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

        # The mismatch is ignored once check_hostname=False.
        context.check_hostname = False
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=False)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        response.close()
        conn.close()
        self.assertEqual(response.status, 404)

        # When HTTPSConnection is given no check_hostname, the context's
        # own setting applies.
        context.check_hostname = False
        conn = client.HTTPSConnection('localhost', server.port,
                                      context=context)
        conn.request('GET', '/nonexistent')
        response = conn.getresponse()
        self.assertEqual(response.status, 404)
        response.close()
        conn.close()

        # A check_hostname passed to HTTPSConnection wins over the
        # context's setting.
        with support.check_warnings(('', DeprecationWarning)):
            conn = client.HTTPSConnection('localhost', server.port,
                                          context=context,
                                          check_hostname=True)
        with self.assertRaises(ssl.CertificateError):
            conn.request('GET', '/')

    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
                     'http.client.HTTPSConnection not available')
    def test_host_port(self):
        # host:port strings that must be refused
        invalid = ("www.python.org:abc", "user:password@www.python.org")
        for hostport in invalid:
            with self.assertRaises(client.InvalidURL):
                client.HTTPSConnection(hostport)

        # (input string, expected host, expected port)
        valid = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:443", "www.python.org", 443),
            ("www.python.org:", "www.python.org", 443),
            ("www.python.org", "www.python.org", 443),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
            ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 443),
        )
        for hostport, host, port in valid:
            conn = client.HTTPSConnection(hostport)
            self.assertEqual(host, conn.host)
            self.assertEqual(port, conn.port)
1731
1732
class RequestBodyTest(TestCase):
    """Test cases where a request includes a message body."""

    def setUp(self):
        # Requests are written into a FakeSocket so the tests can inspect
        # the raw bytes afterwards through self.sock.data.
        self.conn = client.HTTPConnection('example.com')
        self.conn.sock = self.sock = FakeSocket("")

    def get_headers_and_fp(self):
        """Parse the bytes sent so far.

        Returns a (message, fp) pair: the parsed request headers and a
        BytesIO from which the remaining bytes (the request body) can be
        read.
        """
        f = io.BytesIO(self.sock.data)
        f.readline()  # read the request line
        message = client.parse_headers(f)
        return message, f

    def test_list_body(self):
        # Note that no content-length is automatically calculated for
        # an iterable.  The request will fall back to send chunked
        # transfer encoding.
        cases = (
            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
        )
        for body, expected in cases:
            with self.subTest(body):
                # Start every case from a fresh connection and socket so
                # the captured data holds exactly one request.
                self.conn = client.HTTPConnection('example.com')
                self.conn.sock = self.sock = FakeSocket('')

                self.conn.request('PUT', '/url', body)
                msg, f = self.get_headers_and_fp()
                self.assertNotIn('Content-Type', msg)
                self.assertNotIn('Content-Length', msg)
                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
                self.assertEqual(expected, f.read())

    def test_manual_content_length(self):
        # Set an incorrect content-length so that we can verify that
        # it will not be over-ridden by the library.
        self.conn.request("PUT", "/url", "body",
                          {"Content-Length": "42"})
        message, f = self.get_headers_and_fp()
        self.assertEqual("42", message.get("content-length"))
        self.assertEqual(4, len(f.read()))

    def test_ascii_body(self):
        self.conn.request("PUT", "/url", "body")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("4", message.get("content-length"))
        self.assertEqual(b'body', f.read())

    def test_latin1_body(self):
        self.conn.request("PUT", "/url", "body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_bytes_body(self):
        self.conn.request("PUT", "/url", b"body\xc1")
        message, f = self.get_headers_and_fp()
        self.assertEqual("text/plain", message.get_content_type())
        self.assertIsNone(message.get_charset())
        self.assertEqual("5", message.get("content-length"))
        self.assertEqual(b'body\xc1', f.read())

    def test_text_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "w") as f:
            f.write("body")
        # The file handle and the captured request stream get separate
        # names, so the handle managed by the with statement is never
        # rebound inside its own block.
        with open(support.TESTFN) as body_file:
            self.conn.request("PUT", "/url", body_file)
            message, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
            self.assertIsNone(message.get_charset())
            # No content-length will be determined for files; the body
            # will be sent using chunked transfer encoding instead.
            self.assertIsNone(message.get("content-length"))
            self.assertEqual("chunked", message.get("transfer-encoding"))
            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', sent.read())

    def test_binary_file_body(self):
        self.addCleanup(support.unlink, support.TESTFN)
        with open(support.TESTFN, "wb") as f:
            f.write(b"body\xc1")
        # As in the text-file test, the file handle keeps its own name.
        with open(support.TESTFN, "rb") as body_file:
            self.conn.request("PUT", "/url", body_file)
            message, sent = self.get_headers_and_fp()
            self.assertEqual("text/plain", message.get_content_type())
            self.assertIsNone(message.get_charset())
            self.assertEqual("chunked", message.get("Transfer-Encoding"))
            self.assertNotIn("Content-Length", message)
            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', sent.read())
1827
1828
class HTTPResponseTest(TestCase):
    # Tests for HTTPResponse.getheader(): repeated headers and the various
    # kinds of default value.

    def setUp(self):
        # My-Header appears twice so that getheader() has two values to
        # report.  The text is built from adjacent literals: a backslash
        # continuation inside a single literal would pull the next line's
        # indentation into the header value.
        body = (
            "HTTP/1.1 200 Ok\r\n"
            "My-Header: first-value\r\n"
            "My-Header: second-value\r\n"
            "\r\n"
            "Text"
        )
        sock = FakeSocket(body)
        self.resp = client.HTTPResponse(sock)
        self.resp.begin()

    def test_getting_header(self):
        header = self.resp.getheader('My-Header')
        self.assertEqual(header, 'first-value, second-value')

        # a default must not replace a header that is present
        header = self.resp.getheader('My-Header', 'some default')
        self.assertEqual(header, 'first-value, second-value')

    def test_getting_nonexistent_header_with_string_default(self):
        header = self.resp.getheader('No-Such-Header', 'default-value')
        self.assertEqual(header, 'default-value')

    def test_getting_nonexistent_header_with_iterable_default(self):
        # list and tuple defaults come back joined with ', '
        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
        self.assertEqual(header, 'default, values')

        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
        self.assertEqual(header, 'default, values')

    def test_getting_nonexistent_header_without_default(self):
        header = self.resp.getheader('No-Such-Header')
        self.assertIsNone(header)

    def test_getting_header_defaultint(self):
        # a non-string, non-iterable default is returned unchanged
        header = self.resp.getheader('No-Such-Header', default=42)
        self.assertEqual(header, 42)
1863
class TunnelTests(TestCase):
    # Tests for HTTPConnection.set_tunnel(), run against a FakeSocket that
    # replays canned responses.

    def setUp(self):
        connect_reply = 'HTTP/1.0 200 OK\r\n\r\n'  # Reply to CONNECT
        head_reply = (
            'HTTP/1.1 200 OK\r\n'  # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )
        self.host = 'proxy.com'
        self.conn = client.HTTPConnection(self.host)
        self.conn._create_connection = self._create_connection(
            connect_reply + head_reply)

    def tearDown(self):
        self.conn.close()

    def _create_connection(self, response_text):
        # Build a replacement for the connection factory: it returns a
        # FakeSocket that replays response_text and records the address it
        # was asked to connect to.
        def create_connection(address, timeout=None, source_address=None):
            host, port = address[0], address[1]
            return FakeSocket(response_text, host=host, port=port)
        return create_connection

    def test_set_tunnel_host_port_headers(self):
        destination = 'destination.com'
        destination_port = 8888
        extra_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
        self.conn.set_tunnel(destination, port=destination_port,
                             headers=extra_headers)
        self.conn.request('HEAD', '/', '')
        # the socket goes to the proxy ...
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        # ... while the tunnel settings are stored on the connection
        self.assertEqual(self.conn._tunnel_host, destination)
        self.assertEqual(self.conn._tunnel_port, destination_port)
        self.assertEqual(self.conn._tunnel_headers, extra_headers)

    def test_disallow_set_tunnel_after_connect(self):
        # set_tunnel() must be refused once the connection is established
        self.conn.connect()
        with self.assertRaises(RuntimeError):
            self.conn.set_tunnel('destination.com')

    def test_connect_with_tunnel(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('HEAD', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        sent = sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        # issue22095
        self.assertNotIn(b'Host: destination.com:None', sent)
        self.assertIn(b'Host: destination.com', sent)

        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
        self.assertNotIn(b'Host: proxy.com', sent)

    def test_connect_put_request(self):
        self.conn.set_tunnel('destination.com')
        self.conn.request('PUT', '/', '')
        sock = self.conn.sock
        self.assertEqual(sock.host, self.host)
        self.assertEqual(sock.port, client.HTTP_PORT)
        sent = sock.data
        self.assertIn(b'CONNECT destination.com', sent)
        self.assertIn(b'Host: destination.com', sent)

    def test_tunnel_debuglog(self):
        # At debug level 1 the headers of the tunnel reply must show up on
        # stdout.
        expected_header = 'X-Dummy: 1'
        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)

        self.conn.set_debuglevel(1)
        self.conn._create_connection = self._create_connection(response_text)
        self.conn.set_tunnel('destination.com')

        with support.captured_stdout() as output:
            self.conn.request('PUT', '/', '')
        printed = output.getvalue().splitlines()
        self.assertIn('header: {}'.format(expected_header), printed)
1935
1936
if __name__ == "__main__":
    # Executed as a script: run the tests in this module verbosely.
    unittest.main(verbosity=2)
1939