1import errno 2from http import client, HTTPStatus 3import io 4import itertools 5import os 6import array 7import re 8import socket 9import threading 10import warnings 11 12import unittest 13from unittest import mock 14TestCase = unittest.TestCase 15 16from test import support 17from test.support import os_helper 18from test.support import socket_helper 19from test.support import warnings_helper 20 21 22here = os.path.dirname(__file__) 23# Self-signed cert file for 'localhost' 24CERT_localhost = os.path.join(here, 'keycert.pem') 25# Self-signed cert file for 'fakehostname' 26CERT_fakehostname = os.path.join(here, 'keycert2.pem') 27# Self-signed cert file for self-signed.pythontest.net 28CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 29 30# constants for testing chunked encoding 31chunked_start = ( 32 'HTTP/1.1 200 OK\r\n' 33 'Transfer-Encoding: chunked\r\n\r\n' 34 'a\r\n' 35 'hello worl\r\n' 36 '3\r\n' 37 'd! \r\n' 38 '8\r\n' 39 'and now \r\n' 40 '22\r\n' 41 'for something completely different\r\n' 42) 43chunked_expected = b'hello world! and now for something completely different' 44chunk_extension = ";foo=bar" 45last_chunk = "0\r\n" 46last_chunk_extended = "0" + chunk_extension + "\r\n" 47trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 48chunked_end = "\r\n" 49 50HOST = socket_helper.HOST 51 52class FakeSocket: 53 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 54 if isinstance(text, str): 55 text = text.encode("ascii") 56 self.text = text 57 self.fileclass = fileclass 58 self.data = b'' 59 self.sendall_calls = 0 60 self.file_closed = False 61 self.host = host 62 self.port = port 63 64 def sendall(self, data): 65 self.sendall_calls += 1 66 self.data += data 67 68 def makefile(self, mode, bufsize=None): 69 if mode != 'r' and mode != 'rb': 70 raise client.UnimplementedFileMode() 71 # keep the file around so we can check how much was read from it 72 self.file = self.fileclass(self.text) 73 self.file.close = self.file_close #nerf close () 74 return self.file 75 76 def file_close(self): 77 self.file_closed = True 78 79 def close(self): 80 pass 81 82 def setsockopt(self, level, optname, value): 83 pass 84 85class EPipeSocket(FakeSocket): 86 87 def __init__(self, text, pipe_trigger): 88 # When sendall() is called with pipe_trigger, raise EPIPE. 89 FakeSocket.__init__(self, text) 90 self.pipe_trigger = pipe_trigger 91 92 def sendall(self, data): 93 if self.pipe_trigger in data: 94 raise OSError(errno.EPIPE, "gotcha") 95 self.data += data 96 97 def close(self): 98 pass 99 100class NoEOFBytesIO(io.BytesIO): 101 """Like BytesIO, but raises AssertionError on EOF. 102 103 This is used below to test that http.client doesn't try to read 104 more from the underlying file than it should. 105 """ 106 def read(self, n=-1): 107 data = io.BytesIO.read(self, n) 108 if data == b'': 109 raise AssertionError('caller tried to read past EOF') 110 return data 111 112 def readline(self, length=None): 113 data = io.BytesIO.readline(self, length) 114 if data == b'': 115 raise AssertionError('caller tried to read past EOF') 116 return data 117 118class FakeSocketHTTPConnection(client.HTTPConnection): 119 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 120 121 def __init__(self, *args): 122 self.connections = 0 123 super().__init__('example.com') 124 self.fake_socket_args = args 125 self._create_connection = self.create_connection 126 127 def connect(self): 128 """Count the number of times connect() is invoked""" 129 self.connections += 1 130 return super().connect() 131 132 def create_connection(self, *pos, **kw): 133 return FakeSocket(*self.fake_socket_args) 134 135class HeaderTests(TestCase): 136 def test_auto_headers(self): 137 # Some headers are added automatically, but should not be added by 138 # .request() if they are explicitly set. 139 140 class HeaderCountingBuffer(list): 141 def __init__(self): 142 self.count = {} 143 def append(self, item): 144 kv = item.split(b':') 145 if len(kv) > 1: 146 # item is a 'Key: Value' header string 147 lcKey = kv[0].decode('ascii').lower() 148 self.count.setdefault(lcKey, 0) 149 self.count[lcKey] += 1 150 list.append(self, item) 151 152 for explicit_header in True, False: 153 for header in 'Content-length', 'Host', 'Accept-encoding': 154 conn = client.HTTPConnection('example.com') 155 conn.sock = FakeSocket('blahblahblah') 156 conn._buffer = HeaderCountingBuffer() 157 158 body = 'spamspamspam' 159 headers = {} 160 if explicit_header: 161 headers[header] = str(len(body)) 162 conn.request('POST', '/', body, headers) 163 self.assertEqual(conn._buffer.count[header.lower()], 1) 164 165 def test_content_length_0(self): 166 167 class ContentLengthChecker(list): 168 def __init__(self): 169 list.__init__(self) 170 self.content_length = None 171 def append(self, item): 172 kv = item.split(b':', 1) 173 if len(kv) > 1 and kv[0].lower() == b'content-length': 174 self.content_length = kv[1].strip() 175 list.append(self, item) 176 177 # Here, we're testing that methods expecting a body get a 178 # content-length set to zero if the body is empty (either None or '') 179 bodies = (None, '') 180 methods_with_body = ('PUT', 'POST', 'PATCH') 181 for method, body in itertools.product(methods_with_body, bodies): 182 conn = client.HTTPConnection('example.com') 183 conn.sock = FakeSocket(None) 184 conn._buffer = ContentLengthChecker() 185 conn.request(method, '/', body) 186 self.assertEqual( 187 conn._buffer.content_length, b'0', 188 'Header Content-Length incorrect on {}'.format(method) 189 ) 190 191 # For these methods, we make sure that content-length is not set when 192 # the body is None because it might cause unexpected behaviour on the 193 # server. 194 methods_without_body = ( 195 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 196 ) 197 for method in methods_without_body: 198 conn = client.HTTPConnection('example.com') 199 conn.sock = FakeSocket(None) 200 conn._buffer = ContentLengthChecker() 201 conn.request(method, '/', None) 202 self.assertEqual( 203 conn._buffer.content_length, None, 204 'Header Content-Length set for empty body on {}'.format(method) 205 ) 206 207 # If the body is set to '', that's considered to be "present but 208 # empty" rather than "missing", so content length would be set, even 209 # for methods that don't expect a body. 210 for method in methods_without_body: 211 conn = client.HTTPConnection('example.com') 212 conn.sock = FakeSocket(None) 213 conn._buffer = ContentLengthChecker() 214 conn.request(method, '/', '') 215 self.assertEqual( 216 conn._buffer.content_length, b'0', 217 'Header Content-Length incorrect on {}'.format(method) 218 ) 219 220 # If the body is set, make sure Content-Length is set. 221 for method in itertools.chain(methods_without_body, methods_with_body): 222 conn = client.HTTPConnection('example.com') 223 conn.sock = FakeSocket(None) 224 conn._buffer = ContentLengthChecker() 225 conn.request(method, '/', ' ') 226 self.assertEqual( 227 conn._buffer.content_length, b'1', 228 'Header Content-Length incorrect on {}'.format(method) 229 ) 230 231 def test_putheader(self): 232 conn = client.HTTPConnection('example.com') 233 conn.sock = FakeSocket(None) 234 conn.putrequest('GET','/') 235 conn.putheader('Content-length', 42) 236 self.assertIn(b'Content-length: 42', conn._buffer) 237 238 conn.putheader('Foo', ' bar ') 239 self.assertIn(b'Foo: bar ', conn._buffer) 240 conn.putheader('Bar', '\tbaz\t') 241 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 242 conn.putheader('Authorization', 'Bearer mytoken') 243 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 244 conn.putheader('IterHeader', 'IterA', 'IterB') 245 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 246 conn.putheader('LatinHeader', b'\xFF') 247 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 248 conn.putheader('Utf8Header', b'\xc3\x80') 249 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 250 conn.putheader('C1-Control', b'next\x85line') 251 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 252 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 253 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 254 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 255 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 256 conn.putheader('Key Space', 'value') 257 self.assertIn(b'Key Space: value', conn._buffer) 258 conn.putheader('KeySpace ', 'value') 259 self.assertIn(b'KeySpace : value', conn._buffer) 260 conn.putheader(b'Nonbreak\xa0Space', 'value') 261 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 262 conn.putheader(b'\xa0NonbreakSpace', 'value') 263 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 264 265 def test_ipv6host_header(self): 266 # Default host header on IPv6 transaction should be wrapped by [] if 267 # it is an IPv6 address 268 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 269 b'Accept-Encoding: identity\r\n\r\n' 270 conn = client.HTTPConnection('[2001::]:81') 271 sock = FakeSocket('') 272 conn.sock = sock 273 conn.request('GET', '/foo') 274 self.assertTrue(sock.data.startswith(expected)) 275 276 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 277 b'Accept-Encoding: identity\r\n\r\n' 278 conn = client.HTTPConnection('[2001:102A::]') 279 sock = FakeSocket('') 280 conn.sock = sock 281 conn.request('GET', '/foo') 282 self.assertTrue(sock.data.startswith(expected)) 283 284 def test_malformed_headers_coped_with(self): 285 # Issue 19996 286 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 287 sock = FakeSocket(body) 288 resp = client.HTTPResponse(sock) 289 resp.begin() 290 291 self.assertEqual(resp.getheader('First'), 'val') 292 self.assertEqual(resp.getheader('Second'), 'val') 293 294 def test_parse_all_octets(self): 295 # Ensure no valid header field octet breaks the parser 296 body = ( 297 b'HTTP/1.1 200 OK\r\n' 298 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 299 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 300 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 301 b'obs-fold: text\r\n' 302 b' folded with space\r\n' 303 b'\tfolded with tab\r\n' 304 b'Content-Length: 0\r\n' 305 b'\r\n' 306 ) 307 sock = FakeSocket(body) 308 resp = client.HTTPResponse(sock) 309 resp.begin() 310 self.assertEqual(resp.getheader('Content-Length'), '0') 311 self.assertEqual(resp.msg['Content-Length'], '0') 312 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 313 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 314 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 315 self.assertEqual(resp.getheader('VCHAR'), vchar) 316 self.assertEqual(resp.msg['VCHAR'], vchar) 317 self.assertIsNotNone(resp.getheader('obs-text')) 318 self.assertIn('obs-text', resp.msg) 319 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 320 self.assertTrue(folded.startswith('text')) 321 self.assertIn(' folded with space', folded) 322 self.assertTrue(folded.endswith('folded with tab')) 323 324 def test_invalid_headers(self): 325 conn = client.HTTPConnection('example.com') 326 conn.sock = FakeSocket('') 327 conn.putrequest('GET', '/') 328 329 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 330 # longer allowed in header names 331 cases = ( 332 (b'Invalid\r\nName', b'ValidValue'), 333 (b'Invalid\rName', b'ValidValue'), 334 (b'Invalid\nName', b'ValidValue'), 335 (b'\r\nInvalidName', b'ValidValue'), 336 (b'\rInvalidName', b'ValidValue'), 337 (b'\nInvalidName', b'ValidValue'), 338 (b' InvalidName', b'ValidValue'), 339 (b'\tInvalidName', b'ValidValue'), 340 (b'Invalid:Name', b'ValidValue'), 341 (b':InvalidName', b'ValidValue'), 342 (b'ValidName', b'Invalid\r\nValue'), 343 (b'ValidName', b'Invalid\rValue'), 344 (b'ValidName', b'Invalid\nValue'), 345 (b'ValidName', b'InvalidValue\r\n'), 346 (b'ValidName', b'InvalidValue\r'), 347 (b'ValidName', b'InvalidValue\n'), 348 ) 349 for name, value in cases: 350 with self.subTest((name, value)): 351 with self.assertRaisesRegex(ValueError, 'Invalid header'): 352 conn.putheader(name, value) 353 354 def test_headers_debuglevel(self): 355 body = ( 356 b'HTTP/1.1 200 OK\r\n' 357 b'First: val\r\n' 358 b'Second: val1\r\n' 359 b'Second: val2\r\n' 360 ) 361 sock = FakeSocket(body) 362 resp = client.HTTPResponse(sock, debuglevel=1) 363 with support.captured_stdout() as output: 364 resp.begin() 365 lines = output.getvalue().splitlines() 366 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 367 self.assertEqual(lines[1], "header: First: val") 368 self.assertEqual(lines[2], "header: Second: val1") 369 self.assertEqual(lines[3], "header: Second: val2") 370 371 372class HttpMethodTests(TestCase): 373 def test_invalid_method_names(self): 374 methods = ( 375 'GET\r', 376 'POST\n', 377 'PUT\n\r', 378 'POST\nValue', 379 'POST\nHOST:abc', 380 'GET\nrHost:abc\n', 381 'POST\rRemainder:\r', 382 'GET\rHOST:\n', 383 '\nPUT' 384 ) 385 386 for method in methods: 387 with self.assertRaisesRegex( 388 ValueError, "method can't contain control characters"): 389 conn = client.HTTPConnection('example.com') 390 conn.sock = FakeSocket(None) 391 conn.request(method=method, url="/") 392 393 394class TransferEncodingTest(TestCase): 395 expected_body = b"It's just a flesh wound" 396 397 def test_endheaders_chunked(self): 398 conn = client.HTTPConnection('example.com') 399 conn.sock = FakeSocket(b'') 400 conn.putrequest('POST', '/') 401 conn.endheaders(self._make_body(), encode_chunked=True) 402 403 _, _, body = self._parse_request(conn.sock.data) 404 body = self._parse_chunked(body) 405 self.assertEqual(body, self.expected_body) 406 407 def test_explicit_headers(self): 408 # explicit chunked 409 conn = client.HTTPConnection('example.com') 410 conn.sock = FakeSocket(b'') 411 # this shouldn't actually be automatically chunk-encoded because the 412 # calling code has explicitly stated that it's taking care of it 413 conn.request( 414 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 415 416 _, headers, body = self._parse_request(conn.sock.data) 417 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 418 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 419 self.assertEqual(body, self.expected_body) 420 421 # explicit chunked, string body 422 conn = client.HTTPConnection('example.com') 423 conn.sock = FakeSocket(b'') 424 conn.request( 425 'POST', '/', self.expected_body.decode('latin-1'), 426 {'Transfer-Encoding': 'chunked'}) 427 428 _, headers, body = self._parse_request(conn.sock.data) 429 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 430 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 431 self.assertEqual(body, self.expected_body) 432 433 # User-specified TE, but request() does the chunk encoding 434 conn = client.HTTPConnection('example.com') 435 conn.sock = FakeSocket(b'') 436 conn.request('POST', '/', 437 headers={'Transfer-Encoding': 'gzip, chunked'}, 438 encode_chunked=True, 439 body=self._make_body()) 440 _, headers, body = self._parse_request(conn.sock.data) 441 self.assertNotIn('content-length', [k.lower() for k in headers]) 442 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 443 self.assertEqual(self._parse_chunked(body), self.expected_body) 444 445 def test_request(self): 446 for empty_lines in (False, True,): 447 conn = client.HTTPConnection('example.com') 448 conn.sock = FakeSocket(b'') 449 conn.request( 450 'POST', '/', self._make_body(empty_lines=empty_lines)) 451 452 _, headers, body = self._parse_request(conn.sock.data) 453 body = self._parse_chunked(body) 454 self.assertEqual(body, self.expected_body) 455 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 456 457 # Content-Length and Transfer-Encoding SHOULD not be sent in the 458 # same request 459 self.assertNotIn('content-length', [k.lower() for k in headers]) 460 461 def test_empty_body(self): 462 # Zero-length iterable should be treated like any other iterable 463 conn = client.HTTPConnection('example.com') 464 conn.sock = FakeSocket(b'') 465 conn.request('POST', '/', ()) 466 _, headers, body = self._parse_request(conn.sock.data) 467 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 468 self.assertNotIn('content-length', [k.lower() for k in headers]) 469 self.assertEqual(body, b"0\r\n\r\n") 470 471 def _make_body(self, empty_lines=False): 472 lines = self.expected_body.split(b' ') 473 for idx, line in enumerate(lines): 474 # for testing handling empty lines 475 if empty_lines and idx % 2: 476 yield b'' 477 if idx < len(lines) - 1: 478 yield line + b' ' 479 else: 480 yield line 481 482 def _parse_request(self, data): 483 lines = data.split(b'\r\n') 484 request = lines[0] 485 headers = {} 486 n = 1 487 while n < len(lines) and len(lines[n]) > 0: 488 key, val = lines[n].split(b':') 489 key = key.decode('latin-1').strip() 490 headers[key] = val.decode('latin-1').strip() 491 n += 1 492 493 return request, headers, b'\r\n'.join(lines[n + 1:]) 494 495 def _parse_chunked(self, data): 496 body = [] 497 trailers = {} 498 n = 0 499 lines = data.split(b'\r\n') 500 # parse body 501 while True: 502 size, chunk = lines[n:n+2] 503 size = int(size, 16) 504 505 if size == 0: 506 n += 1 507 break 508 509 self.assertEqual(size, len(chunk)) 510 body.append(chunk) 511 512 n += 2 513 # we /should/ hit the end chunk, but check against the size of 514 # lines so we're not stuck in an infinite loop should we get 515 # malformed data 516 if n > len(lines): 517 break 518 519 return b''.join(body) 520 521 522class BasicTest(TestCase): 523 def test_dir_with_added_behavior_on_status(self): 524 # see issue40084 525 self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404)))) 526 527 def test_status_lines(self): 528 # Test HTTP status lines 529 530 body = "HTTP/1.1 200 Ok\r\n\r\nText" 531 sock = FakeSocket(body) 532 resp = client.HTTPResponse(sock) 533 resp.begin() 534 self.assertEqual(resp.read(0), b'') # Issue #20007 535 self.assertFalse(resp.isclosed()) 536 self.assertFalse(resp.closed) 537 self.assertEqual(resp.read(), b"Text") 538 self.assertTrue(resp.isclosed()) 539 self.assertFalse(resp.closed) 540 resp.close() 541 self.assertTrue(resp.closed) 542 543 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 544 sock = FakeSocket(body) 545 resp = client.HTTPResponse(sock) 546 self.assertRaises(client.BadStatusLine, resp.begin) 547 548 def test_bad_status_repr(self): 549 exc = client.BadStatusLine('') 550 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 551 552 def test_partial_reads(self): 553 # if we have Content-Length, HTTPResponse knows when to close itself, 554 # the same behaviour as when we read the whole thing with read() 555 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 556 sock = FakeSocket(body) 557 resp = client.HTTPResponse(sock) 558 resp.begin() 559 self.assertEqual(resp.read(2), b'Te') 560 self.assertFalse(resp.isclosed()) 561 self.assertEqual(resp.read(2), b'xt') 562 self.assertTrue(resp.isclosed()) 563 self.assertFalse(resp.closed) 564 resp.close() 565 self.assertTrue(resp.closed) 566 567 def test_mixed_reads(self): 568 # readline() should update the remaining length, so that read() knows 569 # how much data is left and does not raise IncompleteRead 570 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 571 sock = FakeSocket(body) 572 resp = client.HTTPResponse(sock) 573 resp.begin() 574 self.assertEqual(resp.readline(), b'Text\r\n') 575 self.assertFalse(resp.isclosed()) 576 self.assertEqual(resp.read(), b'Another') 577 self.assertTrue(resp.isclosed()) 578 self.assertFalse(resp.closed) 579 resp.close() 580 self.assertTrue(resp.closed) 581 582 def test_partial_readintos(self): 583 # if we have Content-Length, HTTPResponse knows when to close itself, 584 # the same behaviour as when we read the whole thing with read() 585 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 586 sock = FakeSocket(body) 587 resp = client.HTTPResponse(sock) 588 resp.begin() 589 b = bytearray(2) 590 n = resp.readinto(b) 591 self.assertEqual(n, 2) 592 self.assertEqual(bytes(b), b'Te') 593 self.assertFalse(resp.isclosed()) 594 n = resp.readinto(b) 595 self.assertEqual(n, 2) 596 self.assertEqual(bytes(b), b'xt') 597 self.assertTrue(resp.isclosed()) 598 self.assertFalse(resp.closed) 599 resp.close() 600 self.assertTrue(resp.closed) 601 602 def test_partial_reads_past_end(self): 603 # if we have Content-Length, clip reads to the end 604 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 605 sock = FakeSocket(body) 606 resp = client.HTTPResponse(sock) 607 resp.begin() 608 self.assertEqual(resp.read(10), b'Text') 609 self.assertTrue(resp.isclosed()) 610 self.assertFalse(resp.closed) 611 resp.close() 612 self.assertTrue(resp.closed) 613 614 def test_partial_readintos_past_end(self): 615 # if we have Content-Length, clip readintos to the end 616 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 617 sock = FakeSocket(body) 618 resp = client.HTTPResponse(sock) 619 resp.begin() 620 b = bytearray(10) 621 n = resp.readinto(b) 622 self.assertEqual(n, 4) 623 self.assertEqual(bytes(b)[:4], b'Text') 624 self.assertTrue(resp.isclosed()) 625 self.assertFalse(resp.closed) 626 resp.close() 627 self.assertTrue(resp.closed) 628 629 def test_partial_reads_no_content_length(self): 630 # when no length is present, the socket should be gracefully closed when 631 # all data was read 632 body = "HTTP/1.1 200 Ok\r\n\r\nText" 633 sock = FakeSocket(body) 634 resp = client.HTTPResponse(sock) 635 resp.begin() 636 self.assertEqual(resp.read(2), b'Te') 637 self.assertFalse(resp.isclosed()) 638 self.assertEqual(resp.read(2), b'xt') 639 self.assertEqual(resp.read(1), b'') 640 self.assertTrue(resp.isclosed()) 641 self.assertFalse(resp.closed) 642 resp.close() 643 self.assertTrue(resp.closed) 644 645 def test_partial_readintos_no_content_length(self): 646 # when no length is present, the socket should be gracefully closed when 647 # all data was read 648 body = "HTTP/1.1 200 Ok\r\n\r\nText" 649 sock = FakeSocket(body) 650 resp = client.HTTPResponse(sock) 651 resp.begin() 652 b = bytearray(2) 653 n = resp.readinto(b) 654 self.assertEqual(n, 2) 655 self.assertEqual(bytes(b), b'Te') 656 self.assertFalse(resp.isclosed()) 657 n = resp.readinto(b) 658 self.assertEqual(n, 2) 659 self.assertEqual(bytes(b), b'xt') 660 n = resp.readinto(b) 661 self.assertEqual(n, 0) 662 self.assertTrue(resp.isclosed()) 663 664 def test_partial_reads_incomplete_body(self): 665 # if the server shuts down the connection before the whole 666 # content-length is delivered, the socket is gracefully closed 667 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 668 sock = FakeSocket(body) 669 resp = client.HTTPResponse(sock) 670 resp.begin() 671 self.assertEqual(resp.read(2), b'Te') 672 self.assertFalse(resp.isclosed()) 673 self.assertEqual(resp.read(2), b'xt') 674 self.assertEqual(resp.read(1), b'') 675 self.assertTrue(resp.isclosed()) 676 677 def test_partial_readintos_incomplete_body(self): 678 # if the server shuts down the connection before the whole 679 # content-length is delivered, the socket is gracefully closed 680 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 681 sock = FakeSocket(body) 682 resp = client.HTTPResponse(sock) 683 resp.begin() 684 b = bytearray(2) 685 n = resp.readinto(b) 686 self.assertEqual(n, 2) 687 self.assertEqual(bytes(b), b'Te') 688 self.assertFalse(resp.isclosed()) 689 n = resp.readinto(b) 690 self.assertEqual(n, 2) 691 self.assertEqual(bytes(b), b'xt') 692 n = resp.readinto(b) 693 self.assertEqual(n, 0) 694 self.assertTrue(resp.isclosed()) 695 self.assertFalse(resp.closed) 696 resp.close() 697 self.assertTrue(resp.closed) 698 699 def test_host_port(self): 700 # Check invalid host_port 701 702 for hp in ("www.python.org:abc", "user:password@www.python.org"): 703 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 704 705 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 706 "fe80::207:e9ff:fe9b", 8000), 707 ("www.python.org:80", "www.python.org", 80), 708 ("www.python.org:", "www.python.org", 80), 709 ("www.python.org", "www.python.org", 80), 710 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 711 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 712 c = client.HTTPConnection(hp) 713 self.assertEqual(h, c.host) 714 self.assertEqual(p, c.port) 715 716 def test_response_headers(self): 717 # test response with multiple message headers with the same field name. 718 text = ('HTTP/1.1 200 OK\r\n' 719 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 720 'Version="1"; Path="/acme"\r\n' 721 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 722 ' Path="/acme"\r\n' 723 '\r\n' 724 'No body\r\n') 725 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 726 ', ' 727 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 728 s = FakeSocket(text) 729 r = client.HTTPResponse(s) 730 r.begin() 731 cookies = r.getheader("Set-Cookie") 732 self.assertEqual(cookies, hdr) 733 734 def test_read_head(self): 735 # Test that the library doesn't attempt to read any data 736 # from a HEAD request. (Tickles SF bug #622042.) 737 sock = FakeSocket( 738 'HTTP/1.1 200 OK\r\n' 739 'Content-Length: 14432\r\n' 740 '\r\n', 741 NoEOFBytesIO) 742 resp = client.HTTPResponse(sock, method="HEAD") 743 resp.begin() 744 if resp.read(): 745 self.fail("Did not expect response from HEAD request") 746 747 def test_readinto_head(self): 748 # Test that the library doesn't attempt to read any data 749 # from a HEAD request. (Tickles SF bug #622042.) 750 sock = FakeSocket( 751 'HTTP/1.1 200 OK\r\n' 752 'Content-Length: 14432\r\n' 753 '\r\n', 754 NoEOFBytesIO) 755 resp = client.HTTPResponse(sock, method="HEAD") 756 resp.begin() 757 b = bytearray(5) 758 if resp.readinto(b) != 0: 759 self.fail("Did not expect response from HEAD request") 760 self.assertEqual(bytes(b), b'\x00'*5) 761 762 def test_too_many_headers(self): 763 headers = '\r\n'.join('Header%d: foo' % i 764 for i in range(client._MAXHEADERS + 1)) + '\r\n' 765 text = ('HTTP/1.1 200 OK\r\n' + headers) 766 s = FakeSocket(text) 767 r = client.HTTPResponse(s) 768 self.assertRaisesRegex(client.HTTPException, 769 r"got more than \d+ headers", r.begin) 770 771 def test_send_file(self): 772 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 773 b'Accept-Encoding: identity\r\n' 774 b'Transfer-Encoding: chunked\r\n' 775 b'\r\n') 776 777 with open(__file__, 'rb') as body: 778 conn = client.HTTPConnection('example.com') 779 sock = FakeSocket(body) 780 conn.sock = sock 781 conn.request('GET', '/foo', body) 782 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 783 (sock.data[:len(expected)], expected)) 784 785 def test_send(self): 786 expected = b'this is a test this is only a test' 787 conn = client.HTTPConnection('example.com') 788 sock = FakeSocket(None) 789 conn.sock = sock 790 conn.send(expected) 791 self.assertEqual(expected, sock.data) 792 sock.data = b'' 793 conn.send(array.array('b', expected)) 794 self.assertEqual(expected, sock.data) 795 sock.data = b'' 796 conn.send(io.BytesIO(expected)) 797 self.assertEqual(expected, sock.data) 798 799 def test_send_updating_file(self): 800 def data(): 801 yield 'data' 802 yield None 803 yield 'data_two' 804 805 class UpdatingFile(io.TextIOBase): 806 mode = 'r' 807 d = data() 808 def read(self, blocksize=-1): 809 return next(self.d) 810 811 expected = b'data' 812 813 conn = client.HTTPConnection('example.com') 814 sock = FakeSocket("") 815 conn.sock = sock 816 conn.send(UpdatingFile()) 817 self.assertEqual(sock.data, expected) 818 819 820 def test_send_iter(self): 821 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 822 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 823 b'\r\nonetwothree' 824 825 def body(): 826 yield b"one" 827 yield b"two" 828 yield b"three" 829 830 conn = client.HTTPConnection('example.com') 831 sock = FakeSocket("") 832 conn.sock = sock 833 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 834 self.assertEqual(sock.data, expected) 835 836 def test_blocksize_request(self): 837 """Check that request() respects the configured block size.""" 838 blocksize = 8 # For easy debugging. 839 conn = client.HTTPConnection('example.com', blocksize=blocksize) 840 sock = FakeSocket(None) 841 conn.sock = sock 842 expected = b"a" * blocksize + b"b" 843 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 844 self.assertEqual(sock.sendall_calls, 3) 845 body = sock.data.split(b"\r\n\r\n", 1)[1] 846 self.assertEqual(body, expected) 847 848 def test_blocksize_send(self): 849 """Check that send() respects the configured block size.""" 850 blocksize = 8 # For easy debugging. 851 conn = client.HTTPConnection('example.com', blocksize=blocksize) 852 sock = FakeSocket(None) 853 conn.sock = sock 854 expected = b"a" * blocksize + b"b" 855 conn.send(io.BytesIO(expected)) 856 self.assertEqual(sock.sendall_calls, 2) 857 self.assertEqual(sock.data, expected) 858 859 def test_send_type_error(self): 860 # See: Issue #12676 861 conn = client.HTTPConnection('example.com') 862 conn.sock = FakeSocket('') 863 with self.assertRaises(TypeError): 864 conn.request('POST', 'test', conn) 865 866 def test_chunked(self): 867 expected = chunked_expected 868 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 869 resp = client.HTTPResponse(sock, method="GET") 870 resp.begin() 871 self.assertEqual(resp.read(), expected) 872 resp.close() 873 874 # Various read sizes 875 for n in range(1, 12): 876 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 877 resp = client.HTTPResponse(sock, method="GET") 878 resp.begin() 879 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 880 resp.close() 881 882 for x in ('', 'foo\r\n'): 883 sock = FakeSocket(chunked_start + x) 884 resp = client.HTTPResponse(sock, method="GET") 885 resp.begin() 886 try: 887 resp.read() 888 except client.IncompleteRead as i: 889 self.assertEqual(i.partial, expected) 890 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 891 self.assertEqual(repr(i), expected_message) 892 self.assertEqual(str(i), expected_message) 893 else: 894 self.fail('IncompleteRead expected') 895 finally: 896 resp.close() 897 898 def test_readinto_chunked(self): 899 900 expected = chunked_expected 901 nexpected = len(expected) 902 b = bytearray(128) 903 904 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 905 resp = client.HTTPResponse(sock, method="GET") 906 resp.begin() 907 n = resp.readinto(b) 908 self.assertEqual(b[:nexpected], expected) 909 self.assertEqual(n, nexpected) 910 resp.close() 911 912 # Various read sizes 913 for n in range(1, 12): 914 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 915 resp = client.HTTPResponse(sock, method="GET") 916 resp.begin() 917 m = memoryview(b) 918 i = resp.readinto(m[0:n]) 919 i += resp.readinto(m[i:n + i]) 920 i += resp.readinto(m[i:]) 921 self.assertEqual(b[:nexpected], expected) 922 self.assertEqual(i, nexpected) 923 resp.close() 924 925 for x in ('', 'foo\r\n'): 926 sock = FakeSocket(chunked_start + x) 927 resp = client.HTTPResponse(sock, method="GET") 928 resp.begin() 929 try: 930 n = resp.readinto(b) 931 except client.IncompleteRead as i: 932 self.assertEqual(i.partial, expected) 933 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 934 self.assertEqual(repr(i), expected_message) 935 self.assertEqual(str(i), expected_message) 936 else: 937 self.fail('IncompleteRead expected') 938 finally: 939 resp.close() 940 941 def test_chunked_head(self): 942 chunked_start = ( 943 'HTTP/1.1 200 OK\r\n' 944 'Transfer-Encoding: chunked\r\n\r\n' 945 'a\r\n' 946 'hello world\r\n' 947 '1\r\n' 948 'd\r\n' 949 ) 950 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 951 resp = client.HTTPResponse(sock, method="HEAD") 952 resp.begin() 953 self.assertEqual(resp.read(), b'') 954 self.assertEqual(resp.status, 200) 955 self.assertEqual(resp.reason, 'OK') 956 self.assertTrue(resp.isclosed()) 957 self.assertFalse(resp.closed) 958 resp.close() 959 self.assertTrue(resp.closed) 960 961 def test_readinto_chunked_head(self): 962 chunked_start = ( 963 'HTTP/1.1 200 OK\r\n' 964 'Transfer-Encoding: chunked\r\n\r\n' 965 'a\r\n' 966 'hello world\r\n' 967 '1\r\n' 968 'd\r\n' 969 ) 970 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 971 resp = client.HTTPResponse(sock, method="HEAD") 972 resp.begin() 973 b = bytearray(5) 974 n = resp.readinto(b) 975 self.assertEqual(n, 0) 976 self.assertEqual(bytes(b), b'\x00'*5) 977 self.assertEqual(resp.status, 200) 978 self.assertEqual(resp.reason, 'OK') 979 self.assertTrue(resp.isclosed()) 980 self.assertFalse(resp.closed) 981 resp.close() 982 self.assertTrue(resp.closed) 983 984 def test_negative_content_length(self): 985 sock = FakeSocket( 986 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 987 resp = client.HTTPResponse(sock, method="GET") 988 resp.begin() 989 self.assertEqual(resp.read(), b'Hello\r\n') 990 self.assertTrue(resp.isclosed()) 991 992 def test_incomplete_read(self): 993 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 994 resp = client.HTTPResponse(sock, method="GET") 995 resp.begin() 996 try: 997 resp.read() 998 except client.IncompleteRead as i: 999 self.assertEqual(i.partial, b'Hello\r\n') 1000 self.assertEqual(repr(i), 1001 "IncompleteRead(7 bytes read, 3 more expected)") 1002 self.assertEqual(str(i), 1003 "IncompleteRead(7 bytes read, 3 more expected)") 1004 self.assertTrue(resp.isclosed()) 1005 else: 1006 self.fail('IncompleteRead expected') 1007 1008 def test_epipe(self): 1009 sock = EPipeSocket( 1010 "HTTP/1.0 401 Authorization Required\r\n" 1011 "Content-type: text/html\r\n" 1012 "WWW-Authenticate: Basic realm=\"example\"\r\n", 1013 b"Content-Length") 1014 conn = client.HTTPConnection("example.com") 1015 conn.sock = sock 1016 self.assertRaises(OSError, 1017 lambda: conn.request("PUT", "/url", "body")) 1018 resp = conn.getresponse() 1019 self.assertEqual(401, resp.status) 1020 self.assertEqual("Basic realm=\"example\"", 1021 resp.getheader("www-authenticate")) 1022 1023 # Test lines overflowing the max line size (_MAXLINE in http.client) 1024 1025 def test_overflowing_status_line(self): 1026 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 1027 resp = client.HTTPResponse(FakeSocket(body)) 1028 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 1029 1030 def test_overflowing_header_line(self): 1031 body = ( 1032 'HTTP/1.1 200 OK\r\n' 1033 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 1034 ) 1035 resp = client.HTTPResponse(FakeSocket(body)) 1036 self.assertRaises(client.LineTooLong, resp.begin) 1037 1038 def test_overflowing_header_limit_after_100(self): 1039 body = ( 1040 'HTTP/1.1 100 OK\r\n' 1041 'r\n' * 32768 1042 ) 1043 resp = client.HTTPResponse(FakeSocket(body)) 1044 with self.assertRaises(client.HTTPException) as cm: 1045 resp.begin() 1046 # We must assert more because other reasonable errors that we 1047 # do not want can also be HTTPException derived. 1048 self.assertIn('got more than ', str(cm.exception)) 1049 self.assertIn('headers', str(cm.exception)) 1050 1051 def test_overflowing_chunked_line(self): 1052 body = ( 1053 'HTTP/1.1 200 OK\r\n' 1054 'Transfer-Encoding: chunked\r\n\r\n' 1055 + '0' * 65536 + 'a\r\n' 1056 'hello world\r\n' 1057 '0\r\n' 1058 '\r\n' 1059 ) 1060 resp = client.HTTPResponse(FakeSocket(body)) 1061 resp.begin() 1062 self.assertRaises(client.LineTooLong, resp.read) 1063 1064 def test_early_eof(self): 1065 # Test httpresponse with no \r\n termination, 1066 body = "HTTP/1.1 200 Ok" 1067 sock = FakeSocket(body) 1068 resp = client.HTTPResponse(sock) 1069 resp.begin() 1070 self.assertEqual(resp.read(), b'') 1071 self.assertTrue(resp.isclosed()) 1072 self.assertFalse(resp.closed) 1073 resp.close() 1074 self.assertTrue(resp.closed) 1075 1076 def test_error_leak(self): 1077 # Test that the socket is not leaked if getresponse() fails 1078 conn = client.HTTPConnection('example.com') 1079 response = None 1080 class Response(client.HTTPResponse): 1081 def __init__(self, *pos, **kw): 1082 nonlocal response 1083 response = self # Avoid garbage collector closing the socket 1084 client.HTTPResponse.__init__(self, *pos, **kw) 1085 conn.response_class = Response 1086 conn.sock = FakeSocket('Invalid status line') 1087 conn.request('GET', '/') 1088 self.assertRaises(client.BadStatusLine, conn.getresponse) 1089 self.assertTrue(response.closed) 1090 self.assertTrue(conn.sock.file_closed) 1091 1092 def test_chunked_extension(self): 1093 extra = '3;foo=bar\r\n' + 'abc\r\n' 1094 expected = chunked_expected + b'abc' 1095 1096 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1097 resp = client.HTTPResponse(sock, method="GET") 1098 resp.begin() 1099 self.assertEqual(resp.read(), expected) 1100 resp.close() 1101 1102 def test_chunked_missing_end(self): 1103 """some servers may serve up a short chunked encoding stream""" 1104 expected = chunked_expected 1105 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1106 resp = client.HTTPResponse(sock, method="GET") 1107 resp.begin() 1108 self.assertEqual(resp.read(), expected) 1109 resp.close() 1110 1111 def test_chunked_trailers(self): 1112 """See that trailers are read and ignored""" 1113 expected = chunked_expected 1114 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1115 resp = client.HTTPResponse(sock, method="GET") 1116 resp.begin() 1117 self.assertEqual(resp.read(), expected) 1118 # we should have reached the end of the file 1119 self.assertEqual(sock.file.read(), b"") #we read to the end 1120 resp.close() 1121 1122 def test_chunked_sync(self): 1123 """Check that we don't read past the end of the chunked-encoding stream""" 1124 expected = chunked_expected 1125 extradata = "extradata" 1126 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1127 resp = client.HTTPResponse(sock, method="GET") 1128 resp.begin() 1129 self.assertEqual(resp.read(), expected) 1130 # the file should now have our extradata ready to be read 1131 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1132 resp.close() 1133 1134 def test_content_length_sync(self): 1135 """Check that we don't read past the end of the Content-Length stream""" 1136 extradata = b"extradata" 1137 expected = b"Hello123\r\n" 1138 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1139 resp = client.HTTPResponse(sock, method="GET") 1140 resp.begin() 1141 self.assertEqual(resp.read(), expected) 1142 # the file should now have our extradata ready to be read 1143 self.assertEqual(sock.file.read(), extradata) #we read to the end 1144 resp.close() 1145 1146 def test_readlines_content_length(self): 1147 extradata = b"extradata" 1148 expected = b"Hello123\r\n" 1149 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1150 resp = client.HTTPResponse(sock, method="GET") 1151 resp.begin() 1152 self.assertEqual(resp.readlines(2000), [expected]) 1153 # the file should now have our extradata ready to be read 1154 self.assertEqual(sock.file.read(), extradata) #we read to the end 1155 resp.close() 1156 1157 def test_read1_content_length(self): 1158 extradata = b"extradata" 1159 expected = b"Hello123\r\n" 1160 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1161 resp = client.HTTPResponse(sock, method="GET") 1162 resp.begin() 1163 self.assertEqual(resp.read1(2000), expected) 1164 # the file should now have our extradata ready to be read 1165 self.assertEqual(sock.file.read(), extradata) #we read to the end 1166 resp.close() 1167 1168 def test_readline_bound_content_length(self): 1169 extradata = b"extradata" 1170 expected = b"Hello123\r\n" 1171 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1172 resp = client.HTTPResponse(sock, method="GET") 1173 resp.begin() 1174 self.assertEqual(resp.readline(10), expected) 1175 self.assertEqual(resp.readline(10), b"") 1176 # the file should now have our extradata ready to be read 1177 self.assertEqual(sock.file.read(), extradata) #we read to the end 1178 resp.close() 1179 1180 def test_read1_bound_content_length(self): 1181 extradata = b"extradata" 1182 expected = b"Hello123\r\n" 1183 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1184 resp = client.HTTPResponse(sock, method="GET") 1185 resp.begin() 1186 self.assertEqual(resp.read1(20), expected*2) 1187 self.assertEqual(resp.read(), expected) 1188 # the file should now have our extradata ready to be read 1189 self.assertEqual(sock.file.read(), extradata) #we read to the end 1190 resp.close() 1191 1192 def test_response_fileno(self): 1193 # Make sure fd returned by fileno is valid. 1194 serv = socket.create_server((HOST, 0)) 1195 self.addCleanup(serv.close) 1196 1197 result = None 1198 def run_server(): 1199 [conn, address] = serv.accept() 1200 with conn, conn.makefile("rb") as reader: 1201 # Read the request header until a blank line 1202 while True: 1203 line = reader.readline() 1204 if not line.rstrip(b"\r\n"): 1205 break 1206 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1207 nonlocal result 1208 result = reader.read() 1209 1210 thread = threading.Thread(target=run_server) 1211 thread.start() 1212 self.addCleanup(thread.join, float(1)) 1213 conn = client.HTTPConnection(*serv.getsockname()) 1214 conn.request("CONNECT", "dummy:1234") 1215 response = conn.getresponse() 1216 try: 1217 self.assertEqual(response.status, client.OK) 1218 s = socket.socket(fileno=response.fileno()) 1219 try: 1220 s.sendall(b"proxied data\n") 1221 finally: 1222 s.detach() 1223 finally: 1224 response.close() 1225 conn.close() 1226 thread.join() 1227 self.assertEqual(result, b"proxied data\n") 1228 1229 def test_putrequest_override_domain_validation(self): 1230 """ 1231 It should be possible to override the default validation 1232 behavior in putrequest (bpo-38216). 1233 """ 1234 class UnsafeHTTPConnection(client.HTTPConnection): 1235 def _validate_path(self, url): 1236 pass 1237 1238 conn = UnsafeHTTPConnection('example.com') 1239 conn.sock = FakeSocket('') 1240 conn.putrequest('GET', '/\x00') 1241 1242 def test_putrequest_override_host_validation(self): 1243 class UnsafeHTTPConnection(client.HTTPConnection): 1244 def _validate_host(self, url): 1245 pass 1246 1247 conn = UnsafeHTTPConnection('example.com\r\n') 1248 conn.sock = FakeSocket('') 1249 # set skip_host so a ValueError is not raised upon adding the 1250 # invalid URL as the value of the "Host:" header 1251 conn.putrequest('GET', '/', skip_host=1) 1252 1253 def test_putrequest_override_encoding(self): 1254 """ 1255 It should be possible to override the default encoding 1256 to transmit bytes in another encoding even if invalid 1257 (bpo-36274). 1258 """ 1259 class UnsafeHTTPConnection(client.HTTPConnection): 1260 def _encode_request(self, str_url): 1261 return str_url.encode('utf-8') 1262 1263 conn = UnsafeHTTPConnection('example.com') 1264 conn.sock = FakeSocket('') 1265 conn.putrequest('GET', '/☃') 1266 1267 1268class ExtendedReadTest(TestCase): 1269 """ 1270 Test peek(), read1(), readline() 1271 """ 1272 lines = ( 1273 'HTTP/1.1 200 OK\r\n' 1274 '\r\n' 1275 'hello world!\n' 1276 'and now \n' 1277 'for something completely different\n' 1278 'foo' 1279 ) 1280 lines_expected = lines[lines.find('hello'):].encode("ascii") 1281 lines_chunked = ( 1282 'HTTP/1.1 200 OK\r\n' 1283 'Transfer-Encoding: chunked\r\n\r\n' 1284 'a\r\n' 1285 'hello worl\r\n' 1286 '3\r\n' 1287 'd!\n\r\n' 1288 '9\r\n' 1289 'and now \n\r\n' 1290 '23\r\n' 1291 'for something completely different\n\r\n' 1292 '3\r\n' 1293 'foo\r\n' 1294 '0\r\n' # terminating chunk 1295 '\r\n' # end of trailers 1296 ) 1297 1298 def setUp(self): 1299 sock = FakeSocket(self.lines) 1300 resp = client.HTTPResponse(sock, method="GET") 1301 resp.begin() 1302 resp.fp = io.BufferedReader(resp.fp) 1303 self.resp = resp 1304 1305 1306 1307 def test_peek(self): 1308 resp = self.resp 1309 # patch up the buffered peek so that it returns not too much stuff 1310 oldpeek = resp.fp.peek 1311 def mypeek(n=-1): 1312 p = oldpeek(n) 1313 if n >= 0: 1314 return p[:n] 1315 return p[:10] 1316 resp.fp.peek = mypeek 1317 1318 all = [] 1319 while True: 1320 # try a short peek 1321 p = resp.peek(3) 1322 if p: 1323 self.assertGreater(len(p), 0) 1324 # then unbounded peek 1325 p2 = resp.peek() 1326 self.assertGreaterEqual(len(p2), len(p)) 1327 self.assertTrue(p2.startswith(p)) 1328 next = resp.read(len(p2)) 1329 self.assertEqual(next, p2) 1330 else: 1331 next = resp.read() 1332 self.assertFalse(next) 1333 all.append(next) 1334 if not next: 1335 break 1336 self.assertEqual(b"".join(all), self.lines_expected) 1337 1338 def test_readline(self): 1339 resp = self.resp 1340 self._verify_readline(self.resp.readline, self.lines_expected) 1341 1342 def _verify_readline(self, readline, expected): 1343 all = [] 1344 while True: 1345 # short readlines 1346 line = readline(5) 1347 if line and line != b"foo": 1348 if len(line) < 5: 1349 self.assertTrue(line.endswith(b"\n")) 1350 all.append(line) 1351 if not line: 1352 break 1353 self.assertEqual(b"".join(all), expected) 1354 1355 def test_read1(self): 1356 resp = self.resp 1357 def r(): 1358 res = resp.read1(4) 1359 self.assertLessEqual(len(res), 4) 1360 return res 1361 readliner = Readliner(r) 1362 self._verify_readline(readliner.readline, self.lines_expected) 1363 1364 def test_read1_unbounded(self): 1365 resp = self.resp 1366 all = [] 1367 while True: 1368 data = resp.read1() 1369 if not data: 1370 break 1371 all.append(data) 1372 self.assertEqual(b"".join(all), self.lines_expected) 1373 1374 def test_read1_bounded(self): 1375 resp = self.resp 1376 all = [] 1377 while True: 1378 data = resp.read1(10) 1379 if not data: 1380 break 1381 self.assertLessEqual(len(data), 10) 1382 all.append(data) 1383 self.assertEqual(b"".join(all), self.lines_expected) 1384 1385 def test_read1_0(self): 1386 self.assertEqual(self.resp.read1(0), b"") 1387 1388 def test_peek_0(self): 1389 p = self.resp.peek(0) 1390 self.assertLessEqual(0, len(p)) 1391 1392 1393class ExtendedReadTestChunked(ExtendedReadTest): 1394 """ 1395 Test peek(), read1(), readline() in chunked mode 1396 """ 1397 lines = ( 1398 'HTTP/1.1 200 OK\r\n' 1399 'Transfer-Encoding: chunked\r\n\r\n' 1400 'a\r\n' 1401 'hello worl\r\n' 1402 '3\r\n' 1403 'd!\n\r\n' 1404 '9\r\n' 1405 'and now \n\r\n' 1406 '23\r\n' 1407 'for something completely different\n\r\n' 1408 '3\r\n' 1409 'foo\r\n' 1410 '0\r\n' # terminating chunk 1411 '\r\n' # end of trailers 1412 ) 1413 1414 1415class Readliner: 1416 """ 1417 a simple readline class that uses an arbitrary read function and buffering 1418 """ 1419 def __init__(self, readfunc): 1420 self.readfunc = readfunc 1421 self.remainder = b"" 1422 1423 def readline(self, limit): 1424 data = [] 1425 datalen = 0 1426 read = self.remainder 1427 try: 1428 while True: 1429 idx = read.find(b'\n') 1430 if idx != -1: 1431 break 1432 if datalen + len(read) >= limit: 1433 idx = limit - datalen - 1 1434 # read more data 1435 data.append(read) 1436 read = self.readfunc() 1437 if not read: 1438 idx = 0 #eof condition 1439 break 1440 idx += 1 1441 data.append(read[:idx]) 1442 self.remainder = read[idx:] 1443 return b"".join(data) 1444 except: 1445 self.remainder = b"".join(data) 1446 raise 1447 1448 1449class OfflineTest(TestCase): 1450 def test_all(self): 1451 # Documented objects defined in the module should be in __all__ 1452 expected = {"responses"} # Allowlist documented dict() object 1453 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1454 # intentionally omitted for simplicity 1455 denylist = {"HTTPMessage", "parse_headers"} 1456 for name in dir(client): 1457 if name.startswith("_") or name in denylist: 1458 continue 1459 module_object = getattr(client, name) 1460 if getattr(module_object, "__module__", None) == "http.client": 1461 expected.add(name) 1462 self.assertCountEqual(client.__all__, expected) 1463 1464 def test_responses(self): 1465 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1466 1467 def test_client_constants(self): 1468 # Make sure we don't break backward compatibility with 3.4 1469 expected = [ 1470 'CONTINUE', 1471 'SWITCHING_PROTOCOLS', 1472 'PROCESSING', 1473 'OK', 1474 'CREATED', 1475 'ACCEPTED', 1476 'NON_AUTHORITATIVE_INFORMATION', 1477 'NO_CONTENT', 1478 'RESET_CONTENT', 1479 'PARTIAL_CONTENT', 1480 'MULTI_STATUS', 1481 'IM_USED', 1482 'MULTIPLE_CHOICES', 1483 'MOVED_PERMANENTLY', 1484 'FOUND', 1485 'SEE_OTHER', 1486 'NOT_MODIFIED', 1487 'USE_PROXY', 1488 'TEMPORARY_REDIRECT', 1489 'BAD_REQUEST', 1490 'UNAUTHORIZED', 1491 'PAYMENT_REQUIRED', 1492 'FORBIDDEN', 1493 'NOT_FOUND', 1494 'METHOD_NOT_ALLOWED', 1495 'NOT_ACCEPTABLE', 1496 'PROXY_AUTHENTICATION_REQUIRED', 1497 'REQUEST_TIMEOUT', 1498 'CONFLICT', 1499 'GONE', 1500 'LENGTH_REQUIRED', 1501 'PRECONDITION_FAILED', 1502 'REQUEST_ENTITY_TOO_LARGE', 1503 'REQUEST_URI_TOO_LONG', 1504 'UNSUPPORTED_MEDIA_TYPE', 1505 'REQUESTED_RANGE_NOT_SATISFIABLE', 1506 'EXPECTATION_FAILED', 1507 'IM_A_TEAPOT', 1508 'MISDIRECTED_REQUEST', 1509 'UNPROCESSABLE_ENTITY', 1510 'LOCKED', 1511 'FAILED_DEPENDENCY', 1512 'UPGRADE_REQUIRED', 1513 'PRECONDITION_REQUIRED', 1514 'TOO_MANY_REQUESTS', 1515 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1516 'UNAVAILABLE_FOR_LEGAL_REASONS', 1517 'INTERNAL_SERVER_ERROR', 1518 'NOT_IMPLEMENTED', 1519 'BAD_GATEWAY', 1520 'SERVICE_UNAVAILABLE', 1521 'GATEWAY_TIMEOUT', 1522 'HTTP_VERSION_NOT_SUPPORTED', 1523 'INSUFFICIENT_STORAGE', 1524 'NOT_EXTENDED', 1525 'NETWORK_AUTHENTICATION_REQUIRED', 1526 'EARLY_HINTS', 1527 'TOO_EARLY' 1528 ] 1529 for const in expected: 1530 with self.subTest(constant=const): 1531 self.assertTrue(hasattr(client, const)) 1532 1533 1534class SourceAddressTest(TestCase): 1535 def setUp(self): 1536 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1537 self.port = socket_helper.bind_port(self.serv) 1538 self.source_port = socket_helper.find_unused_port() 1539 self.serv.listen() 1540 self.conn = None 1541 1542 def tearDown(self): 1543 if self.conn: 1544 self.conn.close() 1545 self.conn = None 1546 self.serv.close() 1547 self.serv = None 1548 1549 def testHTTPConnectionSourceAddress(self): 1550 self.conn = client.HTTPConnection(HOST, self.port, 1551 source_address=('', self.source_port)) 1552 self.conn.connect() 1553 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1554 1555 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1556 'http.client.HTTPSConnection not defined') 1557 def testHTTPSConnectionSourceAddress(self): 1558 self.conn = client.HTTPSConnection(HOST, self.port, 1559 source_address=('', self.source_port)) 1560 # We don't test anything here other than the constructor not barfing as 1561 # this code doesn't deal with setting up an active running SSL server 1562 # for an ssl_wrapped connect() to actually return from. 1563 1564 1565class TimeoutTest(TestCase): 1566 PORT = None 1567 1568 def setUp(self): 1569 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1570 TimeoutTest.PORT = socket_helper.bind_port(self.serv) 1571 self.serv.listen() 1572 1573 def tearDown(self): 1574 self.serv.close() 1575 self.serv = None 1576 1577 def testTimeoutAttribute(self): 1578 # This will prove that the timeout gets through HTTPConnection 1579 # and into the socket. 1580 1581 # default -- use global socket timeout 1582 self.assertIsNone(socket.getdefaulttimeout()) 1583 socket.setdefaulttimeout(30) 1584 try: 1585 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1586 httpConn.connect() 1587 finally: 1588 socket.setdefaulttimeout(None) 1589 self.assertEqual(httpConn.sock.gettimeout(), 30) 1590 httpConn.close() 1591 1592 # no timeout -- do not use global socket default 1593 self.assertIsNone(socket.getdefaulttimeout()) 1594 socket.setdefaulttimeout(30) 1595 try: 1596 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1597 timeout=None) 1598 httpConn.connect() 1599 finally: 1600 socket.setdefaulttimeout(None) 1601 self.assertEqual(httpConn.sock.gettimeout(), None) 1602 httpConn.close() 1603 1604 # a value 1605 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1606 httpConn.connect() 1607 self.assertEqual(httpConn.sock.gettimeout(), 30) 1608 httpConn.close() 1609 1610 1611class PersistenceTest(TestCase): 1612 1613 def test_reuse_reconnect(self): 1614 # Should reuse or reconnect depending on header from server 1615 tests = ( 1616 ('1.0', '', False), 1617 ('1.0', 'Connection: keep-alive\r\n', True), 1618 ('1.1', '', True), 1619 ('1.1', 'Connection: close\r\n', False), 1620 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1621 ('1.1', 'Connection: cloSE\r\n', False), 1622 ) 1623 for version, header, reuse in tests: 1624 with self.subTest(version=version, header=header): 1625 msg = ( 1626 'HTTP/{} 200 OK\r\n' 1627 '{}' 1628 'Content-Length: 12\r\n' 1629 '\r\n' 1630 'Dummy body\r\n' 1631 ).format(version, header) 1632 conn = FakeSocketHTTPConnection(msg) 1633 self.assertIsNone(conn.sock) 1634 conn.request('GET', '/open-connection') 1635 with conn.getresponse() as response: 1636 self.assertEqual(conn.sock is None, not reuse) 1637 response.read() 1638 self.assertEqual(conn.sock is None, not reuse) 1639 self.assertEqual(conn.connections, 1) 1640 conn.request('GET', '/subsequent-request') 1641 self.assertEqual(conn.connections, 1 if reuse else 2) 1642 1643 def test_disconnected(self): 1644 1645 def make_reset_reader(text): 1646 """Return BufferedReader that raises ECONNRESET at EOF""" 1647 stream = io.BytesIO(text) 1648 def readinto(buffer): 1649 size = io.BytesIO.readinto(stream, buffer) 1650 if size == 0: 1651 raise ConnectionResetError() 1652 return size 1653 stream.readinto = readinto 1654 return io.BufferedReader(stream) 1655 1656 tests = ( 1657 (io.BytesIO, client.RemoteDisconnected), 1658 (make_reset_reader, ConnectionResetError), 1659 ) 1660 for stream_factory, exception in tests: 1661 with self.subTest(exception=exception): 1662 conn = FakeSocketHTTPConnection(b'', stream_factory) 1663 conn.request('GET', '/eof-response') 1664 self.assertRaises(exception, conn.getresponse) 1665 self.assertIsNone(conn.sock) 1666 # HTTPConnection.connect() should be automatically invoked 1667 conn.request('GET', '/reconnect') 1668 self.assertEqual(conn.connections, 2) 1669 1670 def test_100_close(self): 1671 conn = FakeSocketHTTPConnection( 1672 b'HTTP/1.1 100 Continue\r\n' 1673 b'\r\n' 1674 # Missing final response 1675 ) 1676 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1677 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1678 self.assertIsNone(conn.sock) 1679 conn.request('GET', '/reconnect') 1680 self.assertEqual(conn.connections, 2) 1681 1682 1683class HTTPSTest(TestCase): 1684 1685 def setUp(self): 1686 if not hasattr(client, 'HTTPSConnection'): 1687 self.skipTest('ssl support required') 1688 1689 def make_server(self, certfile): 1690 from test.ssl_servers import make_https_server 1691 return make_https_server(self, certfile=certfile) 1692 1693 def test_attributes(self): 1694 # simple test to check it's storing the timeout 1695 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1696 self.assertEqual(h.timeout, 30) 1697 1698 def test_networked(self): 1699 # Default settings: requires a valid cert from a trusted CA 1700 import ssl 1701 support.requires('network') 1702 with socket_helper.transient_internet('self-signed.pythontest.net'): 1703 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1704 with self.assertRaises(ssl.SSLError) as exc_info: 1705 h.request('GET', '/') 1706 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1707 1708 def test_networked_noverification(self): 1709 # Switch off cert verification 1710 import ssl 1711 support.requires('network') 1712 with socket_helper.transient_internet('self-signed.pythontest.net'): 1713 context = ssl._create_unverified_context() 1714 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1715 context=context) 1716 h.request('GET', '/') 1717 resp = h.getresponse() 1718 h.close() 1719 self.assertIn('nginx', resp.getheader('server')) 1720 resp.close() 1721 1722 @support.system_must_validate_cert 1723 def test_networked_trusted_by_default_cert(self): 1724 # Default settings: requires a valid cert from a trusted CA 1725 support.requires('network') 1726 with socket_helper.transient_internet('www.python.org'): 1727 h = client.HTTPSConnection('www.python.org', 443) 1728 h.request('GET', '/') 1729 resp = h.getresponse() 1730 content_type = resp.getheader('content-type') 1731 resp.close() 1732 h.close() 1733 self.assertIn('text/html', content_type) 1734 1735 def test_networked_good_cert(self): 1736 # We feed the server's cert as a validating cert 1737 import ssl 1738 support.requires('network') 1739 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1740 with socket_helper.transient_internet(selfsigned_pythontestdotnet): 1741 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1742 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1743 self.assertEqual(context.check_hostname, True) 1744 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1745 try: 1746 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1747 context=context) 1748 h.request('GET', '/') 1749 resp = h.getresponse() 1750 except ssl.SSLError as ssl_err: 1751 ssl_err_str = str(ssl_err) 1752 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1753 # modern Linux distros (Debian Buster, etc) default OpenSSL 1754 # configurations it'll fail saying "key too weak" until we 1755 # address https://bugs.python.org/issue36816 to use a proper 1756 # key size on self-signed.pythontest.net. 1757 if re.search(r'(?i)key.too.weak', ssl_err_str): 1758 raise unittest.SkipTest( 1759 f'Got {ssl_err_str} trying to connect ' 1760 f'to {selfsigned_pythontestdotnet}. ' 1761 'See https://bugs.python.org/issue36816.') 1762 raise 1763 server_string = resp.getheader('server') 1764 resp.close() 1765 h.close() 1766 self.assertIn('nginx', server_string) 1767 1768 def test_networked_bad_cert(self): 1769 # We feed a "CA" cert that is unrelated to the server's cert 1770 import ssl 1771 support.requires('network') 1772 with socket_helper.transient_internet('self-signed.pythontest.net'): 1773 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1774 context.load_verify_locations(CERT_localhost) 1775 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1776 with self.assertRaises(ssl.SSLError) as exc_info: 1777 h.request('GET', '/') 1778 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1779 1780 def test_local_unknown_cert(self): 1781 # The custom cert isn't known to the default trust bundle 1782 import ssl 1783 server = self.make_server(CERT_localhost) 1784 h = client.HTTPSConnection('localhost', server.port) 1785 with self.assertRaises(ssl.SSLError) as exc_info: 1786 h.request('GET', '/') 1787 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1788 1789 def test_local_good_hostname(self): 1790 # The (valid) cert validates the HTTP hostname 1791 import ssl 1792 server = self.make_server(CERT_localhost) 1793 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1794 context.load_verify_locations(CERT_localhost) 1795 h = client.HTTPSConnection('localhost', server.port, context=context) 1796 self.addCleanup(h.close) 1797 h.request('GET', '/nonexistent') 1798 resp = h.getresponse() 1799 self.addCleanup(resp.close) 1800 self.assertEqual(resp.status, 404) 1801 1802 def test_local_bad_hostname(self): 1803 # The (valid) cert doesn't validate the HTTP hostname 1804 import ssl 1805 server = self.make_server(CERT_fakehostname) 1806 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1807 context.load_verify_locations(CERT_fakehostname) 1808 h = client.HTTPSConnection('localhost', server.port, context=context) 1809 with self.assertRaises(ssl.CertificateError): 1810 h.request('GET', '/') 1811 # Same with explicit check_hostname=True 1812 with warnings_helper.check_warnings(('', DeprecationWarning)): 1813 h = client.HTTPSConnection('localhost', server.port, 1814 context=context, check_hostname=True) 1815 with self.assertRaises(ssl.CertificateError): 1816 h.request('GET', '/') 1817 # With check_hostname=False, the mismatching is ignored 1818 context.check_hostname = False 1819 with warnings_helper.check_warnings(('', DeprecationWarning)): 1820 h = client.HTTPSConnection('localhost', server.port, 1821 context=context, check_hostname=False) 1822 h.request('GET', '/nonexistent') 1823 resp = h.getresponse() 1824 resp.close() 1825 h.close() 1826 self.assertEqual(resp.status, 404) 1827 # The context's check_hostname setting is used if one isn't passed to 1828 # HTTPSConnection. 1829 context.check_hostname = False 1830 h = client.HTTPSConnection('localhost', server.port, context=context) 1831 h.request('GET', '/nonexistent') 1832 resp = h.getresponse() 1833 self.assertEqual(resp.status, 404) 1834 resp.close() 1835 h.close() 1836 # Passing check_hostname to HTTPSConnection should override the 1837 # context's setting. 1838 with warnings_helper.check_warnings(('', DeprecationWarning)): 1839 h = client.HTTPSConnection('localhost', server.port, 1840 context=context, check_hostname=True) 1841 with self.assertRaises(ssl.CertificateError): 1842 h.request('GET', '/') 1843 1844 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1845 'http.client.HTTPSConnection not available') 1846 def test_host_port(self): 1847 # Check invalid host_port 1848 1849 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1850 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1851 1852 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1853 "fe80::207:e9ff:fe9b", 8000), 1854 ("www.python.org:443", "www.python.org", 443), 1855 ("www.python.org:", "www.python.org", 443), 1856 ("www.python.org", "www.python.org", 443), 1857 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1858 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1859 443)): 1860 c = client.HTTPSConnection(hp) 1861 self.assertEqual(h, c.host) 1862 self.assertEqual(p, c.port) 1863 1864 def test_tls13_pha(self): 1865 import ssl 1866 if not ssl.HAS_TLSv1_3: 1867 self.skipTest('TLS 1.3 support required') 1868 # just check status of PHA flag 1869 h = client.HTTPSConnection('localhost', 443) 1870 self.assertTrue(h._context.post_handshake_auth) 1871 1872 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1873 self.assertFalse(context.post_handshake_auth) 1874 h = client.HTTPSConnection('localhost', 443, context=context) 1875 self.assertIs(h._context, context) 1876 self.assertFalse(h._context.post_handshake_auth) 1877 1878 with warnings.catch_warnings(): 1879 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1880 DeprecationWarning) 1881 h = client.HTTPSConnection('localhost', 443, context=context, 1882 cert_file=CERT_localhost) 1883 self.assertTrue(h._context.post_handshake_auth) 1884 1885 1886class RequestBodyTest(TestCase): 1887 """Test cases where a request includes a message body.""" 1888 1889 def setUp(self): 1890 self.conn = client.HTTPConnection('example.com') 1891 self.conn.sock = self.sock = FakeSocket("") 1892 self.conn.sock = self.sock 1893 1894 def get_headers_and_fp(self): 1895 f = io.BytesIO(self.sock.data) 1896 f.readline() # read the request line 1897 message = client.parse_headers(f) 1898 return message, f 1899 1900 def test_list_body(self): 1901 # Note that no content-length is automatically calculated for 1902 # an iterable. The request will fall back to send chunked 1903 # transfer encoding. 1904 cases = ( 1905 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1906 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1907 ) 1908 for body, expected in cases: 1909 with self.subTest(body): 1910 self.conn = client.HTTPConnection('example.com') 1911 self.conn.sock = self.sock = FakeSocket('') 1912 1913 self.conn.request('PUT', '/url', body) 1914 msg, f = self.get_headers_and_fp() 1915 self.assertNotIn('Content-Type', msg) 1916 self.assertNotIn('Content-Length', msg) 1917 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1918 self.assertEqual(expected, f.read()) 1919 1920 def test_manual_content_length(self): 1921 # Set an incorrect content-length so that we can verify that 1922 # it will not be over-ridden by the library. 1923 self.conn.request("PUT", "/url", "body", 1924 {"Content-Length": "42"}) 1925 message, f = self.get_headers_and_fp() 1926 self.assertEqual("42", message.get("content-length")) 1927 self.assertEqual(4, len(f.read())) 1928 1929 def test_ascii_body(self): 1930 self.conn.request("PUT", "/url", "body") 1931 message, f = self.get_headers_and_fp() 1932 self.assertEqual("text/plain", message.get_content_type()) 1933 self.assertIsNone(message.get_charset()) 1934 self.assertEqual("4", message.get("content-length")) 1935 self.assertEqual(b'body', f.read()) 1936 1937 def test_latin1_body(self): 1938 self.conn.request("PUT", "/url", "body\xc1") 1939 message, f = self.get_headers_and_fp() 1940 self.assertEqual("text/plain", message.get_content_type()) 1941 self.assertIsNone(message.get_charset()) 1942 self.assertEqual("5", message.get("content-length")) 1943 self.assertEqual(b'body\xc1', f.read()) 1944 1945 def test_bytes_body(self): 1946 self.conn.request("PUT", "/url", b"body\xc1") 1947 message, f = self.get_headers_and_fp() 1948 self.assertEqual("text/plain", message.get_content_type()) 1949 self.assertIsNone(message.get_charset()) 1950 self.assertEqual("5", message.get("content-length")) 1951 self.assertEqual(b'body\xc1', f.read()) 1952 1953 def test_text_file_body(self): 1954 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1955 with open(os_helper.TESTFN, "w", encoding="utf-8") as f: 1956 f.write("body") 1957 with open(os_helper.TESTFN, encoding="utf-8") as f: 1958 self.conn.request("PUT", "/url", f) 1959 message, f = self.get_headers_and_fp() 1960 self.assertEqual("text/plain", message.get_content_type()) 1961 self.assertIsNone(message.get_charset()) 1962 # No content-length will be determined for files; the body 1963 # will be sent using chunked transfer encoding instead. 1964 self.assertIsNone(message.get("content-length")) 1965 self.assertEqual("chunked", message.get("transfer-encoding")) 1966 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1967 1968 def test_binary_file_body(self): 1969 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1970 with open(os_helper.TESTFN, "wb") as f: 1971 f.write(b"body\xc1") 1972 with open(os_helper.TESTFN, "rb") as f: 1973 self.conn.request("PUT", "/url", f) 1974 message, f = self.get_headers_and_fp() 1975 self.assertEqual("text/plain", message.get_content_type()) 1976 self.assertIsNone(message.get_charset()) 1977 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1978 self.assertNotIn("Content-Length", message) 1979 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1980 1981 1982class HTTPResponseTest(TestCase): 1983 1984 def setUp(self): 1985 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1986 second-value\r\n\r\nText" 1987 sock = FakeSocket(body) 1988 self.resp = client.HTTPResponse(sock) 1989 self.resp.begin() 1990 1991 def test_getting_header(self): 1992 header = self.resp.getheader('My-Header') 1993 self.assertEqual(header, 'first-value, second-value') 1994 1995 header = self.resp.getheader('My-Header', 'some default') 1996 self.assertEqual(header, 'first-value, second-value') 1997 1998 def test_getting_nonexistent_header_with_string_default(self): 1999 header = self.resp.getheader('No-Such-Header', 'default-value') 2000 self.assertEqual(header, 'default-value') 2001 2002 def test_getting_nonexistent_header_with_iterable_default(self): 2003 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 2004 self.assertEqual(header, 'default, values') 2005 2006 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 2007 self.assertEqual(header, 'default, values') 2008 2009 def test_getting_nonexistent_header_without_default(self): 2010 header = self.resp.getheader('No-Such-Header') 2011 self.assertEqual(header, None) 2012 2013 def test_getting_header_defaultint(self): 2014 header = self.resp.getheader('No-Such-Header',default=42) 2015 self.assertEqual(header, 42) 2016 2017class TunnelTests(TestCase): 2018 def setUp(self): 2019 response_text = ( 2020 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 2021 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 2022 'Content-Length: 42\r\n\r\n' 2023 ) 2024 self.host = 'proxy.com' 2025 self.conn = client.HTTPConnection(self.host) 2026 self.conn._create_connection = self._create_connection(response_text) 2027 2028 def tearDown(self): 2029 self.conn.close() 2030 2031 def _create_connection(self, response_text): 2032 def create_connection(address, timeout=None, source_address=None): 2033 return FakeSocket(response_text, host=address[0], port=address[1]) 2034 return create_connection 2035 2036 def test_set_tunnel_host_port_headers(self): 2037 tunnel_host = 'destination.com' 2038 tunnel_port = 8888 2039 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 2040 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 2041 headers=tunnel_headers) 2042 self.conn.request('HEAD', '/', '') 2043 self.assertEqual(self.conn.sock.host, self.host) 2044 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2045 self.assertEqual(self.conn._tunnel_host, tunnel_host) 2046 self.assertEqual(self.conn._tunnel_port, tunnel_port) 2047 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 2048 2049 def test_disallow_set_tunnel_after_connect(self): 2050 # Once connected, we shouldn't be able to tunnel anymore 2051 self.conn.connect() 2052 self.assertRaises(RuntimeError, self.conn.set_tunnel, 2053 'destination.com') 2054 2055 def test_connect_with_tunnel(self): 2056 self.conn.set_tunnel('destination.com') 2057 self.conn.request('HEAD', '/', '') 2058 self.assertEqual(self.conn.sock.host, self.host) 2059 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2060 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2061 # issue22095 2062 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 2063 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2064 2065 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 2066 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 2067 2068 def test_tunnel_connect_single_send_connection_setup(self): 2069 """Regresstion test for https://bugs.python.org/issue43332.""" 2070 with mock.patch.object(self.conn, 'send') as mock_send: 2071 self.conn.set_tunnel('destination.com') 2072 self.conn.connect() 2073 self.conn.request('GET', '/') 2074 mock_send.assert_called() 2075 # Likely 2, but this test only cares about the first. 2076 self.assertGreater( 2077 len(mock_send.mock_calls), 1, 2078 msg=f'unexpected number of send calls: {mock_send.mock_calls}') 2079 proxy_setup_data_sent = mock_send.mock_calls[0][1][0] 2080 self.assertIn(b'CONNECT destination.com', proxy_setup_data_sent) 2081 self.assertTrue( 2082 proxy_setup_data_sent.endswith(b'\r\n\r\n'), 2083 msg=f'unexpected proxy data sent {proxy_setup_data_sent!r}') 2084 2085 def test_connect_put_request(self): 2086 self.conn.set_tunnel('destination.com') 2087 self.conn.request('PUT', '/', '') 2088 self.assertEqual(self.conn.sock.host, self.host) 2089 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2090 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2091 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2092 2093 def test_tunnel_debuglog(self): 2094 expected_header = 'X-Dummy: 1' 2095 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 2096 2097 self.conn.set_debuglevel(1) 2098 self.conn._create_connection = self._create_connection(response_text) 2099 self.conn.set_tunnel('destination.com') 2100 2101 with support.captured_stdout() as output: 2102 self.conn.request('PUT', '/', '') 2103 lines = output.getvalue().splitlines() 2104 self.assertIn('header: {}'.format(expected_header), lines) 2105 2106 2107if __name__ == '__main__': 2108 unittest.main(verbosity=2) 2109