import errno
from http import client, HTTPStatus
import io
import itertools
import os
import array
import re
import socket
import threading
import warnings

import unittest
TestCase = unittest.TestCase

from test import support
from test.support import socket_helper

here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')

# constants for testing chunked encoding
chunked_start = (
    'HTTP/1.1 200 OK\r\n'
    'Transfer-Encoding: chunked\r\n\r\n'
    'a\r\n'
    'hello worl\r\n'
    '3\r\n'
    'd! \r\n'
    '8\r\n'
    'and now \r\n'
    '22\r\n'
    'for something completely different\r\n'
)
chunked_expected = b'hello world! and now for something completely different'
chunk_extension = ";foo=bar"
last_chunk = "0\r\n"
last_chunk_extended = "0" + chunk_extension + "\r\n"
trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
chunked_end = "\r\n"

HOST = socket_helper.HOST

class FakeSocket:
    """In-memory stand-in for a socket.

    Bytes passed to sendall() are accumulated in ``data`` (and counted in
    ``sendall_calls``); the canned ``text`` given to the constructor is
    what makefile() serves back to the reader.
    """

    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
        # str input is stored as ASCII bytes; anything else is kept as-is.
        if isinstance(text, str):
            text = text.encode("ascii")
        self.text = text
        self.fileclass = fileclass
        self.data = b''
        self.sendall_calls = 0
        self.file_closed = False
        self.host = host
        self.port = port

    def sendall(self, data):
        """Record *data* and count the call."""
        self.sendall_calls += 1
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a ``fileclass`` object wrapping the canned text.

        Raises client.UnimplementedFileMode for any mode other than
        'r' or 'rb'.
        """
        if mode != 'r' and mode != 'rb':
            raise client.UnimplementedFileMode()
        # keep the file around so we can check how much was read from it
        self.file = self.fileclass(self.text)
        self.file.close = self.file_close #nerf close ()
        return self.file

    def file_close(self):
        """Replacement for the file's close(): only records the request."""
        self.file_closed = True

    def close(self):
        pass

    def setsockopt(self, level, optname, value):
        pass

class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a trigger sequence."""

    def __init__(self, text, pipe_trigger):
        # When sendall() is called with pipe_trigger, raise EPIPE.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        # Unlike FakeSocket.sendall(), this does not touch sendall_calls.
        if self.pipe_trigger in data:
            raise OSError(errno.EPIPE, "gotcha")
        self.data += data

    def close(self):
        pass

class NoEOFBytesIO(io.BytesIO):
    """Like BytesIO, but raises AssertionError on EOF.

    This is used below to test that http.client doesn't try to read
    more from the underlying file than it should.
    """
    def read(self, n=-1):
        data = io.BytesIO.read(self, n)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data

    def readline(self, length=None):
        data = io.BytesIO.readline(self, length)
        if data == b'':
            raise AssertionError('caller tried to read past EOF')
        return data

class FakeSocketHTTPConnection(client.HTTPConnection):
    """HTTPConnection subclass using FakeSocket; counts connect() calls"""

    def __init__(self, *args):
        self.connections = 0
        super().__init__('example.com')
        # Positional arguments are forwarded to FakeSocket() on connect.
        self.fake_socket_args = args
        self._create_connection = self.create_connection

    def connect(self):
        """Count the number of times connect() is invoked"""
        self.connections += 1
        return super().connect()

    def create_connection(self, *pos, **kw):
        return FakeSocket(*self.fake_socket_args)

class HeaderTests(TestCase):
    """Tests for request header generation and response header parsing."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.

        class HeaderCountingBuffer(list):
            def __init__(self):
                self.count = {}
            def append(self, item):
                kv = item.split(b':')
                if len(kv) > 1:
                    # item is a 'Key: Value' header string
                    lcKey = kv[0].decode('ascii').lower()
                    self.count.setdefault(lcKey, 0)
                    self.count[lcKey] += 1
                list.append(self, item)

        for explicit_header in True, False:
            for header in 'Content-length', 'Host', 'Accept-encoding':
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)

    def test_content_length_0(self):

        class ContentLengthChecker(list):
            def __init__(self):
                list.__init__(self)
                self.content_length = None
            def append(self, item):
                kv = item.split(b':', 1)
                if len(kv) > 1 and kv[0].lower() == b'content-length':
                    self.content_length = kv[1].strip()
                list.append(self, item)

        # Here, we're testing that methods expecting a body get a
        # content-length set to zero if the body is empty (either None or '')
        bodies = (None, '')
        methods_with_body = ('PUT', 'POST', 'PATCH')
        for method, body in itertools.product(methods_with_body, bodies):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', body)
            self.assertEqual(
                conn._buffer.content_length, b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # For these methods, we make sure that content-length is not set when
        # the body is None because it might cause unexpected behaviour on the
        # server.
        methods_without_body = (
            'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
        )
        for method in methods_without_body:
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', None)
            self.assertEqual(
                conn._buffer.content_length, None,
                'Header Content-Length set for empty body on {}'.format(method)
            )

        # If the body is set to '', that's considered to be "present but
        # empty" rather than "missing", so content length would be set, even
        # for methods that don't expect a body.
        for method in methods_without_body:
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', '')
            self.assertEqual(
                conn._buffer.content_length, b'0',
                'Header Content-Length incorrect on {}'.format(method)
            )

        # If the body is set, make sure Content-Length is set.
        for method in itertools.chain(methods_without_body, methods_with_body):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(None)
            conn._buffer = ContentLengthChecker()
            conn.request(method, '/', ' ')
            self.assertEqual(
                conn._buffer.content_length, b'1',
                'Header Content-Length incorrect on {}'.format(method)
            )

    def test_putheader(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(None)
        conn.putrequest('GET','/')
        conn.putheader('Content-length', 42)
        self.assertIn(b'Content-length: 42', conn._buffer)

        conn.putheader('Foo', ' bar ')
        # The name is followed by ': ' and then the value verbatim, so the
        # value's own leading space gives two spaces after the colon.
        self.assertIn(b'Foo:  bar ', conn._buffer)
        conn.putheader('Bar', '\tbaz\t')
        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
        conn.putheader('Authorization', 'Bearer mytoken')
        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
        conn.putheader('IterHeader', 'IterA', 'IterB')
        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
        conn.putheader('LatinHeader', b'\xFF')
        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
        conn.putheader('Utf8Header', b'\xc3\x80')
        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
        conn.putheader('C1-Control', b'next\x85line')
        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
        conn.putheader('Key Space', 'value')
        self.assertIn(b'Key Space: value', conn._buffer)
        conn.putheader('KeySpace ', 'value')
        self.assertIn(b'KeySpace : value', conn._buffer)
        conn.putheader(b'Nonbreak\xa0Space', 'value')
        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
        conn.putheader(b'\xa0NonbreakSpace', 'value')
        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)

    def test_ipv6host_header(self):
        # Default host header on IPv6 transaction should be wrapped by [] if
        # it is an IPv6 address
        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001::]:81')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
                   b'Accept-Encoding: identity\r\n\r\n'
        conn = client.HTTPConnection('[2001:102A::]')
        sock = FakeSocket('')
        conn.sock = sock
        conn.request('GET', '/foo')
        self.assertTrue(sock.data.startswith(expected))

    def test_malformed_headers_coped_with(self):
        # Issue 19996
        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()

        self.assertEqual(resp.getheader('First'), 'val')
        self.assertEqual(resp.getheader('Second'), 'val')

    def test_parse_all_octets(self):
        # Ensure no valid header field octet breaks the parser
        body = (
            b'HTTP/1.1 200 OK\r\n'
            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
            b'obs-fold: text\r\n'
            b' folded with space\r\n'
            b'\tfolded with tab\r\n'
            b'Content-Length: 0\r\n'
            b'\r\n'
        )
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.getheader('Content-Length'), '0')
        self.assertEqual(resp.msg['Content-Length'], '0')
        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
        self.assertEqual(resp.getheader('VCHAR'), vchar)
        self.assertEqual(resp.msg['VCHAR'], vchar)
        self.assertIsNotNone(resp.getheader('obs-text'))
        self.assertIn('obs-text', resp.msg)
        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
            self.assertTrue(folded.startswith('text'))
            self.assertIn(' folded with space', folded)
            self.assertTrue(folded.endswith('folded with tab'))

    def test_invalid_headers(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        conn.putrequest('GET', '/')

        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
        # longer allowed in header names
        cases = (
            (b'Invalid\r\nName', b'ValidValue'),
            (b'Invalid\rName', b'ValidValue'),
            (b'Invalid\nName', b'ValidValue'),
            (b'\r\nInvalidName', b'ValidValue'),
            (b'\rInvalidName', b'ValidValue'),
            (b'\nInvalidName', b'ValidValue'),
            (b' InvalidName', b'ValidValue'),
            (b'\tInvalidName', b'ValidValue'),
            (b'Invalid:Name', b'ValidValue'),
            (b':InvalidName', b'ValidValue'),
            (b'ValidName', b'Invalid\r\nValue'),
            (b'ValidName', b'Invalid\rValue'),
            (b'ValidName', b'Invalid\nValue'),
            (b'ValidName', b'InvalidValue\r\n'),
            (b'ValidName', b'InvalidValue\r'),
            (b'ValidName', b'InvalidValue\n'),
        )
        for name, value in cases:
            with self.subTest((name, value)):
                with self.assertRaisesRegex(ValueError, 'Invalid header'):
                    conn.putheader(name, value)

    def test_headers_debuglevel(self):
        body = (
            b'HTTP/1.1 200 OK\r\n'
            b'First: val\r\n'
            b'Second: val1\r\n'
            b'Second: val2\r\n'
        )
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock, debuglevel=1)
        with support.captured_stdout() as output:
            resp.begin()
        lines = output.getvalue().splitlines()
        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
        self.assertEqual(lines[1], "header: First: val")
        self.assertEqual(lines[2], "header: Second: val1")
        self.assertEqual(lines[3], "header: Second: val2")


class HttpMethodTests(TestCase):
    """Tests that request() rejects method names with control characters."""

    def test_invalid_method_names(self):
        methods = (
            'GET\r',
            'POST\n',
            'PUT\n\r',
            'POST\nValue',
            'POST\nHOST:abc',
            'GET\nrHost:abc\n',
            'POST\rRemainder:\r',
            'GET\rHOST:\n',
            '\nPUT'
        )

        for method in methods:
            with self.assertRaisesRegex(
                    ValueError, "method can't contain control characters"):
                conn = client.HTTPConnection('example.com')
                conn.sock = FakeSocket(None)
                conn.request(method=method, url="/")


class TransferEncodingTest(TestCase):
    """Tests for chunked Transfer-Encoding on outgoing requests."""

    expected_body = b"It's just a flesh wound"

    def test_endheaders_chunked(self):
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.putrequest('POST', '/')
        conn.endheaders(self._make_body(), encode_chunked=True)

        _, _, body = self._parse_request(conn.sock.data)
        body = self._parse_chunked(body)
        self.assertEqual(body, self.expected_body)

    def test_explicit_headers(self):
        # explicit chunked
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        # this shouldn't actually be automatically chunk-encoded because the
        # calling code has explicitly stated that it's taking care of it
        conn.request(
            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # explicit chunked, string body
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request(
            'POST', '/', self.expected_body.decode('latin-1'),
            {'Transfer-Encoding': 'chunked'})

        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertEqual(body, self.expected_body)

        # User-specified TE, but request() does the chunk encoding
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/',
            headers={'Transfer-Encoding': 'gzip, chunked'},
            encode_chunked=True,
            body=self._make_body())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
        self.assertEqual(self._parse_chunked(body), self.expected_body)

    def test_request(self):
        for empty_lines in (False, True,):
            conn = client.HTTPConnection('example.com')
            conn.sock = FakeSocket(b'')
            conn.request(
                'POST', '/', self._make_body(empty_lines=empty_lines))

            _, headers, body = self._parse_request(conn.sock.data)
            body = self._parse_chunked(body)
            self.assertEqual(body, self.expected_body)
            self.assertEqual(headers['Transfer-Encoding'], 'chunked')

            # Content-Length and Transfer-Encoding SHOULD not be sent in the
            # same request
            self.assertNotIn('content-length', [k.lower() for k in headers])

    def test_empty_body(self):
        # Zero-length iterable should be treated like any other iterable
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket(b'')
        conn.request('POST', '/', ())
        _, headers, body = self._parse_request(conn.sock.data)
        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
        self.assertNotIn('content-length', [k.lower() for k in headers])
        self.assertEqual(body, b"0\r\n\r\n")

    def _make_body(self, empty_lines=False):
        # Yield expected_body one space-separated word at a time, optionally
        # interleaving empty byte strings.
        lines = self.expected_body.split(b' ')
        for idx, line in enumerate(lines):
            # for testing handling empty lines
            if empty_lines and idx % 2:
                yield b''
            if idx < len(lines) - 1:
                yield line + b' '
            else:
                yield line

    def _parse_request(self, data):
        # Split raw request bytes into (request line, header dict, body).
        lines = data.split(b'\r\n')
        request = lines[0]
        headers = {}
        n = 1
        while n < len(lines) and len(lines[n]) > 0:
            key, val = lines[n].split(b':')
            key = key.decode('latin-1').strip()
            headers[key] = val.decode('latin-1').strip()
            n += 1

        return request, headers, b'\r\n'.join(lines[n + 1:])

    def _parse_chunked(self, data):
        # Decode a chunked body, asserting each declared size matches.
        body = []
        trailers = {}
        n = 0
        lines = data.split(b'\r\n')
        # parse body
        while True:
            size, chunk = lines[n:n+2]
            size = int(size, 16)

            if size == 0:
                n += 1
                break

            self.assertEqual(size, len(chunk))
            body.append(chunk)

            n += 2
            # we /should/ hit the end chunk, but check against the size of
            # lines so we're not stuck in an infinite loop should we get
            # malformed data
            if n > len(lines):
                break

        return b''.join(body)


class BasicTest(TestCase):
    """Tests for HTTPResponse reading and HTTPConnection sending."""

    def test_dir_with_added_behavior_on_status(self):
        # see issue40084
        self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(0), b'')  # Issue #20007
        self.assertFalse(resp.isclosed())
        self.assertFalse(resp.closed)
        self.assertEqual(resp.read(), b"Text")
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        self.assertRaises(client.BadStatusLine, resp.begin)

    def test_bad_status_repr(self):
        exc = client.BadStatusLine('')
        self.assertEqual(repr(exc), '''BadStatusLine("''")''')

    def test_partial_reads(self):
        # if we have Content-Length, HTTPResponse knows when to close itself,
        # the same behaviour as when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_mixed_reads(self):
        # readline() should update the remaining length, so that read() knows
        # how much data is left and does not raise IncompleteRead
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.readline(), b'Text\r\n')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(), b'Another')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_partial_readintos(self):
        # if we have Content-Length, HTTPResponse knows when to close itself,
        # the same behaviour as when we read the whole thing with read()
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_partial_reads_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_partial_readintos_no_content_length(self):
        # when no length is present, the socket should be gracefully closed when
        # all data was read
        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertTrue(resp.isclosed())

    def test_partial_reads_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), b'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), b'xt')
        self.assertEqual(resp.read(1), b'')
        self.assertTrue(resp.isclosed())

    def test_partial_readintos_incomplete_body(self):
        # if the server shuts down the connection before the whole
        # content-length is delivered, the socket is gracefully closed
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        b = bytearray(2)
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'Te')
        self.assertFalse(resp.isclosed())
        n = resp.readinto(b)
        self.assertEqual(n, 2)
        self.assertEqual(bytes(b), b'xt')
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)

        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:80", "www.python.org", 80),
                         ("www.python.org:", "www.python.org", 80),
                         ("www.python.org", "www.python.org", 80),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
            c = client.HTTPConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE"; '
                'Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        self.assertEqual(cookies, hdr)

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFBytesIO)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        if resp.read():
            self.fail("Did not expect response from HEAD request")

    def test_readinto_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFBytesIO)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        b = bytearray(5)
        if resp.readinto(b) != 0:
            self.fail("Did not expect response from HEAD request")
        self.assertEqual(bytes(b), b'\x00'*5)

    def test_too_many_headers(self):
        headers = '\r\n'.join('Header%d: foo' % i
                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
        text = ('HTTP/1.1 200 OK\r\n' + headers)
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        self.assertRaisesRegex(client.HTTPException,
                               r"got more than \d+ headers", r.begin)

    def test_send_file(self):
        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    b'Accept-Encoding: identity\r\n'
                    b'Transfer-Encoding: chunked\r\n'
                    b'\r\n')

        with open(__file__, 'rb') as body:
            conn = client.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
                    (sock.data[:len(expected)], expected))

    def test_send(self):
        expected = b'this is a test this is only a test'
        conn = client.HTTPConnection('example.com')
        sock = FakeSocket(None)
        conn.sock = sock
        conn.send(expected)
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(array.array('b', expected))
        self.assertEqual(expected, sock.data)
        sock.data = b''
        conn.send(io.BytesIO(expected))
        self.assertEqual(expected, sock.data)

    def test_send_updating_file(self):
        def data():
            yield 'data'
            yield None
            yield 'data_two'

        class UpdatingFile(io.TextIOBase):
            mode = 'r'
            d = data()
            def read(self, blocksize=-1):
                return next(self.d)

        expected = b'data'

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.send(UpdatingFile())
        self.assertEqual(sock.data, expected)


    def test_send_iter(self):
        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
                   b'\r\nonetwothree'

        def body():
            yield b"one"
            yield b"two"
            yield b"three"

        conn = client.HTTPConnection('example.com')
        sock = FakeSocket("")
        conn.sock = sock
        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
        self.assertEqual(sock.data, expected)

    def test_blocksize_request(self):
        """Check that request() respects the configured block size."""
        blocksize = 8  # For easy debugging.
        conn = client.HTTPConnection('example.com', blocksize=blocksize)
        sock = FakeSocket(None)
        conn.sock = sock
        expected = b"a" * blocksize + b"b"
        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
        self.assertEqual(sock.sendall_calls, 3)
        body = sock.data.split(b"\r\n\r\n", 1)[1]
        self.assertEqual(body, expected)

    def test_blocksize_send(self):
        """Check that send() respects the configured block size."""
        blocksize = 8  # For easy debugging.
        conn = client.HTTPConnection('example.com', blocksize=blocksize)
        sock = FakeSocket(None)
        conn.sock = sock
        expected = b"a" * blocksize + b"b"
        conn.send(io.BytesIO(expected))
        self.assertEqual(sock.sendall_calls, 2)
        self.assertEqual(sock.data, expected)

    def test_send_type_error(self):
        # See: Issue #12676
        conn = client.HTTPConnection('example.com')
        conn.sock = FakeSocket('')
        with self.assertRaises(TypeError):
            conn.request('POST', 'test', conn)

    def test_chunked(self):
        expected = chunked_expected
        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        resp.close()

        # Various read sizes
        for n in range(1, 12):
            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
            resp.close()

        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except client.IncompleteRead as i:
                self.assertEqual(i.partial, expected)
                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
                self.assertEqual(repr(i), expected_message)
                self.assertEqual(str(i), expected_message)
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_readinto_chunked(self):

        expected = chunked_expected
        nexpected = len(expected)
        b = bytearray(128)

        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        n = resp.readinto(b)
        self.assertEqual(b[:nexpected], expected)
        self.assertEqual(n, nexpected)
        resp.close()

        # Various read sizes
        for n in range(1, 12):
            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            m = memoryview(b)
            i = resp.readinto(m[0:n])
            i += resp.readinto(m[i:n + i])
            i += resp.readinto(m[i:])
            self.assertEqual(b[:nexpected], expected)
            self.assertEqual(i, nexpected)
            resp.close()

        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = client.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                n = resp.readinto(b)
            except client.IncompleteRead as i:
                self.assertEqual(i.partial, expected)
                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
                self.assertEqual(repr(i), expected_message)
                self.assertEqual(str(i), expected_message)
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_readinto_chunked_head(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello world\r\n'
            '1\r\n'
            'd\r\n'
        )
        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
        resp = client.HTTPResponse(sock, method="HEAD")
        resp.begin()
        b = bytearray(5)
        n = resp.readinto(b)
        self.assertEqual(n, 0)
        self.assertEqual(bytes(b), b'\x00'*5)
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.reason, 'OK')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_negative_content_length(self):
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), b'Hello\r\n')
        self.assertTrue(resp.isclosed())

    def test_incomplete_read(self):
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        try:
            resp.read()
        except client.IncompleteRead as i:
            self.assertEqual(i.partial, b'Hello\r\n')
            self.assertEqual(repr(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertEqual(str(i),
                             "IncompleteRead(7 bytes read, 3 more expected)")
            self.assertTrue(resp.isclosed())
        else:
            self.fail('IncompleteRead expected')

    def test_epipe(self):
        sock = EPipeSocket(
            "HTTP/1.0 401 Authorization Required\r\n"
            "Content-type: text/html\r\n"
            "WWW-Authenticate: Basic realm=\"example\"\r\n",
            b"Content-Length")
        conn = client.HTTPConnection("example.com")
        conn.sock = sock
        self.assertRaises(OSError,
                          lambda: conn.request("PUT", "/url", "body"))
        resp = conn.getresponse()
        self.assertEqual(401, resp.status)
        self.assertEqual("Basic realm=\"example\"",
                         resp.getheader("www-authenticate"))

    # Test lines overflowing the max line size (_MAXLINE in http.client)

    def test_overflowing_status_line(self):
        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)

    def test_overflowing_header_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        self.assertRaises(client.LineTooLong, resp.begin)

    def test_overflowing_header_limit_after_100(self):
        body = (
            'HTTP/1.1 100 OK\r\n'
            'r\n' * 32768
        )
        resp = client.HTTPResponse(FakeSocket(body))
        with self.assertRaises(client.HTTPException) as cm:
            resp.begin()
        # We must assert more because other reasonable errors that we
        # do not want can also be HTTPException derived.
        self.assertIn('got more than ', str(cm.exception))
        self.assertIn('headers', str(cm.exception))

    def test_overflowing_chunked_line(self):
        body = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            + '0' * 65536 + 'a\r\n'
            'hello world\r\n'
            '0\r\n'
            '\r\n'
        )
        resp = client.HTTPResponse(FakeSocket(body))
        resp.begin()
        self.assertRaises(client.LineTooLong, resp.read)

    def test_early_eof(self):
        # Test httpresponse with no \r\n termination,
        body = "HTTP/1.1 200 Ok"
        sock = FakeSocket(body)
        resp = client.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), b'')
        self.assertTrue(resp.isclosed())
        self.assertFalse(resp.closed)
        resp.close()
        self.assertTrue(resp.closed)

    def test_error_leak(self):
        # Test that the socket is not leaked if getresponse() fails
        conn = client.HTTPConnection('example.com')
        response = None
        class Response(client.HTTPResponse):
            def __init__(self, *pos, **kw):
                nonlocal response
                response = self  # Avoid garbage collector closing the socket
                client.HTTPResponse.__init__(self, *pos, **kw)
        conn.response_class = Response
        conn.sock = FakeSocket('Invalid status line')
        conn.request('GET', '/')
        self.assertRaises(client.BadStatusLine, conn.getresponse)
        self.assertTrue(response.closed)
        self.assertTrue(conn.sock.file_closed)

    def test_chunked_extension(self):
        extra = '3;foo=bar\r\n' + 'abc\r\n'
        expected = chunked_expected + b'abc'

        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        resp.close()

    def test_chunked_missing_end(self):
        """some servers may serve up a short chunked encoding stream"""
        expected = chunked_expected
        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        resp.close()

    def test_chunked_trailers(self):
        """See that trailers are read and ignored"""
        expected = chunked_expected
        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        # we should have reached the end of the file
        self.assertEqual(sock.file.read(), b"") #we read to the end
        resp.close()

    def test_chunked_sync(self):
        """Check that we don't read past the end of the chunked-encoding stream"""
        expected = chunked_expected
        extradata = "extradata"
        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        # the file should now have our extradata ready to be read
        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
        resp.close()

    def test_content_length_sync(self):
        """Check that we don't read past the end of the Content-Length stream"""
        extradata = b"extradata"
        expected = b"Hello123\r\n"
        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
        resp = client.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), expected)
        # the file should now have our extradata ready to be read
self.assertEqual(sock.file.read(), extradata) #we read to the end 1113 resp.close() 1114 1115 def test_readlines_content_length(self): 1116 extradata = b"extradata" 1117 expected = b"Hello123\r\n" 1118 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1119 resp = client.HTTPResponse(sock, method="GET") 1120 resp.begin() 1121 self.assertEqual(resp.readlines(2000), [expected]) 1122 # the file should now have our extradata ready to be read 1123 self.assertEqual(sock.file.read(), extradata) #we read to the end 1124 resp.close() 1125 1126 def test_read1_content_length(self): 1127 extradata = b"extradata" 1128 expected = b"Hello123\r\n" 1129 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1130 resp = client.HTTPResponse(sock, method="GET") 1131 resp.begin() 1132 self.assertEqual(resp.read1(2000), expected) 1133 # the file should now have our extradata ready to be read 1134 self.assertEqual(sock.file.read(), extradata) #we read to the end 1135 resp.close() 1136 1137 def test_readline_bound_content_length(self): 1138 extradata = b"extradata" 1139 expected = b"Hello123\r\n" 1140 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1141 resp = client.HTTPResponse(sock, method="GET") 1142 resp.begin() 1143 self.assertEqual(resp.readline(10), expected) 1144 self.assertEqual(resp.readline(10), b"") 1145 # the file should now have our extradata ready to be read 1146 self.assertEqual(sock.file.read(), extradata) #we read to the end 1147 resp.close() 1148 1149 def test_read1_bound_content_length(self): 1150 extradata = b"extradata" 1151 expected = b"Hello123\r\n" 1152 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1153 resp = client.HTTPResponse(sock, method="GET") 1154 resp.begin() 1155 self.assertEqual(resp.read1(20), expected*2) 1156 self.assertEqual(resp.read(), expected) 1157 # the file should now have our 
extradata ready to be read 1158 self.assertEqual(sock.file.read(), extradata) #we read to the end 1159 resp.close() 1160 1161 def test_response_fileno(self): 1162 # Make sure fd returned by fileno is valid. 1163 serv = socket.create_server((HOST, 0)) 1164 self.addCleanup(serv.close) 1165 1166 result = None 1167 def run_server(): 1168 [conn, address] = serv.accept() 1169 with conn, conn.makefile("rb") as reader: 1170 # Read the request header until a blank line 1171 while True: 1172 line = reader.readline() 1173 if not line.rstrip(b"\r\n"): 1174 break 1175 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1176 nonlocal result 1177 result = reader.read() 1178 1179 thread = threading.Thread(target=run_server) 1180 thread.start() 1181 self.addCleanup(thread.join, float(1)) 1182 conn = client.HTTPConnection(*serv.getsockname()) 1183 conn.request("CONNECT", "dummy:1234") 1184 response = conn.getresponse() 1185 try: 1186 self.assertEqual(response.status, client.OK) 1187 s = socket.socket(fileno=response.fileno()) 1188 try: 1189 s.sendall(b"proxied data\n") 1190 finally: 1191 s.detach() 1192 finally: 1193 response.close() 1194 conn.close() 1195 thread.join() 1196 self.assertEqual(result, b"proxied data\n") 1197 1198 def test_putrequest_override_domain_validation(self): 1199 """ 1200 It should be possible to override the default validation 1201 behavior in putrequest (bpo-38216). 
1202 """ 1203 class UnsafeHTTPConnection(client.HTTPConnection): 1204 def _validate_path(self, url): 1205 pass 1206 1207 conn = UnsafeHTTPConnection('example.com') 1208 conn.sock = FakeSocket('') 1209 conn.putrequest('GET', '/\x00') 1210 1211 def test_putrequest_override_host_validation(self): 1212 class UnsafeHTTPConnection(client.HTTPConnection): 1213 def _validate_host(self, url): 1214 pass 1215 1216 conn = UnsafeHTTPConnection('example.com\r\n') 1217 conn.sock = FakeSocket('') 1218 # set skip_host so a ValueError is not raised upon adding the 1219 # invalid URL as the value of the "Host:" header 1220 conn.putrequest('GET', '/', skip_host=1) 1221 1222 def test_putrequest_override_encoding(self): 1223 """ 1224 It should be possible to override the default encoding 1225 to transmit bytes in another encoding even if invalid 1226 (bpo-36274). 1227 """ 1228 class UnsafeHTTPConnection(client.HTTPConnection): 1229 def _encode_request(self, str_url): 1230 return str_url.encode('utf-8') 1231 1232 conn = UnsafeHTTPConnection('example.com') 1233 conn.sock = FakeSocket('') 1234 conn.putrequest('GET', '/☃') 1235 1236 1237class ExtendedReadTest(TestCase): 1238 """ 1239 Test peek(), read1(), readline() 1240 """ 1241 lines = ( 1242 'HTTP/1.1 200 OK\r\n' 1243 '\r\n' 1244 'hello world!\n' 1245 'and now \n' 1246 'for something completely different\n' 1247 'foo' 1248 ) 1249 lines_expected = lines[lines.find('hello'):].encode("ascii") 1250 lines_chunked = ( 1251 'HTTP/1.1 200 OK\r\n' 1252 'Transfer-Encoding: chunked\r\n\r\n' 1253 'a\r\n' 1254 'hello worl\r\n' 1255 '3\r\n' 1256 'd!\n\r\n' 1257 '9\r\n' 1258 'and now \n\r\n' 1259 '23\r\n' 1260 'for something completely different\n\r\n' 1261 '3\r\n' 1262 'foo\r\n' 1263 '0\r\n' # terminating chunk 1264 '\r\n' # end of trailers 1265 ) 1266 1267 def setUp(self): 1268 sock = FakeSocket(self.lines) 1269 resp = client.HTTPResponse(sock, method="GET") 1270 resp.begin() 1271 resp.fp = io.BufferedReader(resp.fp) 1272 self.resp = resp 1273 1274 
1275 1276 def test_peek(self): 1277 resp = self.resp 1278 # patch up the buffered peek so that it returns not too much stuff 1279 oldpeek = resp.fp.peek 1280 def mypeek(n=-1): 1281 p = oldpeek(n) 1282 if n >= 0: 1283 return p[:n] 1284 return p[:10] 1285 resp.fp.peek = mypeek 1286 1287 all = [] 1288 while True: 1289 # try a short peek 1290 p = resp.peek(3) 1291 if p: 1292 self.assertGreater(len(p), 0) 1293 # then unbounded peek 1294 p2 = resp.peek() 1295 self.assertGreaterEqual(len(p2), len(p)) 1296 self.assertTrue(p2.startswith(p)) 1297 next = resp.read(len(p2)) 1298 self.assertEqual(next, p2) 1299 else: 1300 next = resp.read() 1301 self.assertFalse(next) 1302 all.append(next) 1303 if not next: 1304 break 1305 self.assertEqual(b"".join(all), self.lines_expected) 1306 1307 def test_readline(self): 1308 resp = self.resp 1309 self._verify_readline(self.resp.readline, self.lines_expected) 1310 1311 def _verify_readline(self, readline, expected): 1312 all = [] 1313 while True: 1314 # short readlines 1315 line = readline(5) 1316 if line and line != b"foo": 1317 if len(line) < 5: 1318 self.assertTrue(line.endswith(b"\n")) 1319 all.append(line) 1320 if not line: 1321 break 1322 self.assertEqual(b"".join(all), expected) 1323 1324 def test_read1(self): 1325 resp = self.resp 1326 def r(): 1327 res = resp.read1(4) 1328 self.assertLessEqual(len(res), 4) 1329 return res 1330 readliner = Readliner(r) 1331 self._verify_readline(readliner.readline, self.lines_expected) 1332 1333 def test_read1_unbounded(self): 1334 resp = self.resp 1335 all = [] 1336 while True: 1337 data = resp.read1() 1338 if not data: 1339 break 1340 all.append(data) 1341 self.assertEqual(b"".join(all), self.lines_expected) 1342 1343 def test_read1_bounded(self): 1344 resp = self.resp 1345 all = [] 1346 while True: 1347 data = resp.read1(10) 1348 if not data: 1349 break 1350 self.assertLessEqual(len(data), 10) 1351 all.append(data) 1352 self.assertEqual(b"".join(all), self.lines_expected) 1353 1354 def 
test_read1_0(self): 1355 self.assertEqual(self.resp.read1(0), b"") 1356 1357 def test_peek_0(self): 1358 p = self.resp.peek(0) 1359 self.assertLessEqual(0, len(p)) 1360 1361 1362class ExtendedReadTestChunked(ExtendedReadTest): 1363 """ 1364 Test peek(), read1(), readline() in chunked mode 1365 """ 1366 lines = ( 1367 'HTTP/1.1 200 OK\r\n' 1368 'Transfer-Encoding: chunked\r\n\r\n' 1369 'a\r\n' 1370 'hello worl\r\n' 1371 '3\r\n' 1372 'd!\n\r\n' 1373 '9\r\n' 1374 'and now \n\r\n' 1375 '23\r\n' 1376 'for something completely different\n\r\n' 1377 '3\r\n' 1378 'foo\r\n' 1379 '0\r\n' # terminating chunk 1380 '\r\n' # end of trailers 1381 ) 1382 1383 1384class Readliner: 1385 """ 1386 a simple readline class that uses an arbitrary read function and buffering 1387 """ 1388 def __init__(self, readfunc): 1389 self.readfunc = readfunc 1390 self.remainder = b"" 1391 1392 def readline(self, limit): 1393 data = [] 1394 datalen = 0 1395 read = self.remainder 1396 try: 1397 while True: 1398 idx = read.find(b'\n') 1399 if idx != -1: 1400 break 1401 if datalen + len(read) >= limit: 1402 idx = limit - datalen - 1 1403 # read more data 1404 data.append(read) 1405 read = self.readfunc() 1406 if not read: 1407 idx = 0 #eof condition 1408 break 1409 idx += 1 1410 data.append(read[:idx]) 1411 self.remainder = read[idx:] 1412 return b"".join(data) 1413 except: 1414 self.remainder = b"".join(data) 1415 raise 1416 1417 1418class OfflineTest(TestCase): 1419 def test_all(self): 1420 # Documented objects defined in the module should be in __all__ 1421 expected = {"responses"} # Allowlist documented dict() object 1422 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1423 # intentionally omitted for simplicity 1424 blacklist = {"HTTPMessage", "parse_headers"} 1425 for name in dir(client): 1426 if name.startswith("_") or name in blacklist: 1427 continue 1428 module_object = getattr(client, name) 1429 if getattr(module_object, "__module__", None) == "http.client": 1430 
expected.add(name) 1431 self.assertCountEqual(client.__all__, expected) 1432 1433 def test_responses(self): 1434 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1435 1436 def test_client_constants(self): 1437 # Make sure we don't break backward compatibility with 3.4 1438 expected = [ 1439 'CONTINUE', 1440 'SWITCHING_PROTOCOLS', 1441 'PROCESSING', 1442 'OK', 1443 'CREATED', 1444 'ACCEPTED', 1445 'NON_AUTHORITATIVE_INFORMATION', 1446 'NO_CONTENT', 1447 'RESET_CONTENT', 1448 'PARTIAL_CONTENT', 1449 'MULTI_STATUS', 1450 'IM_USED', 1451 'MULTIPLE_CHOICES', 1452 'MOVED_PERMANENTLY', 1453 'FOUND', 1454 'SEE_OTHER', 1455 'NOT_MODIFIED', 1456 'USE_PROXY', 1457 'TEMPORARY_REDIRECT', 1458 'BAD_REQUEST', 1459 'UNAUTHORIZED', 1460 'PAYMENT_REQUIRED', 1461 'FORBIDDEN', 1462 'NOT_FOUND', 1463 'METHOD_NOT_ALLOWED', 1464 'NOT_ACCEPTABLE', 1465 'PROXY_AUTHENTICATION_REQUIRED', 1466 'REQUEST_TIMEOUT', 1467 'CONFLICT', 1468 'GONE', 1469 'LENGTH_REQUIRED', 1470 'PRECONDITION_FAILED', 1471 'REQUEST_ENTITY_TOO_LARGE', 1472 'REQUEST_URI_TOO_LONG', 1473 'UNSUPPORTED_MEDIA_TYPE', 1474 'REQUESTED_RANGE_NOT_SATISFIABLE', 1475 'EXPECTATION_FAILED', 1476 'IM_A_TEAPOT', 1477 'MISDIRECTED_REQUEST', 1478 'UNPROCESSABLE_ENTITY', 1479 'LOCKED', 1480 'FAILED_DEPENDENCY', 1481 'UPGRADE_REQUIRED', 1482 'PRECONDITION_REQUIRED', 1483 'TOO_MANY_REQUESTS', 1484 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1485 'UNAVAILABLE_FOR_LEGAL_REASONS', 1486 'INTERNAL_SERVER_ERROR', 1487 'NOT_IMPLEMENTED', 1488 'BAD_GATEWAY', 1489 'SERVICE_UNAVAILABLE', 1490 'GATEWAY_TIMEOUT', 1491 'HTTP_VERSION_NOT_SUPPORTED', 1492 'INSUFFICIENT_STORAGE', 1493 'NOT_EXTENDED', 1494 'NETWORK_AUTHENTICATION_REQUIRED', 1495 'EARLY_HINTS', 1496 'TOO_EARLY' 1497 ] 1498 for const in expected: 1499 with self.subTest(constant=const): 1500 self.assertTrue(hasattr(client, const)) 1501 1502 1503class SourceAddressTest(TestCase): 1504 def setUp(self): 1505 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1506 self.port = 
socket_helper.bind_port(self.serv) 1507 self.source_port = socket_helper.find_unused_port() 1508 self.serv.listen() 1509 self.conn = None 1510 1511 def tearDown(self): 1512 if self.conn: 1513 self.conn.close() 1514 self.conn = None 1515 self.serv.close() 1516 self.serv = None 1517 1518 def testHTTPConnectionSourceAddress(self): 1519 self.conn = client.HTTPConnection(HOST, self.port, 1520 source_address=('', self.source_port)) 1521 self.conn.connect() 1522 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1523 1524 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1525 'http.client.HTTPSConnection not defined') 1526 def testHTTPSConnectionSourceAddress(self): 1527 self.conn = client.HTTPSConnection(HOST, self.port, 1528 source_address=('', self.source_port)) 1529 # We don't test anything here other than the constructor not barfing as 1530 # this code doesn't deal with setting up an active running SSL server 1531 # for an ssl_wrapped connect() to actually return from. 1532 1533 1534class TimeoutTest(TestCase): 1535 PORT = None 1536 1537 def setUp(self): 1538 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1539 TimeoutTest.PORT = socket_helper.bind_port(self.serv) 1540 self.serv.listen() 1541 1542 def tearDown(self): 1543 self.serv.close() 1544 self.serv = None 1545 1546 def testTimeoutAttribute(self): 1547 # This will prove that the timeout gets through HTTPConnection 1548 # and into the socket. 
1549 1550 # default -- use global socket timeout 1551 self.assertIsNone(socket.getdefaulttimeout()) 1552 socket.setdefaulttimeout(30) 1553 try: 1554 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1555 httpConn.connect() 1556 finally: 1557 socket.setdefaulttimeout(None) 1558 self.assertEqual(httpConn.sock.gettimeout(), 30) 1559 httpConn.close() 1560 1561 # no timeout -- do not use global socket default 1562 self.assertIsNone(socket.getdefaulttimeout()) 1563 socket.setdefaulttimeout(30) 1564 try: 1565 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1566 timeout=None) 1567 httpConn.connect() 1568 finally: 1569 socket.setdefaulttimeout(None) 1570 self.assertEqual(httpConn.sock.gettimeout(), None) 1571 httpConn.close() 1572 1573 # a value 1574 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1575 httpConn.connect() 1576 self.assertEqual(httpConn.sock.gettimeout(), 30) 1577 httpConn.close() 1578 1579 1580class PersistenceTest(TestCase): 1581 1582 def test_reuse_reconnect(self): 1583 # Should reuse or reconnect depending on header from server 1584 tests = ( 1585 ('1.0', '', False), 1586 ('1.0', 'Connection: keep-alive\r\n', True), 1587 ('1.1', '', True), 1588 ('1.1', 'Connection: close\r\n', False), 1589 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1590 ('1.1', 'Connection: cloSE\r\n', False), 1591 ) 1592 for version, header, reuse in tests: 1593 with self.subTest(version=version, header=header): 1594 msg = ( 1595 'HTTP/{} 200 OK\r\n' 1596 '{}' 1597 'Content-Length: 12\r\n' 1598 '\r\n' 1599 'Dummy body\r\n' 1600 ).format(version, header) 1601 conn = FakeSocketHTTPConnection(msg) 1602 self.assertIsNone(conn.sock) 1603 conn.request('GET', '/open-connection') 1604 with conn.getresponse() as response: 1605 self.assertEqual(conn.sock is None, not reuse) 1606 response.read() 1607 self.assertEqual(conn.sock is None, not reuse) 1608 self.assertEqual(conn.connections, 1) 1609 conn.request('GET', '/subsequent-request') 1610 
self.assertEqual(conn.connections, 1 if reuse else 2) 1611 1612 def test_disconnected(self): 1613 1614 def make_reset_reader(text): 1615 """Return BufferedReader that raises ECONNRESET at EOF""" 1616 stream = io.BytesIO(text) 1617 def readinto(buffer): 1618 size = io.BytesIO.readinto(stream, buffer) 1619 if size == 0: 1620 raise ConnectionResetError() 1621 return size 1622 stream.readinto = readinto 1623 return io.BufferedReader(stream) 1624 1625 tests = ( 1626 (io.BytesIO, client.RemoteDisconnected), 1627 (make_reset_reader, ConnectionResetError), 1628 ) 1629 for stream_factory, exception in tests: 1630 with self.subTest(exception=exception): 1631 conn = FakeSocketHTTPConnection(b'', stream_factory) 1632 conn.request('GET', '/eof-response') 1633 self.assertRaises(exception, conn.getresponse) 1634 self.assertIsNone(conn.sock) 1635 # HTTPConnection.connect() should be automatically invoked 1636 conn.request('GET', '/reconnect') 1637 self.assertEqual(conn.connections, 2) 1638 1639 def test_100_close(self): 1640 conn = FakeSocketHTTPConnection( 1641 b'HTTP/1.1 100 Continue\r\n' 1642 b'\r\n' 1643 # Missing final response 1644 ) 1645 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1646 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1647 self.assertIsNone(conn.sock) 1648 conn.request('GET', '/reconnect') 1649 self.assertEqual(conn.connections, 2) 1650 1651 1652class HTTPSTest(TestCase): 1653 1654 def setUp(self): 1655 if not hasattr(client, 'HTTPSConnection'): 1656 self.skipTest('ssl support required') 1657 1658 def make_server(self, certfile): 1659 from test.ssl_servers import make_https_server 1660 return make_https_server(self, certfile=certfile) 1661 1662 def test_attributes(self): 1663 # simple test to check it's storing the timeout 1664 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1665 self.assertEqual(h.timeout, 30) 1666 1667 def test_networked(self): 1668 # Default settings: requires a valid cert from a trusted CA 
1669 import ssl 1670 support.requires('network') 1671 with socket_helper.transient_internet('self-signed.pythontest.net'): 1672 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1673 with self.assertRaises(ssl.SSLError) as exc_info: 1674 h.request('GET', '/') 1675 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1676 1677 def test_networked_noverification(self): 1678 # Switch off cert verification 1679 import ssl 1680 support.requires('network') 1681 with socket_helper.transient_internet('self-signed.pythontest.net'): 1682 context = ssl._create_unverified_context() 1683 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1684 context=context) 1685 h.request('GET', '/') 1686 resp = h.getresponse() 1687 h.close() 1688 self.assertIn('nginx', resp.getheader('server')) 1689 resp.close() 1690 1691 @support.system_must_validate_cert 1692 def test_networked_trusted_by_default_cert(self): 1693 # Default settings: requires a valid cert from a trusted CA 1694 support.requires('network') 1695 with socket_helper.transient_internet('www.python.org'): 1696 h = client.HTTPSConnection('www.python.org', 443) 1697 h.request('GET', '/') 1698 resp = h.getresponse() 1699 content_type = resp.getheader('content-type') 1700 resp.close() 1701 h.close() 1702 self.assertIn('text/html', content_type) 1703 1704 def test_networked_good_cert(self): 1705 # We feed the server's cert as a validating cert 1706 import ssl 1707 support.requires('network') 1708 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1709 with socket_helper.transient_internet(selfsigned_pythontestdotnet): 1710 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1711 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1712 self.assertEqual(context.check_hostname, True) 1713 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1714 try: 1715 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1716 context=context) 1717 h.request('GET', '/') 1718 resp = 
h.getresponse() 1719 except ssl.SSLError as ssl_err: 1720 ssl_err_str = str(ssl_err) 1721 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1722 # modern Linux distros (Debian Buster, etc) default OpenSSL 1723 # configurations it'll fail saying "key too weak" until we 1724 # address https://bugs.python.org/issue36816 to use a proper 1725 # key size on self-signed.pythontest.net. 1726 if re.search(r'(?i)key.too.weak', ssl_err_str): 1727 raise unittest.SkipTest( 1728 f'Got {ssl_err_str} trying to connect ' 1729 f'to {selfsigned_pythontestdotnet}. ' 1730 'See https://bugs.python.org/issue36816.') 1731 raise 1732 server_string = resp.getheader('server') 1733 resp.close() 1734 h.close() 1735 self.assertIn('nginx', server_string) 1736 1737 def test_networked_bad_cert(self): 1738 # We feed a "CA" cert that is unrelated to the server's cert 1739 import ssl 1740 support.requires('network') 1741 with socket_helper.transient_internet('self-signed.pythontest.net'): 1742 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1743 context.load_verify_locations(CERT_localhost) 1744 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1745 with self.assertRaises(ssl.SSLError) as exc_info: 1746 h.request('GET', '/') 1747 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1748 1749 def test_local_unknown_cert(self): 1750 # The custom cert isn't known to the default trust bundle 1751 import ssl 1752 server = self.make_server(CERT_localhost) 1753 h = client.HTTPSConnection('localhost', server.port) 1754 with self.assertRaises(ssl.SSLError) as exc_info: 1755 h.request('GET', '/') 1756 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1757 1758 def test_local_good_hostname(self): 1759 # The (valid) cert validates the HTTP hostname 1760 import ssl 1761 server = self.make_server(CERT_localhost) 1762 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1763 context.load_verify_locations(CERT_localhost) 1764 h = 
client.HTTPSConnection('localhost', server.port, context=context) 1765 self.addCleanup(h.close) 1766 h.request('GET', '/nonexistent') 1767 resp = h.getresponse() 1768 self.addCleanup(resp.close) 1769 self.assertEqual(resp.status, 404) 1770 1771 def test_local_bad_hostname(self): 1772 # The (valid) cert doesn't validate the HTTP hostname 1773 import ssl 1774 server = self.make_server(CERT_fakehostname) 1775 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1776 context.load_verify_locations(CERT_fakehostname) 1777 h = client.HTTPSConnection('localhost', server.port, context=context) 1778 with self.assertRaises(ssl.CertificateError): 1779 h.request('GET', '/') 1780 # Same with explicit check_hostname=True 1781 with support.check_warnings(('', DeprecationWarning)): 1782 h = client.HTTPSConnection('localhost', server.port, 1783 context=context, check_hostname=True) 1784 with self.assertRaises(ssl.CertificateError): 1785 h.request('GET', '/') 1786 # With check_hostname=False, the mismatching is ignored 1787 context.check_hostname = False 1788 with support.check_warnings(('', DeprecationWarning)): 1789 h = client.HTTPSConnection('localhost', server.port, 1790 context=context, check_hostname=False) 1791 h.request('GET', '/nonexistent') 1792 resp = h.getresponse() 1793 resp.close() 1794 h.close() 1795 self.assertEqual(resp.status, 404) 1796 # The context's check_hostname setting is used if one isn't passed to 1797 # HTTPSConnection. 1798 context.check_hostname = False 1799 h = client.HTTPSConnection('localhost', server.port, context=context) 1800 h.request('GET', '/nonexistent') 1801 resp = h.getresponse() 1802 self.assertEqual(resp.status, 404) 1803 resp.close() 1804 h.close() 1805 # Passing check_hostname to HTTPSConnection should override the 1806 # context's setting. 
1807 with support.check_warnings(('', DeprecationWarning)): 1808 h = client.HTTPSConnection('localhost', server.port, 1809 context=context, check_hostname=True) 1810 with self.assertRaises(ssl.CertificateError): 1811 h.request('GET', '/') 1812 1813 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1814 'http.client.HTTPSConnection not available') 1815 def test_host_port(self): 1816 # Check invalid host_port 1817 1818 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1819 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1820 1821 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1822 "fe80::207:e9ff:fe9b", 8000), 1823 ("www.python.org:443", "www.python.org", 443), 1824 ("www.python.org:", "www.python.org", 443), 1825 ("www.python.org", "www.python.org", 443), 1826 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1827 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1828 443)): 1829 c = client.HTTPSConnection(hp) 1830 self.assertEqual(h, c.host) 1831 self.assertEqual(p, c.port) 1832 1833 def test_tls13_pha(self): 1834 import ssl 1835 if not ssl.HAS_TLSv1_3: 1836 self.skipTest('TLS 1.3 support required') 1837 # just check status of PHA flag 1838 h = client.HTTPSConnection('localhost', 443) 1839 self.assertTrue(h._context.post_handshake_auth) 1840 1841 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1842 self.assertFalse(context.post_handshake_auth) 1843 h = client.HTTPSConnection('localhost', 443, context=context) 1844 self.assertIs(h._context, context) 1845 self.assertFalse(h._context.post_handshake_auth) 1846 1847 with warnings.catch_warnings(): 1848 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1849 DeprecationWarning) 1850 h = client.HTTPSConnection('localhost', 443, context=context, 1851 cert_file=CERT_localhost) 1852 self.assertTrue(h._context.post_handshake_auth) 1853 1854 1855class RequestBodyTest(TestCase): 1856 """Test cases where a request includes a message body.""" 
1857 1858 def setUp(self): 1859 self.conn = client.HTTPConnection('example.com') 1860 self.conn.sock = self.sock = FakeSocket("") 1861 self.conn.sock = self.sock 1862 1863 def get_headers_and_fp(self): 1864 f = io.BytesIO(self.sock.data) 1865 f.readline() # read the request line 1866 message = client.parse_headers(f) 1867 return message, f 1868 1869 def test_list_body(self): 1870 # Note that no content-length is automatically calculated for 1871 # an iterable. The request will fall back to send chunked 1872 # transfer encoding. 1873 cases = ( 1874 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1875 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1876 ) 1877 for body, expected in cases: 1878 with self.subTest(body): 1879 self.conn = client.HTTPConnection('example.com') 1880 self.conn.sock = self.sock = FakeSocket('') 1881 1882 self.conn.request('PUT', '/url', body) 1883 msg, f = self.get_headers_and_fp() 1884 self.assertNotIn('Content-Type', msg) 1885 self.assertNotIn('Content-Length', msg) 1886 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1887 self.assertEqual(expected, f.read()) 1888 1889 def test_manual_content_length(self): 1890 # Set an incorrect content-length so that we can verify that 1891 # it will not be over-ridden by the library. 
1892 self.conn.request("PUT", "/url", "body", 1893 {"Content-Length": "42"}) 1894 message, f = self.get_headers_and_fp() 1895 self.assertEqual("42", message.get("content-length")) 1896 self.assertEqual(4, len(f.read())) 1897 1898 def test_ascii_body(self): 1899 self.conn.request("PUT", "/url", "body") 1900 message, f = self.get_headers_and_fp() 1901 self.assertEqual("text/plain", message.get_content_type()) 1902 self.assertIsNone(message.get_charset()) 1903 self.assertEqual("4", message.get("content-length")) 1904 self.assertEqual(b'body', f.read()) 1905 1906 def test_latin1_body(self): 1907 self.conn.request("PUT", "/url", "body\xc1") 1908 message, f = self.get_headers_and_fp() 1909 self.assertEqual("text/plain", message.get_content_type()) 1910 self.assertIsNone(message.get_charset()) 1911 self.assertEqual("5", message.get("content-length")) 1912 self.assertEqual(b'body\xc1', f.read()) 1913 1914 def test_bytes_body(self): 1915 self.conn.request("PUT", "/url", b"body\xc1") 1916 message, f = self.get_headers_and_fp() 1917 self.assertEqual("text/plain", message.get_content_type()) 1918 self.assertIsNone(message.get_charset()) 1919 self.assertEqual("5", message.get("content-length")) 1920 self.assertEqual(b'body\xc1', f.read()) 1921 1922 def test_text_file_body(self): 1923 self.addCleanup(support.unlink, support.TESTFN) 1924 with open(support.TESTFN, "w") as f: 1925 f.write("body") 1926 with open(support.TESTFN) as f: 1927 self.conn.request("PUT", "/url", f) 1928 message, f = self.get_headers_and_fp() 1929 self.assertEqual("text/plain", message.get_content_type()) 1930 self.assertIsNone(message.get_charset()) 1931 # No content-length will be determined for files; the body 1932 # will be sent using chunked transfer encoding instead. 
1933 self.assertIsNone(message.get("content-length")) 1934 self.assertEqual("chunked", message.get("transfer-encoding")) 1935 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1936 1937 def test_binary_file_body(self): 1938 self.addCleanup(support.unlink, support.TESTFN) 1939 with open(support.TESTFN, "wb") as f: 1940 f.write(b"body\xc1") 1941 with open(support.TESTFN, "rb") as f: 1942 self.conn.request("PUT", "/url", f) 1943 message, f = self.get_headers_and_fp() 1944 self.assertEqual("text/plain", message.get_content_type()) 1945 self.assertIsNone(message.get_charset()) 1946 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1947 self.assertNotIn("Content-Length", message) 1948 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1949 1950 1951class HTTPResponseTest(TestCase): 1952 1953 def setUp(self): 1954 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1955 second-value\r\n\r\nText" 1956 sock = FakeSocket(body) 1957 self.resp = client.HTTPResponse(sock) 1958 self.resp.begin() 1959 1960 def test_getting_header(self): 1961 header = self.resp.getheader('My-Header') 1962 self.assertEqual(header, 'first-value, second-value') 1963 1964 header = self.resp.getheader('My-Header', 'some default') 1965 self.assertEqual(header, 'first-value, second-value') 1966 1967 def test_getting_nonexistent_header_with_string_default(self): 1968 header = self.resp.getheader('No-Such-Header', 'default-value') 1969 self.assertEqual(header, 'default-value') 1970 1971 def test_getting_nonexistent_header_with_iterable_default(self): 1972 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1973 self.assertEqual(header, 'default, values') 1974 1975 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1976 self.assertEqual(header, 'default, values') 1977 1978 def test_getting_nonexistent_header_without_default(self): 1979 header = self.resp.getheader('No-Such-Header') 1980 self.assertEqual(header, None) 1981 1982 def 
test_getting_header_defaultint(self): 1983 header = self.resp.getheader('No-Such-Header',default=42) 1984 self.assertEqual(header, 42) 1985 1986class TunnelTests(TestCase): 1987 def setUp(self): 1988 response_text = ( 1989 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1990 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1991 'Content-Length: 42\r\n\r\n' 1992 ) 1993 self.host = 'proxy.com' 1994 self.conn = client.HTTPConnection(self.host) 1995 self.conn._create_connection = self._create_connection(response_text) 1996 1997 def tearDown(self): 1998 self.conn.close() 1999 2000 def _create_connection(self, response_text): 2001 def create_connection(address, timeout=None, source_address=None): 2002 return FakeSocket(response_text, host=address[0], port=address[1]) 2003 return create_connection 2004 2005 def test_set_tunnel_host_port_headers(self): 2006 tunnel_host = 'destination.com' 2007 tunnel_port = 8888 2008 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 2009 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 2010 headers=tunnel_headers) 2011 self.conn.request('HEAD', '/', '') 2012 self.assertEqual(self.conn.sock.host, self.host) 2013 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2014 self.assertEqual(self.conn._tunnel_host, tunnel_host) 2015 self.assertEqual(self.conn._tunnel_port, tunnel_port) 2016 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 2017 2018 def test_disallow_set_tunnel_after_connect(self): 2019 # Once connected, we shouldn't be able to tunnel anymore 2020 self.conn.connect() 2021 self.assertRaises(RuntimeError, self.conn.set_tunnel, 2022 'destination.com') 2023 2024 def test_connect_with_tunnel(self): 2025 self.conn.set_tunnel('destination.com') 2026 self.conn.request('HEAD', '/', '') 2027 self.assertEqual(self.conn.sock.host, self.host) 2028 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2029 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2030 # issue22095 2031 self.assertNotIn(b'Host: 
destination.com:None', self.conn.sock.data) 2032 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2033 2034 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 2035 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 2036 2037 def test_connect_put_request(self): 2038 self.conn.set_tunnel('destination.com') 2039 self.conn.request('PUT', '/', '') 2040 self.assertEqual(self.conn.sock.host, self.host) 2041 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2042 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2043 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2044 2045 def test_tunnel_debuglog(self): 2046 expected_header = 'X-Dummy: 1' 2047 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 2048 2049 self.conn.set_debuglevel(1) 2050 self.conn._create_connection = self._create_connection(response_text) 2051 self.conn.set_tunnel('destination.com') 2052 2053 with support.captured_stdout() as output: 2054 self.conn.request('PUT', '/', '') 2055 lines = output.getvalue().splitlines() 2056 self.assertIn('header: {}'.format(expected_header), lines) 2057 2058 2059if __name__ == '__main__': 2060 unittest.main(verbosity=2) 2061