1import errno 2from http import client 3import io 4import itertools 5import os 6import array 7import re 8import socket 9import threading 10import warnings 11 12import unittest 13TestCase = unittest.TestCase 14 15from test import support 16 17here = os.path.dirname(__file__) 18# Self-signed cert file for 'localhost' 19CERT_localhost = os.path.join(here, 'keycert.pem') 20# Self-signed cert file for 'fakehostname' 21CERT_fakehostname = os.path.join(here, 'keycert2.pem') 22# Self-signed cert file for self-signed.pythontest.net 23CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 24 25# constants for testing chunked encoding 26chunked_start = ( 27 'HTTP/1.1 200 OK\r\n' 28 'Transfer-Encoding: chunked\r\n\r\n' 29 'a\r\n' 30 'hello worl\r\n' 31 '3\r\n' 32 'd! \r\n' 33 '8\r\n' 34 'and now \r\n' 35 '22\r\n' 36 'for something completely different\r\n' 37) 38chunked_expected = b'hello world! and now for something completely different' 39chunk_extension = ";foo=bar" 40last_chunk = "0\r\n" 41last_chunk_extended = "0" + chunk_extension + "\r\n" 42trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 43chunked_end = "\r\n" 44 45HOST = support.HOST 46 47class FakeSocket: 48 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 49 if isinstance(text, str): 50 text = text.encode("ascii") 51 self.text = text 52 self.fileclass = fileclass 53 self.data = b'' 54 self.sendall_calls = 0 55 self.file_closed = False 56 self.host = host 57 self.port = port 58 59 def sendall(self, data): 60 self.sendall_calls += 1 61 self.data += data 62 63 def makefile(self, mode, bufsize=None): 64 if mode != 'r' and mode != 'rb': 65 raise client.UnimplementedFileMode() 66 # keep the file around so we can check how much was read from it 67 self.file = self.fileclass(self.text) 68 self.file.close = self.file_close #nerf close () 69 return self.file 70 71 def file_close(self): 72 self.file_closed = True 73 74 def close(self): 75 pass 76 77 def setsockopt(self, level, optname, value): 78 pass 79 80class EPipeSocket(FakeSocket): 81 82 def __init__(self, text, pipe_trigger): 83 # When sendall() is called with pipe_trigger, raise EPIPE. 84 FakeSocket.__init__(self, text) 85 self.pipe_trigger = pipe_trigger 86 87 def sendall(self, data): 88 if self.pipe_trigger in data: 89 raise OSError(errno.EPIPE, "gotcha") 90 self.data += data 91 92 def close(self): 93 pass 94 95class NoEOFBytesIO(io.BytesIO): 96 """Like BytesIO, but raises AssertionError on EOF. 97 98 This is used below to test that http.client doesn't try to read 99 more from the underlying file than it should. 100 """ 101 def read(self, n=-1): 102 data = io.BytesIO.read(self, n) 103 if data == b'': 104 raise AssertionError('caller tried to read past EOF') 105 return data 106 107 def readline(self, length=None): 108 data = io.BytesIO.readline(self, length) 109 if data == b'': 110 raise AssertionError('caller tried to read past EOF') 111 return data 112 113class FakeSocketHTTPConnection(client.HTTPConnection): 114 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 115 116 def __init__(self, *args): 117 self.connections = 0 118 super().__init__('example.com') 119 self.fake_socket_args = args 120 self._create_connection = self.create_connection 121 122 def connect(self): 123 """Count the number of times connect() is invoked""" 124 self.connections += 1 125 return super().connect() 126 127 def create_connection(self, *pos, **kw): 128 return FakeSocket(*self.fake_socket_args) 129 130class HeaderTests(TestCase): 131 def test_auto_headers(self): 132 # Some headers are added automatically, but should not be added by 133 # .request() if they are explicitly set. 134 135 class HeaderCountingBuffer(list): 136 def __init__(self): 137 self.count = {} 138 def append(self, item): 139 kv = item.split(b':') 140 if len(kv) > 1: 141 # item is a 'Key: Value' header string 142 lcKey = kv[0].decode('ascii').lower() 143 self.count.setdefault(lcKey, 0) 144 self.count[lcKey] += 1 145 list.append(self, item) 146 147 for explicit_header in True, False: 148 for header in 'Content-length', 'Host', 'Accept-encoding': 149 conn = client.HTTPConnection('example.com') 150 conn.sock = FakeSocket('blahblahblah') 151 conn._buffer = HeaderCountingBuffer() 152 153 body = 'spamspamspam' 154 headers = {} 155 if explicit_header: 156 headers[header] = str(len(body)) 157 conn.request('POST', '/', body, headers) 158 self.assertEqual(conn._buffer.count[header.lower()], 1) 159 160 def test_content_length_0(self): 161 162 class ContentLengthChecker(list): 163 def __init__(self): 164 list.__init__(self) 165 self.content_length = None 166 def append(self, item): 167 kv = item.split(b':', 1) 168 if len(kv) > 1 and kv[0].lower() == b'content-length': 169 self.content_length = kv[1].strip() 170 list.append(self, item) 171 172 # Here, we're testing that methods expecting a body get a 173 # content-length set to zero if the body is empty (either None or '') 174 bodies = (None, '') 175 methods_with_body = ('PUT', 'POST', 'PATCH') 176 for method, body in itertools.product(methods_with_body, bodies): 177 conn = client.HTTPConnection('example.com') 178 conn.sock = FakeSocket(None) 179 conn._buffer = ContentLengthChecker() 180 conn.request(method, '/', body) 181 self.assertEqual( 182 conn._buffer.content_length, b'0', 183 'Header Content-Length incorrect on {}'.format(method) 184 ) 185 186 # For these methods, we make sure that content-length is not set when 187 # the body is None because it might cause unexpected behaviour on the 188 # server. 189 methods_without_body = ( 190 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 191 ) 192 for method in methods_without_body: 193 conn = client.HTTPConnection('example.com') 194 conn.sock = FakeSocket(None) 195 conn._buffer = ContentLengthChecker() 196 conn.request(method, '/', None) 197 self.assertEqual( 198 conn._buffer.content_length, None, 199 'Header Content-Length set for empty body on {}'.format(method) 200 ) 201 202 # If the body is set to '', that's considered to be "present but 203 # empty" rather than "missing", so content length would be set, even 204 # for methods that don't expect a body. 205 for method in methods_without_body: 206 conn = client.HTTPConnection('example.com') 207 conn.sock = FakeSocket(None) 208 conn._buffer = ContentLengthChecker() 209 conn.request(method, '/', '') 210 self.assertEqual( 211 conn._buffer.content_length, b'0', 212 'Header Content-Length incorrect on {}'.format(method) 213 ) 214 215 # If the body is set, make sure Content-Length is set. 216 for method in itertools.chain(methods_without_body, methods_with_body): 217 conn = client.HTTPConnection('example.com') 218 conn.sock = FakeSocket(None) 219 conn._buffer = ContentLengthChecker() 220 conn.request(method, '/', ' ') 221 self.assertEqual( 222 conn._buffer.content_length, b'1', 223 'Header Content-Length incorrect on {}'.format(method) 224 ) 225 226 def test_putheader(self): 227 conn = client.HTTPConnection('example.com') 228 conn.sock = FakeSocket(None) 229 conn.putrequest('GET','/') 230 conn.putheader('Content-length', 42) 231 self.assertIn(b'Content-length: 42', conn._buffer) 232 233 conn.putheader('Foo', ' bar ') 234 self.assertIn(b'Foo: bar ', conn._buffer) 235 conn.putheader('Bar', '\tbaz\t') 236 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 237 conn.putheader('Authorization', 'Bearer mytoken') 238 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 239 conn.putheader('IterHeader', 'IterA', 'IterB') 240 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 241 conn.putheader('LatinHeader', b'\xFF') 242 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 243 conn.putheader('Utf8Header', b'\xc3\x80') 244 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 245 conn.putheader('C1-Control', b'next\x85line') 246 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 247 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 248 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 249 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 250 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 251 conn.putheader('Key Space', 'value') 252 self.assertIn(b'Key Space: value', conn._buffer) 253 conn.putheader('KeySpace ', 'value') 254 self.assertIn(b'KeySpace : value', conn._buffer) 255 conn.putheader(b'Nonbreak\xa0Space', 'value') 256 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 257 conn.putheader(b'\xa0NonbreakSpace', 'value') 258 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 259 260 def test_ipv6host_header(self): 261 # Default host header on IPv6 transaction should be wrapped by [] if 262 # it is an IPv6 address 263 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 264 b'Accept-Encoding: identity\r\n\r\n' 265 conn = client.HTTPConnection('[2001::]:81') 266 sock = FakeSocket('') 267 conn.sock = sock 268 conn.request('GET', '/foo') 269 self.assertTrue(sock.data.startswith(expected)) 270 271 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 272 b'Accept-Encoding: identity\r\n\r\n' 273 conn = client.HTTPConnection('[2001:102A::]') 274 sock = FakeSocket('') 275 conn.sock = sock 276 conn.request('GET', '/foo') 277 self.assertTrue(sock.data.startswith(expected)) 278 279 def test_malformed_headers_coped_with(self): 280 # Issue 19996 281 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 282 sock = FakeSocket(body) 283 resp = client.HTTPResponse(sock) 284 resp.begin() 285 286 self.assertEqual(resp.getheader('First'), 'val') 287 self.assertEqual(resp.getheader('Second'), 'val') 288 289 def test_parse_all_octets(self): 290 # Ensure no valid header field octet breaks the parser 291 body = ( 292 b'HTTP/1.1 200 OK\r\n' 293 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 294 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 295 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 296 b'obs-fold: text\r\n' 297 b' folded with space\r\n' 298 b'\tfolded with tab\r\n' 299 b'Content-Length: 0\r\n' 300 b'\r\n' 301 ) 302 sock = FakeSocket(body) 303 resp = client.HTTPResponse(sock) 304 resp.begin() 305 self.assertEqual(resp.getheader('Content-Length'), '0') 306 self.assertEqual(resp.msg['Content-Length'], '0') 307 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 308 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 309 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 310 self.assertEqual(resp.getheader('VCHAR'), vchar) 311 self.assertEqual(resp.msg['VCHAR'], vchar) 312 self.assertIsNotNone(resp.getheader('obs-text')) 313 self.assertIn('obs-text', resp.msg) 314 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 315 self.assertTrue(folded.startswith('text')) 316 self.assertIn(' folded with space', folded) 317 self.assertTrue(folded.endswith('folded with tab')) 318 319 def test_invalid_headers(self): 320 conn = client.HTTPConnection('example.com') 321 conn.sock = FakeSocket('') 322 conn.putrequest('GET', '/') 323 324 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 325 # longer allowed in header names 326 cases = ( 327 (b'Invalid\r\nName', b'ValidValue'), 328 (b'Invalid\rName', b'ValidValue'), 329 (b'Invalid\nName', b'ValidValue'), 330 (b'\r\nInvalidName', b'ValidValue'), 331 (b'\rInvalidName', b'ValidValue'), 332 (b'\nInvalidName', b'ValidValue'), 333 (b' InvalidName', b'ValidValue'), 334 (b'\tInvalidName', b'ValidValue'), 335 (b'Invalid:Name', b'ValidValue'), 336 (b':InvalidName', b'ValidValue'), 337 (b'ValidName', b'Invalid\r\nValue'), 338 (b'ValidName', b'Invalid\rValue'), 339 (b'ValidName', b'Invalid\nValue'), 340 (b'ValidName', b'InvalidValue\r\n'), 341 (b'ValidName', b'InvalidValue\r'), 342 (b'ValidName', b'InvalidValue\n'), 343 ) 344 for name, value in cases: 345 with self.subTest((name, value)): 346 with self.assertRaisesRegex(ValueError, 'Invalid header'): 347 conn.putheader(name, value) 348 349 def test_headers_debuglevel(self): 350 body = ( 351 b'HTTP/1.1 200 OK\r\n' 352 b'First: val\r\n' 353 b'Second: val1\r\n' 354 b'Second: val2\r\n' 355 ) 356 sock = FakeSocket(body) 357 resp = client.HTTPResponse(sock, debuglevel=1) 358 with support.captured_stdout() as output: 359 resp.begin() 360 lines = output.getvalue().splitlines() 361 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 362 self.assertEqual(lines[1], "header: First: val") 363 self.assertEqual(lines[2], "header: Second: val1") 364 self.assertEqual(lines[3], "header: Second: val2") 365 366 367class HttpMethodTests(TestCase): 368 def test_invalid_method_names(self): 369 methods = ( 370 'GET\r', 371 'POST\n', 372 'PUT\n\r', 373 'POST\nValue', 374 'POST\nHOST:abc', 375 'GET\nrHost:abc\n', 376 'POST\rRemainder:\r', 377 'GET\rHOST:\n', 378 '\nPUT' 379 ) 380 381 for method in methods: 382 with self.assertRaisesRegex( 383 ValueError, "method can't contain control characters"): 384 conn = client.HTTPConnection('example.com') 385 conn.sock = FakeSocket(None) 386 conn.request(method=method, url="/") 387 388 389class TransferEncodingTest(TestCase): 390 expected_body = b"It's just a flesh wound" 391 392 def test_endheaders_chunked(self): 393 conn = client.HTTPConnection('example.com') 394 conn.sock = FakeSocket(b'') 395 conn.putrequest('POST', '/') 396 conn.endheaders(self._make_body(), encode_chunked=True) 397 398 _, _, body = self._parse_request(conn.sock.data) 399 body = self._parse_chunked(body) 400 self.assertEqual(body, self.expected_body) 401 402 def test_explicit_headers(self): 403 # explicit chunked 404 conn = client.HTTPConnection('example.com') 405 conn.sock = FakeSocket(b'') 406 # this shouldn't actually be automatically chunk-encoded because the 407 # calling code has explicitly stated that it's taking care of it 408 conn.request( 409 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 410 411 _, headers, body = self._parse_request(conn.sock.data) 412 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 413 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 414 self.assertEqual(body, self.expected_body) 415 416 # explicit chunked, string body 417 conn = client.HTTPConnection('example.com') 418 conn.sock = FakeSocket(b'') 419 conn.request( 420 'POST', '/', self.expected_body.decode('latin-1'), 421 {'Transfer-Encoding': 'chunked'}) 422 423 _, headers, body = self._parse_request(conn.sock.data) 424 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 425 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 426 self.assertEqual(body, self.expected_body) 427 428 # User-specified TE, but request() does the chunk encoding 429 conn = client.HTTPConnection('example.com') 430 conn.sock = FakeSocket(b'') 431 conn.request('POST', '/', 432 headers={'Transfer-Encoding': 'gzip, chunked'}, 433 encode_chunked=True, 434 body=self._make_body()) 435 _, headers, body = self._parse_request(conn.sock.data) 436 self.assertNotIn('content-length', [k.lower() for k in headers]) 437 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 438 self.assertEqual(self._parse_chunked(body), self.expected_body) 439 440 def test_request(self): 441 for empty_lines in (False, True,): 442 conn = client.HTTPConnection('example.com') 443 conn.sock = FakeSocket(b'') 444 conn.request( 445 'POST', '/', self._make_body(empty_lines=empty_lines)) 446 447 _, headers, body = self._parse_request(conn.sock.data) 448 body = self._parse_chunked(body) 449 self.assertEqual(body, self.expected_body) 450 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 451 452 # Content-Length and Transfer-Encoding SHOULD not be sent in the 453 # same request 454 self.assertNotIn('content-length', [k.lower() for k in headers]) 455 456 def test_empty_body(self): 457 # Zero-length iterable should be treated like any other iterable 458 conn = client.HTTPConnection('example.com') 459 conn.sock = FakeSocket(b'') 460 conn.request('POST', '/', ()) 461 _, headers, body = self._parse_request(conn.sock.data) 462 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 463 self.assertNotIn('content-length', [k.lower() for k in headers]) 464 self.assertEqual(body, b"0\r\n\r\n") 465 466 def _make_body(self, empty_lines=False): 467 lines = self.expected_body.split(b' ') 468 for idx, line in enumerate(lines): 469 # for testing handling empty lines 470 if empty_lines and idx % 2: 471 yield b'' 472 if idx < len(lines) - 1: 473 yield line + b' ' 474 else: 475 yield line 476 477 def _parse_request(self, data): 478 lines = data.split(b'\r\n') 479 request = lines[0] 480 headers = {} 481 n = 1 482 while n < len(lines) and len(lines[n]) > 0: 483 key, val = lines[n].split(b':') 484 key = key.decode('latin-1').strip() 485 headers[key] = val.decode('latin-1').strip() 486 n += 1 487 488 return request, headers, b'\r\n'.join(lines[n + 1:]) 489 490 def _parse_chunked(self, data): 491 body = [] 492 trailers = {} 493 n = 0 494 lines = data.split(b'\r\n') 495 # parse body 496 while True: 497 size, chunk = lines[n:n+2] 498 size = int(size, 16) 499 500 if size == 0: 501 n += 1 502 break 503 504 self.assertEqual(size, len(chunk)) 505 body.append(chunk) 506 507 n += 2 508 # we /should/ hit the end chunk, but check against the size of 509 # lines so we're not stuck in an infinite loop should we get 510 # malformed data 511 if n > len(lines): 512 break 513 514 return b''.join(body) 515 516 517class BasicTest(TestCase): 518 def test_status_lines(self): 519 # Test HTTP status lines 520 521 body = "HTTP/1.1 200 Ok\r\n\r\nText" 522 sock = FakeSocket(body) 523 resp = client.HTTPResponse(sock) 524 resp.begin() 525 self.assertEqual(resp.read(0), b'') # Issue #20007 526 self.assertFalse(resp.isclosed()) 527 self.assertFalse(resp.closed) 528 self.assertEqual(resp.read(), b"Text") 529 self.assertTrue(resp.isclosed()) 530 self.assertFalse(resp.closed) 531 resp.close() 532 self.assertTrue(resp.closed) 533 534 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 535 sock = FakeSocket(body) 536 resp = client.HTTPResponse(sock) 537 self.assertRaises(client.BadStatusLine, resp.begin) 538 539 def test_bad_status_repr(self): 540 exc = client.BadStatusLine('') 541 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 542 543 def test_partial_reads(self): 544 # if we have Content-Length, HTTPResponse knows when to close itself, 545 # the same behaviour as when we read the whole thing with read() 546 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 547 sock = FakeSocket(body) 548 resp = client.HTTPResponse(sock) 549 resp.begin() 550 self.assertEqual(resp.read(2), b'Te') 551 self.assertFalse(resp.isclosed()) 552 self.assertEqual(resp.read(2), b'xt') 553 self.assertTrue(resp.isclosed()) 554 self.assertFalse(resp.closed) 555 resp.close() 556 self.assertTrue(resp.closed) 557 558 def test_mixed_reads(self): 559 # readline() should update the remaining length, so that read() knows 560 # how much data is left and does not raise IncompleteRead 561 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 562 sock = FakeSocket(body) 563 resp = client.HTTPResponse(sock) 564 resp.begin() 565 self.assertEqual(resp.readline(), b'Text\r\n') 566 self.assertFalse(resp.isclosed()) 567 self.assertEqual(resp.read(), b'Another') 568 self.assertTrue(resp.isclosed()) 569 self.assertFalse(resp.closed) 570 resp.close() 571 self.assertTrue(resp.closed) 572 573 def test_partial_readintos(self): 574 # if we have Content-Length, HTTPResponse knows when to close itself, 575 # the same behaviour as when we read the whole thing with read() 576 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 577 sock = FakeSocket(body) 578 resp = client.HTTPResponse(sock) 579 resp.begin() 580 b = bytearray(2) 581 n = resp.readinto(b) 582 self.assertEqual(n, 2) 583 self.assertEqual(bytes(b), b'Te') 584 self.assertFalse(resp.isclosed()) 585 n = resp.readinto(b) 586 self.assertEqual(n, 2) 587 self.assertEqual(bytes(b), b'xt') 588 self.assertTrue(resp.isclosed()) 589 self.assertFalse(resp.closed) 590 resp.close() 591 self.assertTrue(resp.closed) 592 593 def test_partial_reads_no_content_length(self): 594 # when no length is present, the socket should be gracefully closed when 595 # all data was read 596 body = "HTTP/1.1 200 Ok\r\n\r\nText" 597 sock = FakeSocket(body) 598 resp = client.HTTPResponse(sock) 599 resp.begin() 600 self.assertEqual(resp.read(2), b'Te') 601 self.assertFalse(resp.isclosed()) 602 self.assertEqual(resp.read(2), b'xt') 603 self.assertEqual(resp.read(1), b'') 604 self.assertTrue(resp.isclosed()) 605 self.assertFalse(resp.closed) 606 resp.close() 607 self.assertTrue(resp.closed) 608 609 def test_partial_readintos_no_content_length(self): 610 # when no length is present, the socket should be gracefully closed when 611 # all data was read 612 body = "HTTP/1.1 200 Ok\r\n\r\nText" 613 sock = FakeSocket(body) 614 resp = client.HTTPResponse(sock) 615 resp.begin() 616 b = bytearray(2) 617 n = resp.readinto(b) 618 self.assertEqual(n, 2) 619 self.assertEqual(bytes(b), b'Te') 620 self.assertFalse(resp.isclosed()) 621 n = resp.readinto(b) 622 self.assertEqual(n, 2) 623 self.assertEqual(bytes(b), b'xt') 624 n = resp.readinto(b) 625 self.assertEqual(n, 0) 626 self.assertTrue(resp.isclosed()) 627 628 def test_partial_reads_incomplete_body(self): 629 # if the server shuts down the connection before the whole 630 # content-length is delivered, the socket is gracefully closed 631 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 632 sock = FakeSocket(body) 633 resp = client.HTTPResponse(sock) 634 resp.begin() 635 self.assertEqual(resp.read(2), b'Te') 636 self.assertFalse(resp.isclosed()) 637 self.assertEqual(resp.read(2), b'xt') 638 self.assertEqual(resp.read(1), b'') 639 self.assertTrue(resp.isclosed()) 640 641 def test_partial_readintos_incomplete_body(self): 642 # if the server shuts down the connection before the whole 643 # content-length is delivered, the socket is gracefully closed 644 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 645 sock = FakeSocket(body) 646 resp = client.HTTPResponse(sock) 647 resp.begin() 648 b = bytearray(2) 649 n = resp.readinto(b) 650 self.assertEqual(n, 2) 651 self.assertEqual(bytes(b), b'Te') 652 self.assertFalse(resp.isclosed()) 653 n = resp.readinto(b) 654 self.assertEqual(n, 2) 655 self.assertEqual(bytes(b), b'xt') 656 n = resp.readinto(b) 657 self.assertEqual(n, 0) 658 self.assertTrue(resp.isclosed()) 659 self.assertFalse(resp.closed) 660 resp.close() 661 self.assertTrue(resp.closed) 662 663 def test_host_port(self): 664 # Check invalid host_port 665 666 for hp in ("www.python.org:abc", "user:password@www.python.org"): 667 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 668 669 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 670 "fe80::207:e9ff:fe9b", 8000), 671 ("www.python.org:80", "www.python.org", 80), 672 ("www.python.org:", "www.python.org", 80), 673 ("www.python.org", "www.python.org", 80), 674 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 675 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 676 c = client.HTTPConnection(hp) 677 self.assertEqual(h, c.host) 678 self.assertEqual(p, c.port) 679 680 def test_response_headers(self): 681 # test response with multiple message headers with the same field name. 682 text = ('HTTP/1.1 200 OK\r\n' 683 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 684 'Version="1"; Path="/acme"\r\n' 685 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 686 ' Path="/acme"\r\n' 687 '\r\n' 688 'No body\r\n') 689 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 690 ', ' 691 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 692 s = FakeSocket(text) 693 r = client.HTTPResponse(s) 694 r.begin() 695 cookies = r.getheader("Set-Cookie") 696 self.assertEqual(cookies, hdr) 697 698 def test_read_head(self): 699 # Test that the library doesn't attempt to read any data 700 # from a HEAD request. (Tickles SF bug #622042.) 701 sock = FakeSocket( 702 'HTTP/1.1 200 OK\r\n' 703 'Content-Length: 14432\r\n' 704 '\r\n', 705 NoEOFBytesIO) 706 resp = client.HTTPResponse(sock, method="HEAD") 707 resp.begin() 708 if resp.read(): 709 self.fail("Did not expect response from HEAD request") 710 711 def test_readinto_head(self): 712 # Test that the library doesn't attempt to read any data 713 # from a HEAD request. (Tickles SF bug #622042.) 714 sock = FakeSocket( 715 'HTTP/1.1 200 OK\r\n' 716 'Content-Length: 14432\r\n' 717 '\r\n', 718 NoEOFBytesIO) 719 resp = client.HTTPResponse(sock, method="HEAD") 720 resp.begin() 721 b = bytearray(5) 722 if resp.readinto(b) != 0: 723 self.fail("Did not expect response from HEAD request") 724 self.assertEqual(bytes(b), b'\x00'*5) 725 726 def test_too_many_headers(self): 727 headers = '\r\n'.join('Header%d: foo' % i 728 for i in range(client._MAXHEADERS + 1)) + '\r\n' 729 text = ('HTTP/1.1 200 OK\r\n' + headers) 730 s = FakeSocket(text) 731 r = client.HTTPResponse(s) 732 self.assertRaisesRegex(client.HTTPException, 733 r"got more than \d+ headers", r.begin) 734 735 def test_send_file(self): 736 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 737 b'Accept-Encoding: identity\r\n' 738 b'Transfer-Encoding: chunked\r\n' 739 b'\r\n') 740 741 with open(__file__, 'rb') as body: 742 conn = client.HTTPConnection('example.com') 743 sock = FakeSocket(body) 744 conn.sock = sock 745 conn.request('GET', '/foo', body) 746 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 747 (sock.data[:len(expected)], expected)) 748 749 def test_send(self): 750 expected = b'this is a test this is only a test' 751 conn = client.HTTPConnection('example.com') 752 sock = FakeSocket(None) 753 conn.sock = sock 754 conn.send(expected) 755 self.assertEqual(expected, sock.data) 756 sock.data = b'' 757 conn.send(array.array('b', expected)) 758 self.assertEqual(expected, sock.data) 759 sock.data = b'' 760 conn.send(io.BytesIO(expected)) 761 self.assertEqual(expected, sock.data) 762 763 def test_send_updating_file(self): 764 def data(): 765 yield 'data' 766 yield None 767 yield 'data_two' 768 769 class UpdatingFile(io.TextIOBase): 770 mode = 'r' 771 d = data() 772 def read(self, blocksize=-1): 773 return next(self.d) 774 775 expected = b'data' 776 777 conn = client.HTTPConnection('example.com') 778 sock = FakeSocket("") 779 conn.sock = sock 780 conn.send(UpdatingFile()) 781 self.assertEqual(sock.data, expected) 782 783 784 def test_send_iter(self): 785 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 786 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 787 b'\r\nonetwothree' 788 789 def body(): 790 yield b"one" 791 yield b"two" 792 yield b"three" 793 794 conn = client.HTTPConnection('example.com') 795 sock = FakeSocket("") 796 conn.sock = sock 797 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 798 self.assertEqual(sock.data, expected) 799 800 def test_blocksize_request(self): 801 """Check that request() respects the configured block size.""" 802 blocksize = 8 # For easy debugging. 803 conn = client.HTTPConnection('example.com', blocksize=blocksize) 804 sock = FakeSocket(None) 805 conn.sock = sock 806 expected = b"a" * blocksize + b"b" 807 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 808 self.assertEqual(sock.sendall_calls, 3) 809 body = sock.data.split(b"\r\n\r\n", 1)[1] 810 self.assertEqual(body, expected) 811 812 def test_blocksize_send(self): 813 """Check that send() respects the configured block size.""" 814 blocksize = 8 # For easy debugging. 815 conn = client.HTTPConnection('example.com', blocksize=blocksize) 816 sock = FakeSocket(None) 817 conn.sock = sock 818 expected = b"a" * blocksize + b"b" 819 conn.send(io.BytesIO(expected)) 820 self.assertEqual(sock.sendall_calls, 2) 821 self.assertEqual(sock.data, expected) 822 823 def test_send_type_error(self): 824 # See: Issue #12676 825 conn = client.HTTPConnection('example.com') 826 conn.sock = FakeSocket('') 827 with self.assertRaises(TypeError): 828 conn.request('POST', 'test', conn) 829 830 def test_chunked(self): 831 expected = chunked_expected 832 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 833 resp = client.HTTPResponse(sock, method="GET") 834 resp.begin() 835 self.assertEqual(resp.read(), expected) 836 resp.close() 837 838 # Various read sizes 839 for n in range(1, 12): 840 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 841 resp = client.HTTPResponse(sock, method="GET") 842 resp.begin() 843 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 844 resp.close() 845 846 for x in ('', 'foo\r\n'): 847 sock = FakeSocket(chunked_start + x) 848 resp = client.HTTPResponse(sock, method="GET") 849 resp.begin() 850 try: 851 resp.read() 852 except client.IncompleteRead as i: 853 self.assertEqual(i.partial, expected) 854 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 855 self.assertEqual(repr(i), expected_message) 856 self.assertEqual(str(i), expected_message) 857 else: 858 self.fail('IncompleteRead expected') 859 finally: 860 resp.close() 861 862 def test_readinto_chunked(self): 863 864 expected = chunked_expected 865 nexpected = len(expected) 866 b = bytearray(128) 867 868 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 869 resp = client.HTTPResponse(sock, method="GET") 870 resp.begin() 871 n = resp.readinto(b) 872 self.assertEqual(b[:nexpected], expected) 873 self.assertEqual(n, nexpected) 874 resp.close() 875 876 # Various read sizes 877 for n in range(1, 12): 878 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 879 resp = client.HTTPResponse(sock, method="GET") 880 resp.begin() 881 m = memoryview(b) 882 i = resp.readinto(m[0:n]) 883 i += resp.readinto(m[i:n + i]) 884 i += resp.readinto(m[i:]) 885 self.assertEqual(b[:nexpected], expected) 886 self.assertEqual(i, nexpected) 887 resp.close() 888 889 for x in ('', 'foo\r\n'): 890 sock = FakeSocket(chunked_start + x) 891 resp = client.HTTPResponse(sock, method="GET") 892 resp.begin() 893 try: 894 n = resp.readinto(b) 895 except client.IncompleteRead as i: 896 self.assertEqual(i.partial, expected) 897 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 898 self.assertEqual(repr(i), expected_message) 899 self.assertEqual(str(i), expected_message) 900 else: 901 self.fail('IncompleteRead expected') 902 finally: 903 resp.close() 904 905 def test_chunked_head(self): 906 chunked_start = ( 907 'HTTP/1.1 200 OK\r\n' 908 'Transfer-Encoding: chunked\r\n\r\n' 909 'a\r\n' 910 'hello world\r\n' 911 '1\r\n' 912 'd\r\n' 913 ) 914 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 915 resp = client.HTTPResponse(sock, method="HEAD") 916 resp.begin() 917 self.assertEqual(resp.read(), b'') 918 self.assertEqual(resp.status, 200) 919 self.assertEqual(resp.reason, 'OK') 920 self.assertTrue(resp.isclosed()) 921 self.assertFalse(resp.closed) 922 resp.close() 923 self.assertTrue(resp.closed) 924 925 def test_readinto_chunked_head(self): 926 chunked_start = ( 927 'HTTP/1.1 200 OK\r\n' 928 'Transfer-Encoding: chunked\r\n\r\n' 929 'a\r\n' 930 'hello world\r\n' 931 '1\r\n' 932 'd\r\n' 933 ) 934 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 935 resp = client.HTTPResponse(sock, method="HEAD") 936 resp.begin() 937 b = bytearray(5) 938 n = resp.readinto(b) 939 self.assertEqual(n, 0) 940 self.assertEqual(bytes(b), b'\x00'*5) 941 self.assertEqual(resp.status, 200) 942 self.assertEqual(resp.reason, 'OK') 943 self.assertTrue(resp.isclosed()) 944 self.assertFalse(resp.closed) 945 resp.close() 946 self.assertTrue(resp.closed) 947 948 def test_negative_content_length(self): 949 sock = FakeSocket( 950 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 951 resp = client.HTTPResponse(sock, method="GET") 952 resp.begin() 953 self.assertEqual(resp.read(), b'Hello\r\n') 954 self.assertTrue(resp.isclosed()) 955 956 def test_incomplete_read(self): 957 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 958 resp = client.HTTPResponse(sock, method="GET") 959 resp.begin() 960 try: 961 resp.read() 962 except client.IncompleteRead as i: 963 self.assertEqual(i.partial, b'Hello\r\n') 964 self.assertEqual(repr(i), 965 "IncompleteRead(7 bytes read, 3 more expected)") 966 self.assertEqual(str(i), 967 "IncompleteRead(7 bytes read, 3 more expected)") 968 self.assertTrue(resp.isclosed()) 969 else: 970 self.fail('IncompleteRead expected') 971 972 def test_epipe(self): 973 sock = EPipeSocket( 974 "HTTP/1.0 401 Authorization Required\r\n" 975 "Content-type: text/html\r\n" 976 "WWW-Authenticate: Basic realm=\"example\"\r\n", 977 b"Content-Length") 978 conn = client.HTTPConnection("example.com") 979 conn.sock = sock 980 self.assertRaises(OSError, 981 lambda: conn.request("PUT", "/url", "body")) 982 resp = conn.getresponse() 983 self.assertEqual(401, resp.status) 984 self.assertEqual("Basic realm=\"example\"", 985 resp.getheader("www-authenticate")) 986 987 # Test lines overflowing the max line size (_MAXLINE in http.client) 988 989 def test_overflowing_status_line(self): 990 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 991 resp = client.HTTPResponse(FakeSocket(body)) 992 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 993 994 def test_overflowing_header_line(self): 995 body = ( 996 'HTTP/1.1 200 OK\r\n' 997 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 998 ) 999 resp = client.HTTPResponse(FakeSocket(body)) 1000 self.assertRaises(client.LineTooLong, resp.begin) 1001 1002 def test_overflowing_header_limit_after_100(self): 1003 body = ( 1004 'HTTP/1.1 100 OK\r\n' 1005 'r\n' * 32768 1006 ) 1007 resp = client.HTTPResponse(FakeSocket(body)) 1008 with self.assertRaises(client.HTTPException) as cm: 1009 resp.begin() 1010 # We must assert more because other reasonable errors that we 1011 # do not want can also be HTTPException derived. 1012 self.assertIn('got more than ', str(cm.exception)) 1013 self.assertIn('headers', str(cm.exception)) 1014 1015 def test_overflowing_chunked_line(self): 1016 body = ( 1017 'HTTP/1.1 200 OK\r\n' 1018 'Transfer-Encoding: chunked\r\n\r\n' 1019 + '0' * 65536 + 'a\r\n' 1020 'hello world\r\n' 1021 '0\r\n' 1022 '\r\n' 1023 ) 1024 resp = client.HTTPResponse(FakeSocket(body)) 1025 resp.begin() 1026 self.assertRaises(client.LineTooLong, resp.read) 1027 1028 def test_early_eof(self): 1029 # Test httpresponse with no \r\n termination, 1030 body = "HTTP/1.1 200 Ok" 1031 sock = FakeSocket(body) 1032 resp = client.HTTPResponse(sock) 1033 resp.begin() 1034 self.assertEqual(resp.read(), b'') 1035 self.assertTrue(resp.isclosed()) 1036 self.assertFalse(resp.closed) 1037 resp.close() 1038 self.assertTrue(resp.closed) 1039 1040 def test_error_leak(self): 1041 # Test that the socket is not leaked if getresponse() fails 1042 conn = client.HTTPConnection('example.com') 1043 response = None 1044 class Response(client.HTTPResponse): 1045 def __init__(self, *pos, **kw): 1046 nonlocal response 1047 response = self # Avoid garbage collector closing the socket 1048 client.HTTPResponse.__init__(self, *pos, **kw) 1049 conn.response_class = Response 1050 conn.sock = FakeSocket('Invalid status line') 1051 conn.request('GET', '/') 1052 self.assertRaises(client.BadStatusLine, conn.getresponse) 1053 self.assertTrue(response.closed) 1054 self.assertTrue(conn.sock.file_closed) 1055 1056 def test_chunked_extension(self): 1057 extra = '3;foo=bar\r\n' + 'abc\r\n' 1058 expected = chunked_expected + b'abc' 1059 1060 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1061 resp = client.HTTPResponse(sock, method="GET") 1062 resp.begin() 1063 self.assertEqual(resp.read(), expected) 1064 resp.close() 1065 1066 def test_chunked_missing_end(self): 1067 """some servers may serve up a short chunked encoding stream""" 1068 expected = chunked_expected 1069 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1070 resp = client.HTTPResponse(sock, method="GET") 1071 resp.begin() 1072 self.assertEqual(resp.read(), expected) 1073 resp.close() 1074 1075 def test_chunked_trailers(self): 1076 """See that trailers are read and ignored""" 1077 expected = chunked_expected 1078 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1079 resp = client.HTTPResponse(sock, method="GET") 1080 resp.begin() 1081 self.assertEqual(resp.read(), expected) 1082 # we should have reached the end of the file 1083 self.assertEqual(sock.file.read(), b"") #we read to the end 1084 resp.close() 1085 1086 def test_chunked_sync(self): 1087 """Check that we don't read past the end of the chunked-encoding stream""" 1088 expected = chunked_expected 1089 extradata = "extradata" 1090 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1091 resp = client.HTTPResponse(sock, method="GET") 1092 resp.begin() 1093 self.assertEqual(resp.read(), expected) 1094 # the file should now have our extradata ready to be read 1095 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1096 resp.close() 1097 1098 def test_content_length_sync(self): 1099 """Check that we don't read past the end of the Content-Length stream""" 1100 extradata = b"extradata" 1101 expected = b"Hello123\r\n" 1102 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1103 resp = client.HTTPResponse(sock, method="GET") 1104 resp.begin() 1105 self.assertEqual(resp.read(), expected) 1106 # the file should now have our extradata ready to be read 1107 self.assertEqual(sock.file.read(), extradata) #we read to the end 1108 resp.close() 1109 1110 def test_readlines_content_length(self): 1111 extradata = b"extradata" 1112 expected = b"Hello123\r\n" 1113 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1114 resp = client.HTTPResponse(sock, method="GET") 1115 resp.begin() 1116 self.assertEqual(resp.readlines(2000), [expected]) 1117 # the file should now have our extradata ready to be read 1118 self.assertEqual(sock.file.read(), extradata) #we read to the end 1119 resp.close() 1120 1121 def test_read1_content_length(self): 1122 extradata = b"extradata" 1123 expected = b"Hello123\r\n" 1124 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1125 resp = client.HTTPResponse(sock, method="GET") 1126 resp.begin() 1127 self.assertEqual(resp.read1(2000), expected) 1128 # the file should now have our extradata ready to be read 1129 self.assertEqual(sock.file.read(), extradata) #we read to the end 1130 resp.close() 1131 1132 def test_readline_bound_content_length(self): 1133 extradata = b"extradata" 1134 expected = b"Hello123\r\n" 1135 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1136 resp = client.HTTPResponse(sock, method="GET") 1137 resp.begin() 1138 self.assertEqual(resp.readline(10), expected) 1139 self.assertEqual(resp.readline(10), b"") 1140 # the file should now have our extradata ready to be read 1141 self.assertEqual(sock.file.read(), extradata) #we read to the end 1142 resp.close() 1143 1144 def test_read1_bound_content_length(self): 1145 extradata = b"extradata" 1146 expected = b"Hello123\r\n" 1147 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1148 resp = client.HTTPResponse(sock, method="GET") 1149 resp.begin() 1150 self.assertEqual(resp.read1(20), expected*2) 1151 self.assertEqual(resp.read(), expected) 1152 # the file should now have our extradata ready to be read 1153 self.assertEqual(sock.file.read(), extradata) #we read to the end 1154 resp.close() 1155 1156 def test_response_fileno(self): 1157 # Make sure fd returned by fileno is valid. 1158 serv = socket.create_server((HOST, 0)) 1159 self.addCleanup(serv.close) 1160 1161 result = None 1162 def run_server(): 1163 [conn, address] = serv.accept() 1164 with conn, conn.makefile("rb") as reader: 1165 # Read the request header until a blank line 1166 while True: 1167 line = reader.readline() 1168 if not line.rstrip(b"\r\n"): 1169 break 1170 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1171 nonlocal result 1172 result = reader.read() 1173 1174 thread = threading.Thread(target=run_server) 1175 thread.start() 1176 self.addCleanup(thread.join, float(1)) 1177 conn = client.HTTPConnection(*serv.getsockname()) 1178 conn.request("CONNECT", "dummy:1234") 1179 response = conn.getresponse() 1180 try: 1181 self.assertEqual(response.status, client.OK) 1182 s = socket.socket(fileno=response.fileno()) 1183 try: 1184 s.sendall(b"proxied data\n") 1185 finally: 1186 s.detach() 1187 finally: 1188 response.close() 1189 conn.close() 1190 thread.join() 1191 self.assertEqual(result, b"proxied data\n") 1192 1193 def test_putrequest_override_domain_validation(self): 1194 """ 1195 It should be possible to override the default validation 1196 behavior in putrequest (bpo-38216). 1197 """ 1198 class UnsafeHTTPConnection(client.HTTPConnection): 1199 def _validate_path(self, url): 1200 pass 1201 1202 conn = UnsafeHTTPConnection('example.com') 1203 conn.sock = FakeSocket('') 1204 conn.putrequest('GET', '/\x00') 1205 1206 def test_putrequest_override_host_validation(self): 1207 class UnsafeHTTPConnection(client.HTTPConnection): 1208 def _validate_host(self, url): 1209 pass 1210 1211 conn = UnsafeHTTPConnection('example.com\r\n') 1212 conn.sock = FakeSocket('') 1213 # set skip_host so a ValueError is not raised upon adding the 1214 # invalid URL as the value of the "Host:" header 1215 conn.putrequest('GET', '/', skip_host=1) 1216 1217 def test_putrequest_override_encoding(self): 1218 """ 1219 It should be possible to override the default encoding 1220 to transmit bytes in another encoding even if invalid 1221 (bpo-36274). 1222 """ 1223 class UnsafeHTTPConnection(client.HTTPConnection): 1224 def _encode_request(self, str_url): 1225 return str_url.encode('utf-8') 1226 1227 conn = UnsafeHTTPConnection('example.com') 1228 conn.sock = FakeSocket('') 1229 conn.putrequest('GET', '/☃') 1230 1231 1232class ExtendedReadTest(TestCase): 1233 """ 1234 Test peek(), read1(), readline() 1235 """ 1236 lines = ( 1237 'HTTP/1.1 200 OK\r\n' 1238 '\r\n' 1239 'hello world!\n' 1240 'and now \n' 1241 'for something completely different\n' 1242 'foo' 1243 ) 1244 lines_expected = lines[lines.find('hello'):].encode("ascii") 1245 lines_chunked = ( 1246 'HTTP/1.1 200 OK\r\n' 1247 'Transfer-Encoding: chunked\r\n\r\n' 1248 'a\r\n' 1249 'hello worl\r\n' 1250 '3\r\n' 1251 'd!\n\r\n' 1252 '9\r\n' 1253 'and now \n\r\n' 1254 '23\r\n' 1255 'for something completely different\n\r\n' 1256 '3\r\n' 1257 'foo\r\n' 1258 '0\r\n' # terminating chunk 1259 '\r\n' # end of trailers 1260 ) 1261 1262 def setUp(self): 1263 sock = FakeSocket(self.lines) 1264 resp = client.HTTPResponse(sock, method="GET") 1265 resp.begin() 1266 resp.fp = io.BufferedReader(resp.fp) 1267 self.resp = resp 1268 1269 1270 1271 def test_peek(self): 1272 resp = self.resp 1273 # patch up the buffered peek so that it returns not too much stuff 1274 oldpeek = resp.fp.peek 1275 def mypeek(n=-1): 1276 p = oldpeek(n) 1277 if n >= 0: 1278 return p[:n] 1279 return p[:10] 1280 resp.fp.peek = mypeek 1281 1282 all = [] 1283 while True: 1284 # try a short peek 1285 p = resp.peek(3) 1286 if p: 1287 self.assertGreater(len(p), 0) 1288 # then unbounded peek 1289 p2 = resp.peek() 1290 self.assertGreaterEqual(len(p2), len(p)) 1291 self.assertTrue(p2.startswith(p)) 1292 next = resp.read(len(p2)) 1293 self.assertEqual(next, p2) 1294 else: 1295 next = resp.read() 1296 self.assertFalse(next) 1297 all.append(next) 1298 if not next: 1299 break 1300 self.assertEqual(b"".join(all), self.lines_expected) 1301 1302 def test_readline(self): 1303 resp = self.resp 1304 self._verify_readline(self.resp.readline, self.lines_expected) 1305 1306 def _verify_readline(self, readline, expected): 1307 all = [] 1308 while True: 1309 # short readlines 1310 line = readline(5) 1311 if line and line != b"foo": 1312 if len(line) < 5: 1313 self.assertTrue(line.endswith(b"\n")) 1314 all.append(line) 1315 if not line: 1316 break 1317 self.assertEqual(b"".join(all), expected) 1318 1319 def test_read1(self): 1320 resp = self.resp 1321 def r(): 1322 res = resp.read1(4) 1323 self.assertLessEqual(len(res), 4) 1324 return res 1325 readliner = Readliner(r) 1326 self._verify_readline(readliner.readline, self.lines_expected) 1327 1328 def test_read1_unbounded(self): 1329 resp = self.resp 1330 all = [] 1331 while True: 1332 data = resp.read1() 1333 if not data: 1334 break 1335 all.append(data) 1336 self.assertEqual(b"".join(all), self.lines_expected) 1337 1338 def test_read1_bounded(self): 1339 resp = self.resp 1340 all = [] 1341 while True: 1342 data = resp.read1(10) 1343 if not data: 1344 break 1345 self.assertLessEqual(len(data), 10) 1346 all.append(data) 1347 self.assertEqual(b"".join(all), self.lines_expected) 1348 1349 def test_read1_0(self): 1350 self.assertEqual(self.resp.read1(0), b"") 1351 1352 def test_peek_0(self): 1353 p = self.resp.peek(0) 1354 self.assertLessEqual(0, len(p)) 1355 1356 1357class ExtendedReadTestChunked(ExtendedReadTest): 1358 """ 1359 Test peek(), read1(), readline() in chunked mode 1360 """ 1361 lines = ( 1362 'HTTP/1.1 200 OK\r\n' 1363 'Transfer-Encoding: chunked\r\n\r\n' 1364 'a\r\n' 1365 'hello worl\r\n' 1366 '3\r\n' 1367 'd!\n\r\n' 1368 '9\r\n' 1369 'and now \n\r\n' 1370 '23\r\n' 1371 'for something completely different\n\r\n' 1372 '3\r\n' 1373 'foo\r\n' 1374 '0\r\n' # terminating chunk 1375 '\r\n' # end of trailers 1376 ) 1377 1378 1379class Readliner: 1380 """ 1381 a simple readline class that uses an arbitrary read function and buffering 1382 """ 1383 def __init__(self, readfunc): 1384 self.readfunc = readfunc 1385 self.remainder = b"" 1386 1387 def readline(self, limit): 1388 data = [] 1389 datalen = 0 1390 read = self.remainder 1391 try: 1392 while True: 1393 idx = read.find(b'\n') 1394 if idx != -1: 1395 break 1396 if datalen + len(read) >= limit: 1397 idx = limit - datalen - 1 1398 # read more data 1399 data.append(read) 1400 read = self.readfunc() 1401 if not read: 1402 idx = 0 #eof condition 1403 break 1404 idx += 1 1405 data.append(read[:idx]) 1406 self.remainder = read[idx:] 1407 return b"".join(data) 1408 except: 1409 self.remainder = b"".join(data) 1410 raise 1411 1412 1413class OfflineTest(TestCase): 1414 def test_all(self): 1415 # Documented objects defined in the module should be in __all__ 1416 expected = {"responses"} # Allowlist documented dict() object 1417 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1418 # intentionally omitted for simplicity 1419 blacklist = {"HTTPMessage", "parse_headers"} 1420 for name in dir(client): 1421 if name.startswith("_") or name in blacklist: 1422 continue 1423 module_object = getattr(client, name) 1424 if getattr(module_object, "__module__", None) == "http.client": 1425 expected.add(name) 1426 self.assertCountEqual(client.__all__, expected) 1427 1428 def test_responses(self): 1429 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1430 1431 def test_client_constants(self): 1432 # Make sure we don't break backward compatibility with 3.4 1433 expected = [ 1434 'CONTINUE', 1435 'SWITCHING_PROTOCOLS', 1436 'PROCESSING', 1437 'OK', 1438 'CREATED', 1439 'ACCEPTED', 1440 'NON_AUTHORITATIVE_INFORMATION', 1441 'NO_CONTENT', 1442 'RESET_CONTENT', 1443 'PARTIAL_CONTENT', 1444 'MULTI_STATUS', 1445 'IM_USED', 1446 'MULTIPLE_CHOICES', 1447 'MOVED_PERMANENTLY', 1448 'FOUND', 1449 'SEE_OTHER', 1450 'NOT_MODIFIED', 1451 'USE_PROXY', 1452 'TEMPORARY_REDIRECT', 1453 'BAD_REQUEST', 1454 'UNAUTHORIZED', 1455 'PAYMENT_REQUIRED', 1456 'FORBIDDEN', 1457 'NOT_FOUND', 1458 'METHOD_NOT_ALLOWED', 1459 'NOT_ACCEPTABLE', 1460 'PROXY_AUTHENTICATION_REQUIRED', 1461 'REQUEST_TIMEOUT', 1462 'CONFLICT', 1463 'GONE', 1464 'LENGTH_REQUIRED', 1465 'PRECONDITION_FAILED', 1466 'REQUEST_ENTITY_TOO_LARGE', 1467 'REQUEST_URI_TOO_LONG', 1468 'UNSUPPORTED_MEDIA_TYPE', 1469 'REQUESTED_RANGE_NOT_SATISFIABLE', 1470 'EXPECTATION_FAILED', 1471 'MISDIRECTED_REQUEST', 1472 'UNPROCESSABLE_ENTITY', 1473 'LOCKED', 1474 'FAILED_DEPENDENCY', 1475 'UPGRADE_REQUIRED', 1476 'PRECONDITION_REQUIRED', 1477 'TOO_MANY_REQUESTS', 1478 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1479 'UNAVAILABLE_FOR_LEGAL_REASONS', 1480 'INTERNAL_SERVER_ERROR', 1481 'NOT_IMPLEMENTED', 1482 'BAD_GATEWAY', 1483 'SERVICE_UNAVAILABLE', 1484 'GATEWAY_TIMEOUT', 1485 'HTTP_VERSION_NOT_SUPPORTED', 1486 'INSUFFICIENT_STORAGE', 1487 'NOT_EXTENDED', 1488 'NETWORK_AUTHENTICATION_REQUIRED', 1489 ] 1490 for const in expected: 1491 with self.subTest(constant=const): 1492 self.assertTrue(hasattr(client, const)) 1493 1494 1495class SourceAddressTest(TestCase): 1496 def setUp(self): 1497 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1498 self.port = support.bind_port(self.serv) 1499 self.source_port = support.find_unused_port() 1500 self.serv.listen() 1501 self.conn = None 1502 1503 def tearDown(self): 1504 if self.conn: 1505 self.conn.close() 1506 self.conn = None 1507 self.serv.close() 1508 self.serv = None 1509 1510 def testHTTPConnectionSourceAddress(self): 1511 self.conn = client.HTTPConnection(HOST, self.port, 1512 source_address=('', self.source_port)) 1513 self.conn.connect() 1514 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1515 1516 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1517 'http.client.HTTPSConnection not defined') 1518 def testHTTPSConnectionSourceAddress(self): 1519 self.conn = client.HTTPSConnection(HOST, self.port, 1520 source_address=('', self.source_port)) 1521 # We don't test anything here other than the constructor not barfing as 1522 # this code doesn't deal with setting up an active running SSL server 1523 # for an ssl_wrapped connect() to actually return from. 1524 1525 1526class TimeoutTest(TestCase): 1527 PORT = None 1528 1529 def setUp(self): 1530 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1531 TimeoutTest.PORT = support.bind_port(self.serv) 1532 self.serv.listen() 1533 1534 def tearDown(self): 1535 self.serv.close() 1536 self.serv = None 1537 1538 def testTimeoutAttribute(self): 1539 # This will prove that the timeout gets through HTTPConnection 1540 # and into the socket. 1541 1542 # default -- use global socket timeout 1543 self.assertIsNone(socket.getdefaulttimeout()) 1544 socket.setdefaulttimeout(30) 1545 try: 1546 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1547 httpConn.connect() 1548 finally: 1549 socket.setdefaulttimeout(None) 1550 self.assertEqual(httpConn.sock.gettimeout(), 30) 1551 httpConn.close() 1552 1553 # no timeout -- do not use global socket default 1554 self.assertIsNone(socket.getdefaulttimeout()) 1555 socket.setdefaulttimeout(30) 1556 try: 1557 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1558 timeout=None) 1559 httpConn.connect() 1560 finally: 1561 socket.setdefaulttimeout(None) 1562 self.assertEqual(httpConn.sock.gettimeout(), None) 1563 httpConn.close() 1564 1565 # a value 1566 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1567 httpConn.connect() 1568 self.assertEqual(httpConn.sock.gettimeout(), 30) 1569 httpConn.close() 1570 1571 1572class PersistenceTest(TestCase): 1573 1574 def test_reuse_reconnect(self): 1575 # Should reuse or reconnect depending on header from server 1576 tests = ( 1577 ('1.0', '', False), 1578 ('1.0', 'Connection: keep-alive\r\n', True), 1579 ('1.1', '', True), 1580 ('1.1', 'Connection: close\r\n', False), 1581 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1582 ('1.1', 'Connection: cloSE\r\n', False), 1583 ) 1584 for version, header, reuse in tests: 1585 with self.subTest(version=version, header=header): 1586 msg = ( 1587 'HTTP/{} 200 OK\r\n' 1588 '{}' 1589 'Content-Length: 12\r\n' 1590 '\r\n' 1591 'Dummy body\r\n' 1592 ).format(version, header) 1593 conn = FakeSocketHTTPConnection(msg) 1594 self.assertIsNone(conn.sock) 1595 conn.request('GET', '/open-connection') 1596 with conn.getresponse() as response: 1597 self.assertEqual(conn.sock is None, not reuse) 1598 response.read() 1599 self.assertEqual(conn.sock is None, not reuse) 1600 self.assertEqual(conn.connections, 1) 1601 conn.request('GET', '/subsequent-request') 1602 self.assertEqual(conn.connections, 1 if reuse else 2) 1603 1604 def test_disconnected(self): 1605 1606 def make_reset_reader(text): 1607 """Return BufferedReader that raises ECONNRESET at EOF""" 1608 stream = io.BytesIO(text) 1609 def readinto(buffer): 1610 size = io.BytesIO.readinto(stream, buffer) 1611 if size == 0: 1612 raise ConnectionResetError() 1613 return size 1614 stream.readinto = readinto 1615 return io.BufferedReader(stream) 1616 1617 tests = ( 1618 (io.BytesIO, client.RemoteDisconnected), 1619 (make_reset_reader, ConnectionResetError), 1620 ) 1621 for stream_factory, exception in tests: 1622 with self.subTest(exception=exception): 1623 conn = FakeSocketHTTPConnection(b'', stream_factory) 1624 conn.request('GET', '/eof-response') 1625 self.assertRaises(exception, conn.getresponse) 1626 self.assertIsNone(conn.sock) 1627 # HTTPConnection.connect() should be automatically invoked 1628 conn.request('GET', '/reconnect') 1629 self.assertEqual(conn.connections, 2) 1630 1631 def test_100_close(self): 1632 conn = FakeSocketHTTPConnection( 1633 b'HTTP/1.1 100 Continue\r\n' 1634 b'\r\n' 1635 # Missing final response 1636 ) 1637 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1638 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1639 self.assertIsNone(conn.sock) 1640 conn.request('GET', '/reconnect') 1641 self.assertEqual(conn.connections, 2) 1642 1643 1644class HTTPSTest(TestCase): 1645 1646 def setUp(self): 1647 if not hasattr(client, 'HTTPSConnection'): 1648 self.skipTest('ssl support required') 1649 1650 def make_server(self, certfile): 1651 from test.ssl_servers import make_https_server 1652 return make_https_server(self, certfile=certfile) 1653 1654 def test_attributes(self): 1655 # simple test to check it's storing the timeout 1656 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1657 self.assertEqual(h.timeout, 30) 1658 1659 def test_networked(self): 1660 # Default settings: requires a valid cert from a trusted CA 1661 import ssl 1662 support.requires('network') 1663 with support.transient_internet('self-signed.pythontest.net'): 1664 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1665 with self.assertRaises(ssl.SSLError) as exc_info: 1666 h.request('GET', '/') 1667 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1668 1669 def test_networked_noverification(self): 1670 # Switch off cert verification 1671 import ssl 1672 support.requires('network') 1673 with support.transient_internet('self-signed.pythontest.net'): 1674 context = ssl._create_unverified_context() 1675 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1676 context=context) 1677 h.request('GET', '/') 1678 resp = h.getresponse() 1679 h.close() 1680 self.assertIn('nginx', resp.getheader('server')) 1681 resp.close() 1682 1683 @support.system_must_validate_cert 1684 def test_networked_trusted_by_default_cert(self): 1685 # Default settings: requires a valid cert from a trusted CA 1686 support.requires('network') 1687 with support.transient_internet('www.python.org'): 1688 h = client.HTTPSConnection('www.python.org', 443) 1689 h.request('GET', '/') 1690 resp = h.getresponse() 1691 content_type = resp.getheader('content-type') 1692 resp.close() 1693 h.close() 1694 self.assertIn('text/html', content_type) 1695 1696 def test_networked_good_cert(self): 1697 # We feed the server's cert as a validating cert 1698 import ssl 1699 support.requires('network') 1700 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1701 with support.transient_internet(selfsigned_pythontestdotnet): 1702 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1703 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1704 self.assertEqual(context.check_hostname, True) 1705 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1706 try: 1707 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1708 context=context) 1709 h.request('GET', '/') 1710 resp = h.getresponse() 1711 except ssl.SSLError as ssl_err: 1712 ssl_err_str = str(ssl_err) 1713 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1714 # modern Linux distros (Debian Buster, etc) default OpenSSL 1715 # configurations it'll fail saying "key too weak" until we 1716 # address https://bugs.python.org/issue36816 to use a proper 1717 # key size on self-signed.pythontest.net. 1718 if re.search(r'(?i)key.too.weak', ssl_err_str): 1719 raise unittest.SkipTest( 1720 f'Got {ssl_err_str} trying to connect ' 1721 f'to {selfsigned_pythontestdotnet}. ' 1722 'See https://bugs.python.org/issue36816.') 1723 raise 1724 server_string = resp.getheader('server') 1725 resp.close() 1726 h.close() 1727 self.assertIn('nginx', server_string) 1728 1729 def test_networked_bad_cert(self): 1730 # We feed a "CA" cert that is unrelated to the server's cert 1731 import ssl 1732 support.requires('network') 1733 with support.transient_internet('self-signed.pythontest.net'): 1734 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1735 context.load_verify_locations(CERT_localhost) 1736 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1737 with self.assertRaises(ssl.SSLError) as exc_info: 1738 h.request('GET', '/') 1739 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1740 1741 def test_local_unknown_cert(self): 1742 # The custom cert isn't known to the default trust bundle 1743 import ssl 1744 server = self.make_server(CERT_localhost) 1745 h = client.HTTPSConnection('localhost', server.port) 1746 with self.assertRaises(ssl.SSLError) as exc_info: 1747 h.request('GET', '/') 1748 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1749 1750 def test_local_good_hostname(self): 1751 # The (valid) cert validates the HTTP hostname 1752 import ssl 1753 server = self.make_server(CERT_localhost) 1754 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1755 context.load_verify_locations(CERT_localhost) 1756 h = client.HTTPSConnection('localhost', server.port, context=context) 1757 self.addCleanup(h.close) 1758 h.request('GET', '/nonexistent') 1759 resp = h.getresponse() 1760 self.addCleanup(resp.close) 1761 self.assertEqual(resp.status, 404) 1762 1763 def test_local_bad_hostname(self): 1764 # The (valid) cert doesn't validate the HTTP hostname 1765 import ssl 1766 server = self.make_server(CERT_fakehostname) 1767 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1768 context.load_verify_locations(CERT_fakehostname) 1769 h = client.HTTPSConnection('localhost', server.port, context=context) 1770 with self.assertRaises(ssl.CertificateError): 1771 h.request('GET', '/') 1772 # Same with explicit check_hostname=True 1773 with support.check_warnings(('', DeprecationWarning)): 1774 h = client.HTTPSConnection('localhost', server.port, 1775 context=context, check_hostname=True) 1776 with self.assertRaises(ssl.CertificateError): 1777 h.request('GET', '/') 1778 # With check_hostname=False, the mismatching is ignored 1779 context.check_hostname = False 1780 with support.check_warnings(('', DeprecationWarning)): 1781 h = client.HTTPSConnection('localhost', server.port, 1782 context=context, check_hostname=False) 1783 h.request('GET', '/nonexistent') 1784 resp = h.getresponse() 1785 resp.close() 1786 h.close() 1787 self.assertEqual(resp.status, 404) 1788 # The context's check_hostname setting is used if one isn't passed to 1789 # HTTPSConnection. 1790 context.check_hostname = False 1791 h = client.HTTPSConnection('localhost', server.port, context=context) 1792 h.request('GET', '/nonexistent') 1793 resp = h.getresponse() 1794 self.assertEqual(resp.status, 404) 1795 resp.close() 1796 h.close() 1797 # Passing check_hostname to HTTPSConnection should override the 1798 # context's setting. 1799 with support.check_warnings(('', DeprecationWarning)): 1800 h = client.HTTPSConnection('localhost', server.port, 1801 context=context, check_hostname=True) 1802 with self.assertRaises(ssl.CertificateError): 1803 h.request('GET', '/') 1804 1805 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1806 'http.client.HTTPSConnection not available') 1807 def test_host_port(self): 1808 # Check invalid host_port 1809 1810 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1811 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1812 1813 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1814 "fe80::207:e9ff:fe9b", 8000), 1815 ("www.python.org:443", "www.python.org", 443), 1816 ("www.python.org:", "www.python.org", 443), 1817 ("www.python.org", "www.python.org", 443), 1818 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1819 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1820 443)): 1821 c = client.HTTPSConnection(hp) 1822 self.assertEqual(h, c.host) 1823 self.assertEqual(p, c.port) 1824 1825 def test_tls13_pha(self): 1826 import ssl 1827 if not ssl.HAS_TLSv1_3: 1828 self.skipTest('TLS 1.3 support required') 1829 # just check status of PHA flag 1830 h = client.HTTPSConnection('localhost', 443) 1831 self.assertTrue(h._context.post_handshake_auth) 1832 1833 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1834 self.assertFalse(context.post_handshake_auth) 1835 h = client.HTTPSConnection('localhost', 443, context=context) 1836 self.assertIs(h._context, context) 1837 self.assertFalse(h._context.post_handshake_auth) 1838 1839 with warnings.catch_warnings(): 1840 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1841 DeprecationWarning) 1842 h = client.HTTPSConnection('localhost', 443, context=context, 1843 cert_file=CERT_localhost) 1844 self.assertTrue(h._context.post_handshake_auth) 1845 1846 1847class RequestBodyTest(TestCase): 1848 """Test cases where a request includes a message body.""" 1849 1850 def setUp(self): 1851 self.conn = client.HTTPConnection('example.com') 1852 self.conn.sock = self.sock = FakeSocket("") 1853 self.conn.sock = self.sock 1854 1855 def get_headers_and_fp(self): 1856 f = io.BytesIO(self.sock.data) 1857 f.readline() # read the request line 1858 message = client.parse_headers(f) 1859 return message, f 1860 1861 def test_list_body(self): 1862 # Note that no content-length is automatically calculated for 1863 # an iterable. The request will fall back to send chunked 1864 # transfer encoding. 1865 cases = ( 1866 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1867 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1868 ) 1869 for body, expected in cases: 1870 with self.subTest(body): 1871 self.conn = client.HTTPConnection('example.com') 1872 self.conn.sock = self.sock = FakeSocket('') 1873 1874 self.conn.request('PUT', '/url', body) 1875 msg, f = self.get_headers_and_fp() 1876 self.assertNotIn('Content-Type', msg) 1877 self.assertNotIn('Content-Length', msg) 1878 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1879 self.assertEqual(expected, f.read()) 1880 1881 def test_manual_content_length(self): 1882 # Set an incorrect content-length so that we can verify that 1883 # it will not be over-ridden by the library. 1884 self.conn.request("PUT", "/url", "body", 1885 {"Content-Length": "42"}) 1886 message, f = self.get_headers_and_fp() 1887 self.assertEqual("42", message.get("content-length")) 1888 self.assertEqual(4, len(f.read())) 1889 1890 def test_ascii_body(self): 1891 self.conn.request("PUT", "/url", "body") 1892 message, f = self.get_headers_and_fp() 1893 self.assertEqual("text/plain", message.get_content_type()) 1894 self.assertIsNone(message.get_charset()) 1895 self.assertEqual("4", message.get("content-length")) 1896 self.assertEqual(b'body', f.read()) 1897 1898 def test_latin1_body(self): 1899 self.conn.request("PUT", "/url", "body\xc1") 1900 message, f = self.get_headers_and_fp() 1901 self.assertEqual("text/plain", message.get_content_type()) 1902 self.assertIsNone(message.get_charset()) 1903 self.assertEqual("5", message.get("content-length")) 1904 self.assertEqual(b'body\xc1', f.read()) 1905 1906 def test_bytes_body(self): 1907 self.conn.request("PUT", "/url", b"body\xc1") 1908 message, f = self.get_headers_and_fp() 1909 self.assertEqual("text/plain", message.get_content_type()) 1910 self.assertIsNone(message.get_charset()) 1911 self.assertEqual("5", message.get("content-length")) 1912 self.assertEqual(b'body\xc1', f.read()) 1913 1914 def test_text_file_body(self): 1915 self.addCleanup(support.unlink, support.TESTFN) 1916 with open(support.TESTFN, "w") as f: 1917 f.write("body") 1918 with open(support.TESTFN) as f: 1919 self.conn.request("PUT", "/url", f) 1920 message, f = self.get_headers_and_fp() 1921 self.assertEqual("text/plain", message.get_content_type()) 1922 self.assertIsNone(message.get_charset()) 1923 # No content-length will be determined for files; the body 1924 # will be sent using chunked transfer encoding instead. 1925 self.assertIsNone(message.get("content-length")) 1926 self.assertEqual("chunked", message.get("transfer-encoding")) 1927 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1928 1929 def test_binary_file_body(self): 1930 self.addCleanup(support.unlink, support.TESTFN) 1931 with open(support.TESTFN, "wb") as f: 1932 f.write(b"body\xc1") 1933 with open(support.TESTFN, "rb") as f: 1934 self.conn.request("PUT", "/url", f) 1935 message, f = self.get_headers_and_fp() 1936 self.assertEqual("text/plain", message.get_content_type()) 1937 self.assertIsNone(message.get_charset()) 1938 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1939 self.assertNotIn("Content-Length", message) 1940 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1941 1942 1943class HTTPResponseTest(TestCase): 1944 1945 def setUp(self): 1946 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1947 second-value\r\n\r\nText" 1948 sock = FakeSocket(body) 1949 self.resp = client.HTTPResponse(sock) 1950 self.resp.begin() 1951 1952 def test_getting_header(self): 1953 header = self.resp.getheader('My-Header') 1954 self.assertEqual(header, 'first-value, second-value') 1955 1956 header = self.resp.getheader('My-Header', 'some default') 1957 self.assertEqual(header, 'first-value, second-value') 1958 1959 def test_getting_nonexistent_header_with_string_default(self): 1960 header = self.resp.getheader('No-Such-Header', 'default-value') 1961 self.assertEqual(header, 'default-value') 1962 1963 def test_getting_nonexistent_header_with_iterable_default(self): 1964 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1965 self.assertEqual(header, 'default, values') 1966 1967 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1968 self.assertEqual(header, 'default, values') 1969 1970 def test_getting_nonexistent_header_without_default(self): 1971 header = self.resp.getheader('No-Such-Header') 1972 self.assertEqual(header, None) 1973 1974 def test_getting_header_defaultint(self): 1975 header = self.resp.getheader('No-Such-Header',default=42) 1976 self.assertEqual(header, 42) 1977 1978class TunnelTests(TestCase): 1979 def setUp(self): 1980 response_text = ( 1981 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1982 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1983 'Content-Length: 42\r\n\r\n' 1984 ) 1985 self.host = 'proxy.com' 1986 self.conn = client.HTTPConnection(self.host) 1987 self.conn._create_connection = self._create_connection(response_text) 1988 1989 def tearDown(self): 1990 self.conn.close() 1991 1992 def _create_connection(self, response_text): 1993 def create_connection(address, timeout=None, source_address=None): 1994 return FakeSocket(response_text, host=address[0], port=address[1]) 1995 return create_connection 1996 1997 def test_set_tunnel_host_port_headers(self): 1998 tunnel_host = 'destination.com' 1999 tunnel_port = 8888 2000 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 2001 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 2002 headers=tunnel_headers) 2003 self.conn.request('HEAD', '/', '') 2004 self.assertEqual(self.conn.sock.host, self.host) 2005 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2006 self.assertEqual(self.conn._tunnel_host, tunnel_host) 2007 self.assertEqual(self.conn._tunnel_port, tunnel_port) 2008 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 2009 2010 def test_disallow_set_tunnel_after_connect(self): 2011 # Once connected, we shouldn't be able to tunnel anymore 2012 self.conn.connect() 2013 self.assertRaises(RuntimeError, self.conn.set_tunnel, 2014 'destination.com') 2015 2016 def test_connect_with_tunnel(self): 2017 self.conn.set_tunnel('destination.com') 2018 self.conn.request('HEAD', '/', '') 2019 self.assertEqual(self.conn.sock.host, self.host) 2020 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2021 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2022 # issue22095 2023 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 2024 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2025 2026 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 2027 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 2028 2029 def test_connect_put_request(self): 2030 self.conn.set_tunnel('destination.com') 2031 self.conn.request('PUT', '/', '') 2032 self.assertEqual(self.conn.sock.host, self.host) 2033 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2034 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2035 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2036 2037 def test_tunnel_debuglog(self): 2038 expected_header = 'X-Dummy: 1' 2039 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 2040 2041 self.conn.set_debuglevel(1) 2042 self.conn._create_connection = self._create_connection(response_text) 2043 self.conn.set_tunnel('destination.com') 2044 2045 with support.captured_stdout() as output: 2046 self.conn.request('PUT', '/', '') 2047 lines = output.getvalue().splitlines() 2048 self.assertIn('header: {}'.format(expected_header), lines) 2049 2050 2051if __name__ == '__main__': 2052 unittest.main(verbosity=2) 2053