1import errno 2from http import client 3import io 4import itertools 5import os 6import array 7import re 8import socket 9import threading 10import warnings 11 12import unittest 13TestCase = unittest.TestCase 14 15from test import support 16 17here = os.path.dirname(__file__) 18# Self-signed cert file for 'localhost' 19CERT_localhost = os.path.join(here, 'keycert.pem') 20# Self-signed cert file for 'fakehostname' 21CERT_fakehostname = os.path.join(here, 'keycert2.pem') 22# Self-signed cert file for self-signed.pythontest.net 23CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 24 25# constants for testing chunked encoding 26chunked_start = ( 27 'HTTP/1.1 200 OK\r\n' 28 'Transfer-Encoding: chunked\r\n\r\n' 29 'a\r\n' 30 'hello worl\r\n' 31 '3\r\n' 32 'd! \r\n' 33 '8\r\n' 34 'and now \r\n' 35 '22\r\n' 36 'for something completely different\r\n' 37) 38chunked_expected = b'hello world! and now for something completely different' 39chunk_extension = ";foo=bar" 40last_chunk = "0\r\n" 41last_chunk_extended = "0" + chunk_extension + "\r\n" 42trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 43chunked_end = "\r\n" 44 45HOST = support.HOST 46 47class FakeSocket: 48 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 49 if isinstance(text, str): 50 text = text.encode("ascii") 51 self.text = text 52 self.fileclass = fileclass 53 self.data = b'' 54 self.sendall_calls = 0 55 self.file_closed = False 56 self.host = host 57 self.port = port 58 59 def sendall(self, data): 60 self.sendall_calls += 1 61 self.data += data 62 63 def makefile(self, mode, bufsize=None): 64 if mode != 'r' and mode != 'rb': 65 raise client.UnimplementedFileMode() 66 # keep the file around so we can check how much was read from it 67 self.file = self.fileclass(self.text) 68 self.file.close = self.file_close #nerf close () 69 return self.file 70 71 def file_close(self): 72 self.file_closed = True 73 74 def close(self): 75 pass 76 77 def setsockopt(self, level, optname, value): 78 pass 79 80class EPipeSocket(FakeSocket): 81 82 def __init__(self, text, pipe_trigger): 83 # When sendall() is called with pipe_trigger, raise EPIPE. 84 FakeSocket.__init__(self, text) 85 self.pipe_trigger = pipe_trigger 86 87 def sendall(self, data): 88 if self.pipe_trigger in data: 89 raise OSError(errno.EPIPE, "gotcha") 90 self.data += data 91 92 def close(self): 93 pass 94 95class NoEOFBytesIO(io.BytesIO): 96 """Like BytesIO, but raises AssertionError on EOF. 97 98 This is used below to test that http.client doesn't try to read 99 more from the underlying file than it should. 100 """ 101 def read(self, n=-1): 102 data = io.BytesIO.read(self, n) 103 if data == b'': 104 raise AssertionError('caller tried to read past EOF') 105 return data 106 107 def readline(self, length=None): 108 data = io.BytesIO.readline(self, length) 109 if data == b'': 110 raise AssertionError('caller tried to read past EOF') 111 return data 112 113class FakeSocketHTTPConnection(client.HTTPConnection): 114 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 115 116 def __init__(self, *args): 117 self.connections = 0 118 super().__init__('example.com') 119 self.fake_socket_args = args 120 self._create_connection = self.create_connection 121 122 def connect(self): 123 """Count the number of times connect() is invoked""" 124 self.connections += 1 125 return super().connect() 126 127 def create_connection(self, *pos, **kw): 128 return FakeSocket(*self.fake_socket_args) 129 130class HeaderTests(TestCase): 131 def test_auto_headers(self): 132 # Some headers are added automatically, but should not be added by 133 # .request() if they are explicitly set. 134 135 class HeaderCountingBuffer(list): 136 def __init__(self): 137 self.count = {} 138 def append(self, item): 139 kv = item.split(b':') 140 if len(kv) > 1: 141 # item is a 'Key: Value' header string 142 lcKey = kv[0].decode('ascii').lower() 143 self.count.setdefault(lcKey, 0) 144 self.count[lcKey] += 1 145 list.append(self, item) 146 147 for explicit_header in True, False: 148 for header in 'Content-length', 'Host', 'Accept-encoding': 149 conn = client.HTTPConnection('example.com') 150 conn.sock = FakeSocket('blahblahblah') 151 conn._buffer = HeaderCountingBuffer() 152 153 body = 'spamspamspam' 154 headers = {} 155 if explicit_header: 156 headers[header] = str(len(body)) 157 conn.request('POST', '/', body, headers) 158 self.assertEqual(conn._buffer.count[header.lower()], 1) 159 160 def test_content_length_0(self): 161 162 class ContentLengthChecker(list): 163 def __init__(self): 164 list.__init__(self) 165 self.content_length = None 166 def append(self, item): 167 kv = item.split(b':', 1) 168 if len(kv) > 1 and kv[0].lower() == b'content-length': 169 self.content_length = kv[1].strip() 170 list.append(self, item) 171 172 # Here, we're testing that methods expecting a body get a 173 # content-length set to zero if the body is empty (either None or '') 174 bodies = (None, '') 175 methods_with_body = ('PUT', 'POST', 'PATCH') 176 for method, body in itertools.product(methods_with_body, bodies): 177 conn = client.HTTPConnection('example.com') 178 conn.sock = FakeSocket(None) 179 conn._buffer = ContentLengthChecker() 180 conn.request(method, '/', body) 181 self.assertEqual( 182 conn._buffer.content_length, b'0', 183 'Header Content-Length incorrect on {}'.format(method) 184 ) 185 186 # For these methods, we make sure that content-length is not set when 187 # the body is None because it might cause unexpected behaviour on the 188 # server. 189 methods_without_body = ( 190 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 191 ) 192 for method in methods_without_body: 193 conn = client.HTTPConnection('example.com') 194 conn.sock = FakeSocket(None) 195 conn._buffer = ContentLengthChecker() 196 conn.request(method, '/', None) 197 self.assertEqual( 198 conn._buffer.content_length, None, 199 'Header Content-Length set for empty body on {}'.format(method) 200 ) 201 202 # If the body is set to '', that's considered to be "present but 203 # empty" rather than "missing", so content length would be set, even 204 # for methods that don't expect a body. 205 for method in methods_without_body: 206 conn = client.HTTPConnection('example.com') 207 conn.sock = FakeSocket(None) 208 conn._buffer = ContentLengthChecker() 209 conn.request(method, '/', '') 210 self.assertEqual( 211 conn._buffer.content_length, b'0', 212 'Header Content-Length incorrect on {}'.format(method) 213 ) 214 215 # If the body is set, make sure Content-Length is set. 216 for method in itertools.chain(methods_without_body, methods_with_body): 217 conn = client.HTTPConnection('example.com') 218 conn.sock = FakeSocket(None) 219 conn._buffer = ContentLengthChecker() 220 conn.request(method, '/', ' ') 221 self.assertEqual( 222 conn._buffer.content_length, b'1', 223 'Header Content-Length incorrect on {}'.format(method) 224 ) 225 226 def test_putheader(self): 227 conn = client.HTTPConnection('example.com') 228 conn.sock = FakeSocket(None) 229 conn.putrequest('GET','/') 230 conn.putheader('Content-length', 42) 231 self.assertIn(b'Content-length: 42', conn._buffer) 232 233 conn.putheader('Foo', ' bar ') 234 self.assertIn(b'Foo: bar ', conn._buffer) 235 conn.putheader('Bar', '\tbaz\t') 236 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 237 conn.putheader('Authorization', 'Bearer mytoken') 238 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 239 conn.putheader('IterHeader', 'IterA', 'IterB') 240 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 241 conn.putheader('LatinHeader', b'\xFF') 242 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 243 conn.putheader('Utf8Header', b'\xc3\x80') 244 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 245 conn.putheader('C1-Control', b'next\x85line') 246 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 247 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 248 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 249 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 250 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 251 conn.putheader('Key Space', 'value') 252 self.assertIn(b'Key Space: value', conn._buffer) 253 conn.putheader('KeySpace ', 'value') 254 self.assertIn(b'KeySpace : value', conn._buffer) 255 conn.putheader(b'Nonbreak\xa0Space', 'value') 256 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 257 conn.putheader(b'\xa0NonbreakSpace', 'value') 258 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 259 260 def test_ipv6host_header(self): 261 # Default host header on IPv6 transaction should be wrapped by [] if 262 # it is an IPv6 address 263 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 264 b'Accept-Encoding: identity\r\n\r\n' 265 conn = client.HTTPConnection('[2001::]:81') 266 sock = FakeSocket('') 267 conn.sock = sock 268 conn.request('GET', '/foo') 269 self.assertTrue(sock.data.startswith(expected)) 270 271 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 272 b'Accept-Encoding: identity\r\n\r\n' 273 conn = client.HTTPConnection('[2001:102A::]') 274 sock = FakeSocket('') 275 conn.sock = sock 276 conn.request('GET', '/foo') 277 self.assertTrue(sock.data.startswith(expected)) 278 279 def test_malformed_headers_coped_with(self): 280 # Issue 19996 281 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 282 sock = FakeSocket(body) 283 resp = client.HTTPResponse(sock) 284 resp.begin() 285 286 self.assertEqual(resp.getheader('First'), 'val') 287 self.assertEqual(resp.getheader('Second'), 'val') 288 289 def test_parse_all_octets(self): 290 # Ensure no valid header field octet breaks the parser 291 body = ( 292 b'HTTP/1.1 200 OK\r\n' 293 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 294 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 295 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 296 b'obs-fold: text\r\n' 297 b' folded with space\r\n' 298 b'\tfolded with tab\r\n' 299 b'Content-Length: 0\r\n' 300 b'\r\n' 301 ) 302 sock = FakeSocket(body) 303 resp = client.HTTPResponse(sock) 304 resp.begin() 305 self.assertEqual(resp.getheader('Content-Length'), '0') 306 self.assertEqual(resp.msg['Content-Length'], '0') 307 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 308 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 309 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 310 self.assertEqual(resp.getheader('VCHAR'), vchar) 311 self.assertEqual(resp.msg['VCHAR'], vchar) 312 self.assertIsNotNone(resp.getheader('obs-text')) 313 self.assertIn('obs-text', resp.msg) 314 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 315 self.assertTrue(folded.startswith('text')) 316 self.assertIn(' folded with space', folded) 317 self.assertTrue(folded.endswith('folded with tab')) 318 319 def test_invalid_headers(self): 320 conn = client.HTTPConnection('example.com') 321 conn.sock = FakeSocket('') 322 conn.putrequest('GET', '/') 323 324 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 325 # longer allowed in header names 326 cases = ( 327 (b'Invalid\r\nName', b'ValidValue'), 328 (b'Invalid\rName', b'ValidValue'), 329 (b'Invalid\nName', b'ValidValue'), 330 (b'\r\nInvalidName', b'ValidValue'), 331 (b'\rInvalidName', b'ValidValue'), 332 (b'\nInvalidName', b'ValidValue'), 333 (b' InvalidName', b'ValidValue'), 334 (b'\tInvalidName', b'ValidValue'), 335 (b'Invalid:Name', b'ValidValue'), 336 (b':InvalidName', b'ValidValue'), 337 (b'ValidName', b'Invalid\r\nValue'), 338 (b'ValidName', b'Invalid\rValue'), 339 (b'ValidName', b'Invalid\nValue'), 340 (b'ValidName', b'InvalidValue\r\n'), 341 (b'ValidName', b'InvalidValue\r'), 342 (b'ValidName', b'InvalidValue\n'), 343 ) 344 for name, value in cases: 345 with self.subTest((name, value)): 346 with self.assertRaisesRegex(ValueError, 'Invalid header'): 347 conn.putheader(name, value) 348 349 def test_headers_debuglevel(self): 350 body = ( 351 b'HTTP/1.1 200 OK\r\n' 352 b'First: val\r\n' 353 b'Second: val1\r\n' 354 b'Second: val2\r\n' 355 ) 356 sock = FakeSocket(body) 357 resp = client.HTTPResponse(sock, debuglevel=1) 358 with support.captured_stdout() as output: 359 resp.begin() 360 lines = output.getvalue().splitlines() 361 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 362 self.assertEqual(lines[1], "header: First: val") 363 self.assertEqual(lines[2], "header: Second: val1") 364 self.assertEqual(lines[3], "header: Second: val2") 365 366 367class TransferEncodingTest(TestCase): 368 expected_body = b"It's just a flesh wound" 369 370 def test_endheaders_chunked(self): 371 conn = client.HTTPConnection('example.com') 372 conn.sock = FakeSocket(b'') 373 conn.putrequest('POST', '/') 374 conn.endheaders(self._make_body(), encode_chunked=True) 375 376 _, _, body = self._parse_request(conn.sock.data) 377 body = self._parse_chunked(body) 378 self.assertEqual(body, self.expected_body) 379 380 def test_explicit_headers(self): 381 # explicit chunked 382 conn = client.HTTPConnection('example.com') 383 conn.sock = FakeSocket(b'') 384 # this shouldn't actually be automatically chunk-encoded because the 385 # calling code has explicitly stated that it's taking care of it 386 conn.request( 387 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 388 389 _, headers, body = self._parse_request(conn.sock.data) 390 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 391 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 392 self.assertEqual(body, self.expected_body) 393 394 # explicit chunked, string body 395 conn = client.HTTPConnection('example.com') 396 conn.sock = FakeSocket(b'') 397 conn.request( 398 'POST', '/', self.expected_body.decode('latin-1'), 399 {'Transfer-Encoding': 'chunked'}) 400 401 _, headers, body = self._parse_request(conn.sock.data) 402 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 403 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 404 self.assertEqual(body, self.expected_body) 405 406 # User-specified TE, but request() does the chunk encoding 407 conn = client.HTTPConnection('example.com') 408 conn.sock = FakeSocket(b'') 409 conn.request('POST', '/', 410 headers={'Transfer-Encoding': 'gzip, chunked'}, 411 encode_chunked=True, 412 body=self._make_body()) 413 _, headers, body = self._parse_request(conn.sock.data) 414 self.assertNotIn('content-length', [k.lower() for k in headers]) 415 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 416 self.assertEqual(self._parse_chunked(body), self.expected_body) 417 418 def test_request(self): 419 for empty_lines in (False, True,): 420 conn = client.HTTPConnection('example.com') 421 conn.sock = FakeSocket(b'') 422 conn.request( 423 'POST', '/', self._make_body(empty_lines=empty_lines)) 424 425 _, headers, body = self._parse_request(conn.sock.data) 426 body = self._parse_chunked(body) 427 self.assertEqual(body, self.expected_body) 428 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 429 430 # Content-Length and Transfer-Encoding SHOULD not be sent in the 431 # same request 432 self.assertNotIn('content-length', [k.lower() for k in headers]) 433 434 def test_empty_body(self): 435 # Zero-length iterable should be treated like any other iterable 436 conn = client.HTTPConnection('example.com') 437 conn.sock = FakeSocket(b'') 438 conn.request('POST', '/', ()) 439 _, headers, body = self._parse_request(conn.sock.data) 440 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 441 self.assertNotIn('content-length', [k.lower() for k in headers]) 442 self.assertEqual(body, b"0\r\n\r\n") 443 444 def _make_body(self, empty_lines=False): 445 lines = self.expected_body.split(b' ') 446 for idx, line in enumerate(lines): 447 # for testing handling empty lines 448 if empty_lines and idx % 2: 449 yield b'' 450 if idx < len(lines) - 1: 451 yield line + b' ' 452 else: 453 yield line 454 455 def _parse_request(self, data): 456 lines = data.split(b'\r\n') 457 request = lines[0] 458 headers = {} 459 n = 1 460 while n < len(lines) and len(lines[n]) > 0: 461 key, val = lines[n].split(b':') 462 key = key.decode('latin-1').strip() 463 headers[key] = val.decode('latin-1').strip() 464 n += 1 465 466 return request, headers, b'\r\n'.join(lines[n + 1:]) 467 468 def _parse_chunked(self, data): 469 body = [] 470 trailers = {} 471 n = 0 472 lines = data.split(b'\r\n') 473 # parse body 474 while True: 475 size, chunk = lines[n:n+2] 476 size = int(size, 16) 477 478 if size == 0: 479 n += 1 480 break 481 482 self.assertEqual(size, len(chunk)) 483 body.append(chunk) 484 485 n += 2 486 # we /should/ hit the end chunk, but check against the size of 487 # lines so we're not stuck in an infinite loop should we get 488 # malformed data 489 if n > len(lines): 490 break 491 492 return b''.join(body) 493 494 495class BasicTest(TestCase): 496 def test_status_lines(self): 497 # Test HTTP status lines 498 499 body = "HTTP/1.1 200 Ok\r\n\r\nText" 500 sock = FakeSocket(body) 501 resp = client.HTTPResponse(sock) 502 resp.begin() 503 self.assertEqual(resp.read(0), b'') # Issue #20007 504 self.assertFalse(resp.isclosed()) 505 self.assertFalse(resp.closed) 506 self.assertEqual(resp.read(), b"Text") 507 self.assertTrue(resp.isclosed()) 508 self.assertFalse(resp.closed) 509 resp.close() 510 self.assertTrue(resp.closed) 511 512 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 513 sock = FakeSocket(body) 514 resp = client.HTTPResponse(sock) 515 self.assertRaises(client.BadStatusLine, resp.begin) 516 517 def test_bad_status_repr(self): 518 exc = client.BadStatusLine('') 519 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 520 521 def test_partial_reads(self): 522 # if we have Content-Length, HTTPResponse knows when to close itself, 523 # the same behaviour as when we read the whole thing with read() 524 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 525 sock = FakeSocket(body) 526 resp = client.HTTPResponse(sock) 527 resp.begin() 528 self.assertEqual(resp.read(2), b'Te') 529 self.assertFalse(resp.isclosed()) 530 self.assertEqual(resp.read(2), b'xt') 531 self.assertTrue(resp.isclosed()) 532 self.assertFalse(resp.closed) 533 resp.close() 534 self.assertTrue(resp.closed) 535 536 def test_mixed_reads(self): 537 # readline() should update the remaining length, so that read() knows 538 # how much data is left and does not raise IncompleteRead 539 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 540 sock = FakeSocket(body) 541 resp = client.HTTPResponse(sock) 542 resp.begin() 543 self.assertEqual(resp.readline(), b'Text\r\n') 544 self.assertFalse(resp.isclosed()) 545 self.assertEqual(resp.read(), b'Another') 546 self.assertTrue(resp.isclosed()) 547 self.assertFalse(resp.closed) 548 resp.close() 549 self.assertTrue(resp.closed) 550 551 def test_partial_readintos(self): 552 # if we have Content-Length, HTTPResponse knows when to close itself, 553 # the same behaviour as when we read the whole thing with read() 554 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 555 sock = FakeSocket(body) 556 resp = client.HTTPResponse(sock) 557 resp.begin() 558 b = bytearray(2) 559 n = resp.readinto(b) 560 self.assertEqual(n, 2) 561 self.assertEqual(bytes(b), b'Te') 562 self.assertFalse(resp.isclosed()) 563 n = resp.readinto(b) 564 self.assertEqual(n, 2) 565 self.assertEqual(bytes(b), b'xt') 566 self.assertTrue(resp.isclosed()) 567 self.assertFalse(resp.closed) 568 resp.close() 569 self.assertTrue(resp.closed) 570 571 def test_partial_reads_no_content_length(self): 572 # when no length is present, the socket should be gracefully closed when 573 # all data was read 574 body = "HTTP/1.1 200 Ok\r\n\r\nText" 575 sock = FakeSocket(body) 576 resp = client.HTTPResponse(sock) 577 resp.begin() 578 self.assertEqual(resp.read(2), b'Te') 579 self.assertFalse(resp.isclosed()) 580 self.assertEqual(resp.read(2), b'xt') 581 self.assertEqual(resp.read(1), b'') 582 self.assertTrue(resp.isclosed()) 583 self.assertFalse(resp.closed) 584 resp.close() 585 self.assertTrue(resp.closed) 586 587 def test_partial_readintos_no_content_length(self): 588 # when no length is present, the socket should be gracefully closed when 589 # all data was read 590 body = "HTTP/1.1 200 Ok\r\n\r\nText" 591 sock = FakeSocket(body) 592 resp = client.HTTPResponse(sock) 593 resp.begin() 594 b = bytearray(2) 595 n = resp.readinto(b) 596 self.assertEqual(n, 2) 597 self.assertEqual(bytes(b), b'Te') 598 self.assertFalse(resp.isclosed()) 599 n = resp.readinto(b) 600 self.assertEqual(n, 2) 601 self.assertEqual(bytes(b), b'xt') 602 n = resp.readinto(b) 603 self.assertEqual(n, 0) 604 self.assertTrue(resp.isclosed()) 605 606 def test_partial_reads_incomplete_body(self): 607 # if the server shuts down the connection before the whole 608 # content-length is delivered, the socket is gracefully closed 609 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 610 sock = FakeSocket(body) 611 resp = client.HTTPResponse(sock) 612 resp.begin() 613 self.assertEqual(resp.read(2), b'Te') 614 self.assertFalse(resp.isclosed()) 615 self.assertEqual(resp.read(2), b'xt') 616 self.assertEqual(resp.read(1), b'') 617 self.assertTrue(resp.isclosed()) 618 619 def test_partial_readintos_incomplete_body(self): 620 # if the server shuts down the connection before the whole 621 # content-length is delivered, the socket is gracefully closed 622 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 623 sock = FakeSocket(body) 624 resp = client.HTTPResponse(sock) 625 resp.begin() 626 b = bytearray(2) 627 n = resp.readinto(b) 628 self.assertEqual(n, 2) 629 self.assertEqual(bytes(b), b'Te') 630 self.assertFalse(resp.isclosed()) 631 n = resp.readinto(b) 632 self.assertEqual(n, 2) 633 self.assertEqual(bytes(b), b'xt') 634 n = resp.readinto(b) 635 self.assertEqual(n, 0) 636 self.assertTrue(resp.isclosed()) 637 self.assertFalse(resp.closed) 638 resp.close() 639 self.assertTrue(resp.closed) 640 641 def test_host_port(self): 642 # Check invalid host_port 643 644 for hp in ("www.python.org:abc", "user:password@www.python.org"): 645 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 646 647 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 648 "fe80::207:e9ff:fe9b", 8000), 649 ("www.python.org:80", "www.python.org", 80), 650 ("www.python.org:", "www.python.org", 80), 651 ("www.python.org", "www.python.org", 80), 652 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 653 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 654 c = client.HTTPConnection(hp) 655 self.assertEqual(h, c.host) 656 self.assertEqual(p, c.port) 657 658 def test_response_headers(self): 659 # test response with multiple message headers with the same field name. 660 text = ('HTTP/1.1 200 OK\r\n' 661 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 662 'Version="1"; Path="/acme"\r\n' 663 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 664 ' Path="/acme"\r\n' 665 '\r\n' 666 'No body\r\n') 667 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 668 ', ' 669 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 670 s = FakeSocket(text) 671 r = client.HTTPResponse(s) 672 r.begin() 673 cookies = r.getheader("Set-Cookie") 674 self.assertEqual(cookies, hdr) 675 676 def test_read_head(self): 677 # Test that the library doesn't attempt to read any data 678 # from a HEAD request. (Tickles SF bug #622042.) 679 sock = FakeSocket( 680 'HTTP/1.1 200 OK\r\n' 681 'Content-Length: 14432\r\n' 682 '\r\n', 683 NoEOFBytesIO) 684 resp = client.HTTPResponse(sock, method="HEAD") 685 resp.begin() 686 if resp.read(): 687 self.fail("Did not expect response from HEAD request") 688 689 def test_readinto_head(self): 690 # Test that the library doesn't attempt to read any data 691 # from a HEAD request. (Tickles SF bug #622042.) 692 sock = FakeSocket( 693 'HTTP/1.1 200 OK\r\n' 694 'Content-Length: 14432\r\n' 695 '\r\n', 696 NoEOFBytesIO) 697 resp = client.HTTPResponse(sock, method="HEAD") 698 resp.begin() 699 b = bytearray(5) 700 if resp.readinto(b) != 0: 701 self.fail("Did not expect response from HEAD request") 702 self.assertEqual(bytes(b), b'\x00'*5) 703 704 def test_too_many_headers(self): 705 headers = '\r\n'.join('Header%d: foo' % i 706 for i in range(client._MAXHEADERS + 1)) + '\r\n' 707 text = ('HTTP/1.1 200 OK\r\n' + headers) 708 s = FakeSocket(text) 709 r = client.HTTPResponse(s) 710 self.assertRaisesRegex(client.HTTPException, 711 r"got more than \d+ headers", r.begin) 712 713 def test_send_file(self): 714 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 715 b'Accept-Encoding: identity\r\n' 716 b'Transfer-Encoding: chunked\r\n' 717 b'\r\n') 718 719 with open(__file__, 'rb') as body: 720 conn = client.HTTPConnection('example.com') 721 sock = FakeSocket(body) 722 conn.sock = sock 723 conn.request('GET', '/foo', body) 724 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 725 (sock.data[:len(expected)], expected)) 726 727 def test_send(self): 728 expected = b'this is a test this is only a test' 729 conn = client.HTTPConnection('example.com') 730 sock = FakeSocket(None) 731 conn.sock = sock 732 conn.send(expected) 733 self.assertEqual(expected, sock.data) 734 sock.data = b'' 735 conn.send(array.array('b', expected)) 736 self.assertEqual(expected, sock.data) 737 sock.data = b'' 738 conn.send(io.BytesIO(expected)) 739 self.assertEqual(expected, sock.data) 740 741 def test_send_updating_file(self): 742 def data(): 743 yield 'data' 744 yield None 745 yield 'data_two' 746 747 class UpdatingFile(io.TextIOBase): 748 mode = 'r' 749 d = data() 750 def read(self, blocksize=-1): 751 return next(self.d) 752 753 expected = b'data' 754 755 conn = client.HTTPConnection('example.com') 756 sock = FakeSocket("") 757 conn.sock = sock 758 conn.send(UpdatingFile()) 759 self.assertEqual(sock.data, expected) 760 761 762 def test_send_iter(self): 763 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 764 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 765 b'\r\nonetwothree' 766 767 def body(): 768 yield b"one" 769 yield b"two" 770 yield b"three" 771 772 conn = client.HTTPConnection('example.com') 773 sock = FakeSocket("") 774 conn.sock = sock 775 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 776 self.assertEqual(sock.data, expected) 777 778 def test_blocksize_request(self): 779 """Check that request() respects the configured block size.""" 780 blocksize = 8 # For easy debugging. 781 conn = client.HTTPConnection('example.com', blocksize=blocksize) 782 sock = FakeSocket(None) 783 conn.sock = sock 784 expected = b"a" * blocksize + b"b" 785 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 786 self.assertEqual(sock.sendall_calls, 3) 787 body = sock.data.split(b"\r\n\r\n", 1)[1] 788 self.assertEqual(body, expected) 789 790 def test_blocksize_send(self): 791 """Check that send() respects the configured block size.""" 792 blocksize = 8 # For easy debugging. 793 conn = client.HTTPConnection('example.com', blocksize=blocksize) 794 sock = FakeSocket(None) 795 conn.sock = sock 796 expected = b"a" * blocksize + b"b" 797 conn.send(io.BytesIO(expected)) 798 self.assertEqual(sock.sendall_calls, 2) 799 self.assertEqual(sock.data, expected) 800 801 def test_send_type_error(self): 802 # See: Issue #12676 803 conn = client.HTTPConnection('example.com') 804 conn.sock = FakeSocket('') 805 with self.assertRaises(TypeError): 806 conn.request('POST', 'test', conn) 807 808 def test_chunked(self): 809 expected = chunked_expected 810 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 811 resp = client.HTTPResponse(sock, method="GET") 812 resp.begin() 813 self.assertEqual(resp.read(), expected) 814 resp.close() 815 816 # Various read sizes 817 for n in range(1, 12): 818 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 819 resp = client.HTTPResponse(sock, method="GET") 820 resp.begin() 821 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 822 resp.close() 823 824 for x in ('', 'foo\r\n'): 825 sock = FakeSocket(chunked_start + x) 826 resp = client.HTTPResponse(sock, method="GET") 827 resp.begin() 828 try: 829 resp.read() 830 except client.IncompleteRead as i: 831 self.assertEqual(i.partial, expected) 832 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 833 self.assertEqual(repr(i), expected_message) 834 self.assertEqual(str(i), expected_message) 835 else: 836 self.fail('IncompleteRead expected') 837 finally: 838 resp.close() 839 840 def test_readinto_chunked(self): 841 842 expected = chunked_expected 843 nexpected = len(expected) 844 b = bytearray(128) 845 846 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 847 resp = client.HTTPResponse(sock, method="GET") 848 resp.begin() 849 n = resp.readinto(b) 850 self.assertEqual(b[:nexpected], expected) 851 self.assertEqual(n, nexpected) 852 resp.close() 853 854 # Various read sizes 855 for n in range(1, 12): 856 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 857 resp = client.HTTPResponse(sock, method="GET") 858 resp.begin() 859 m = memoryview(b) 860 i = resp.readinto(m[0:n]) 861 i += resp.readinto(m[i:n + i]) 862 i += resp.readinto(m[i:]) 863 self.assertEqual(b[:nexpected], expected) 864 self.assertEqual(i, nexpected) 865 resp.close() 866 867 for x in ('', 'foo\r\n'): 868 sock = FakeSocket(chunked_start + x) 869 resp = client.HTTPResponse(sock, method="GET") 870 resp.begin() 871 try: 872 n = resp.readinto(b) 873 except client.IncompleteRead as i: 874 self.assertEqual(i.partial, expected) 875 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 876 self.assertEqual(repr(i), expected_message) 877 self.assertEqual(str(i), expected_message) 878 else: 879 self.fail('IncompleteRead expected') 880 finally: 881 resp.close() 882 883 def test_chunked_head(self): 884 chunked_start = ( 885 'HTTP/1.1 200 OK\r\n' 886 'Transfer-Encoding: chunked\r\n\r\n' 887 'a\r\n' 888 'hello world\r\n' 889 '1\r\n' 890 'd\r\n' 891 ) 892 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 893 resp = client.HTTPResponse(sock, method="HEAD") 894 resp.begin() 895 self.assertEqual(resp.read(), b'') 896 self.assertEqual(resp.status, 200) 897 self.assertEqual(resp.reason, 'OK') 898 self.assertTrue(resp.isclosed()) 899 self.assertFalse(resp.closed) 900 resp.close() 901 self.assertTrue(resp.closed) 902 903 def test_readinto_chunked_head(self): 904 chunked_start = ( 905 'HTTP/1.1 200 OK\r\n' 906 'Transfer-Encoding: chunked\r\n\r\n' 907 'a\r\n' 908 'hello world\r\n' 909 '1\r\n' 910 'd\r\n' 911 ) 912 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 913 resp = client.HTTPResponse(sock, method="HEAD") 914 resp.begin() 915 b = bytearray(5) 916 n = resp.readinto(b) 917 self.assertEqual(n, 0) 918 self.assertEqual(bytes(b), b'\x00'*5) 919 self.assertEqual(resp.status, 200) 920 self.assertEqual(resp.reason, 'OK') 921 self.assertTrue(resp.isclosed()) 922 self.assertFalse(resp.closed) 923 resp.close() 924 self.assertTrue(resp.closed) 925 926 def test_negative_content_length(self): 927 sock = FakeSocket( 928 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 929 resp = client.HTTPResponse(sock, method="GET") 930 resp.begin() 931 self.assertEqual(resp.read(), b'Hello\r\n') 932 self.assertTrue(resp.isclosed()) 933 934 def test_incomplete_read(self): 935 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 936 resp = client.HTTPResponse(sock, method="GET") 937 resp.begin() 938 try: 939 resp.read() 940 except client.IncompleteRead as i: 941 self.assertEqual(i.partial, b'Hello\r\n') 942 self.assertEqual(repr(i), 943 "IncompleteRead(7 bytes read, 3 more expected)") 944 self.assertEqual(str(i), 945 "IncompleteRead(7 bytes read, 3 more expected)") 946 self.assertTrue(resp.isclosed()) 947 else: 948 self.fail('IncompleteRead expected') 949 950 def test_epipe(self): 951 sock = EPipeSocket( 952 "HTTP/1.0 401 Authorization Required\r\n" 953 "Content-type: text/html\r\n" 954 "WWW-Authenticate: Basic realm=\"example\"\r\n", 955 b"Content-Length") 956 conn = client.HTTPConnection("example.com") 957 conn.sock = sock 958 self.assertRaises(OSError, 959 lambda: conn.request("PUT", "/url", "body")) 960 resp = conn.getresponse() 961 self.assertEqual(401, resp.status) 962 self.assertEqual("Basic realm=\"example\"", 963 resp.getheader("www-authenticate")) 964 965 # Test lines overflowing the max line size (_MAXLINE in http.client) 966 967 def test_overflowing_status_line(self): 968 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 969 resp = client.HTTPResponse(FakeSocket(body)) 970 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 971 972 def test_overflowing_header_line(self): 973 body = ( 974 'HTTP/1.1 200 OK\r\n' 975 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 976 ) 977 resp = client.HTTPResponse(FakeSocket(body)) 978 self.assertRaises(client.LineTooLong, resp.begin) 979 980 def test_overflowing_chunked_line(self): 981 body = ( 982 'HTTP/1.1 200 OK\r\n' 983 'Transfer-Encoding: chunked\r\n\r\n' 984 + '0' * 65536 + 'a\r\n' 985 'hello world\r\n' 986 '0\r\n' 987 '\r\n' 988 ) 989 resp = client.HTTPResponse(FakeSocket(body)) 990 resp.begin() 991 self.assertRaises(client.LineTooLong, resp.read) 992 993 def test_early_eof(self): 994 # Test httpresponse with no \r\n termination, 995 body = "HTTP/1.1 200 Ok" 996 sock = FakeSocket(body) 997 resp = client.HTTPResponse(sock) 998 resp.begin() 999 self.assertEqual(resp.read(), b'') 1000 self.assertTrue(resp.isclosed()) 1001 self.assertFalse(resp.closed) 1002 resp.close() 1003 self.assertTrue(resp.closed) 1004 1005 def test_error_leak(self): 1006 # Test that the socket is not leaked if getresponse() fails 1007 conn = client.HTTPConnection('example.com') 1008 response = None 1009 class Response(client.HTTPResponse): 1010 def __init__(self, *pos, **kw): 1011 nonlocal response 1012 response = self # Avoid garbage collector closing the socket 1013 client.HTTPResponse.__init__(self, *pos, **kw) 1014 conn.response_class = Response 1015 conn.sock = FakeSocket('Invalid status line') 1016 conn.request('GET', '/') 1017 self.assertRaises(client.BadStatusLine, conn.getresponse) 1018 self.assertTrue(response.closed) 1019 self.assertTrue(conn.sock.file_closed) 1020 1021 def test_chunked_extension(self): 1022 extra = '3;foo=bar\r\n' + 'abc\r\n' 1023 expected = chunked_expected + b'abc' 1024 1025 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1026 resp = client.HTTPResponse(sock, method="GET") 1027 resp.begin() 1028 self.assertEqual(resp.read(), expected) 1029 resp.close() 1030 1031 def test_chunked_missing_end(self): 1032 """some servers may serve up a short chunked encoding stream""" 1033 expected = chunked_expected 1034 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1035 resp = client.HTTPResponse(sock, method="GET") 1036 resp.begin() 1037 self.assertEqual(resp.read(), expected) 1038 resp.close() 1039 1040 def test_chunked_trailers(self): 1041 """See that trailers are read and ignored""" 1042 expected = chunked_expected 1043 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1044 resp = client.HTTPResponse(sock, method="GET") 1045 resp.begin() 1046 self.assertEqual(resp.read(), expected) 1047 # we should have reached the end of the file 1048 self.assertEqual(sock.file.read(), b"") #we read to the end 1049 resp.close() 1050 1051 def test_chunked_sync(self): 1052 """Check that we don't read past the end of the chunked-encoding stream""" 1053 expected = chunked_expected 1054 extradata = "extradata" 1055 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1056 resp = client.HTTPResponse(sock, method="GET") 1057 resp.begin() 1058 self.assertEqual(resp.read(), expected) 1059 # the file should now have our extradata ready to be read 1060 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1061 resp.close() 1062 1063 def test_content_length_sync(self): 1064 """Check that we don't read past the end of the Content-Length stream""" 1065 extradata = b"extradata" 1066 expected = b"Hello123\r\n" 1067 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1068 resp = client.HTTPResponse(sock, method="GET") 1069 resp.begin() 1070 self.assertEqual(resp.read(), expected) 1071 # the file should now have our extradata ready to be read 1072 self.assertEqual(sock.file.read(), extradata) #we read to the end 1073 resp.close() 1074 1075 def test_readlines_content_length(self): 1076 extradata = b"extradata" 1077 expected = b"Hello123\r\n" 1078 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1079 resp = client.HTTPResponse(sock, method="GET") 1080 resp.begin() 1081 self.assertEqual(resp.readlines(2000), [expected]) 1082 # the file should now have our extradata ready to be read 1083 self.assertEqual(sock.file.read(), extradata) #we read to the end 1084 resp.close() 1085 1086 def test_read1_content_length(self): 1087 extradata = b"extradata" 1088 expected = b"Hello123\r\n" 1089 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1090 resp = client.HTTPResponse(sock, method="GET") 1091 resp.begin() 1092 self.assertEqual(resp.read1(2000), expected) 1093 # the file should now have our extradata ready to be read 1094 self.assertEqual(sock.file.read(), extradata) #we read to the end 1095 resp.close() 1096 1097 def test_readline_bound_content_length(self): 1098 extradata = b"extradata" 1099 expected = b"Hello123\r\n" 1100 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1101 resp = client.HTTPResponse(sock, method="GET") 1102 resp.begin() 1103 self.assertEqual(resp.readline(10), expected) 1104 self.assertEqual(resp.readline(10), b"") 1105 # the file should now have our extradata ready to be read 1106 self.assertEqual(sock.file.read(), extradata) #we read to the end 1107 resp.close() 1108 1109 def test_read1_bound_content_length(self): 1110 extradata = b"extradata" 1111 expected = b"Hello123\r\n" 1112 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1113 resp = client.HTTPResponse(sock, method="GET") 1114 resp.begin() 1115 self.assertEqual(resp.read1(20), expected*2) 1116 self.assertEqual(resp.read(), expected) 1117 # the file should now have our extradata ready to be read 1118 self.assertEqual(sock.file.read(), extradata) #we read to the end 1119 resp.close() 1120 1121 def test_response_fileno(self): 1122 # Make sure fd returned by fileno is valid. 1123 serv = socket.create_server((HOST, 0)) 1124 self.addCleanup(serv.close) 1125 1126 result = None 1127 def run_server(): 1128 [conn, address] = serv.accept() 1129 with conn, conn.makefile("rb") as reader: 1130 # Read the request header until a blank line 1131 while True: 1132 line = reader.readline() 1133 if not line.rstrip(b"\r\n"): 1134 break 1135 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1136 nonlocal result 1137 result = reader.read() 1138 1139 thread = threading.Thread(target=run_server) 1140 thread.start() 1141 self.addCleanup(thread.join, float(1)) 1142 conn = client.HTTPConnection(*serv.getsockname()) 1143 conn.request("CONNECT", "dummy:1234") 1144 response = conn.getresponse() 1145 try: 1146 self.assertEqual(response.status, client.OK) 1147 s = socket.socket(fileno=response.fileno()) 1148 try: 1149 s.sendall(b"proxied data\n") 1150 finally: 1151 s.detach() 1152 finally: 1153 response.close() 1154 conn.close() 1155 thread.join() 1156 self.assertEqual(result, b"proxied data\n") 1157 1158 def test_putrequest_override_validation(self): 1159 """ 1160 It should be possible to override the default validation 1161 behavior in putrequest (bpo-38216). 1162 """ 1163 class UnsafeHTTPConnection(client.HTTPConnection): 1164 def _validate_path(self, url): 1165 pass 1166 1167 conn = UnsafeHTTPConnection('example.com') 1168 conn.sock = FakeSocket('') 1169 conn.putrequest('GET', '/\x00') 1170 1171 def test_putrequest_override_encoding(self): 1172 """ 1173 It should be possible to override the default encoding 1174 to transmit bytes in another encoding even if invalid 1175 (bpo-36274). 1176 """ 1177 class UnsafeHTTPConnection(client.HTTPConnection): 1178 def _encode_request(self, str_url): 1179 return str_url.encode('utf-8') 1180 1181 conn = UnsafeHTTPConnection('example.com') 1182 conn.sock = FakeSocket('') 1183 conn.putrequest('GET', '/☃') 1184 1185 1186class ExtendedReadTest(TestCase): 1187 """ 1188 Test peek(), read1(), readline() 1189 """ 1190 lines = ( 1191 'HTTP/1.1 200 OK\r\n' 1192 '\r\n' 1193 'hello world!\n' 1194 'and now \n' 1195 'for something completely different\n' 1196 'foo' 1197 ) 1198 lines_expected = lines[lines.find('hello'):].encode("ascii") 1199 lines_chunked = ( 1200 'HTTP/1.1 200 OK\r\n' 1201 'Transfer-Encoding: chunked\r\n\r\n' 1202 'a\r\n' 1203 'hello worl\r\n' 1204 '3\r\n' 1205 'd!\n\r\n' 1206 '9\r\n' 1207 'and now \n\r\n' 1208 '23\r\n' 1209 'for something completely different\n\r\n' 1210 '3\r\n' 1211 'foo\r\n' 1212 '0\r\n' # terminating chunk 1213 '\r\n' # end of trailers 1214 ) 1215 1216 def setUp(self): 1217 sock = FakeSocket(self.lines) 1218 resp = client.HTTPResponse(sock, method="GET") 1219 resp.begin() 1220 resp.fp = io.BufferedReader(resp.fp) 1221 self.resp = resp 1222 1223 1224 1225 def test_peek(self): 1226 resp = self.resp 1227 # patch up the buffered peek so that it returns not too much stuff 1228 oldpeek = resp.fp.peek 1229 def mypeek(n=-1): 1230 p = oldpeek(n) 1231 if n >= 0: 1232 return p[:n] 1233 return p[:10] 1234 resp.fp.peek = mypeek 1235 1236 all = [] 1237 while True: 1238 # try a short peek 1239 p = resp.peek(3) 1240 if p: 1241 self.assertGreater(len(p), 0) 1242 # then unbounded peek 1243 p2 = resp.peek() 1244 self.assertGreaterEqual(len(p2), len(p)) 1245 self.assertTrue(p2.startswith(p)) 1246 next = resp.read(len(p2)) 1247 self.assertEqual(next, p2) 1248 else: 1249 next = resp.read() 1250 self.assertFalse(next) 1251 all.append(next) 1252 if not next: 1253 break 1254 self.assertEqual(b"".join(all), self.lines_expected) 1255 1256 def test_readline(self): 1257 resp = self.resp 1258 self._verify_readline(self.resp.readline, self.lines_expected) 1259 1260 def _verify_readline(self, readline, expected): 1261 all = [] 1262 while True: 1263 # short readlines 1264 line = readline(5) 1265 if line and line != b"foo": 1266 if len(line) < 5: 1267 self.assertTrue(line.endswith(b"\n")) 1268 all.append(line) 1269 if not line: 1270 break 1271 self.assertEqual(b"".join(all), expected) 1272 1273 def test_read1(self): 1274 resp = self.resp 1275 def r(): 1276 res = resp.read1(4) 1277 self.assertLessEqual(len(res), 4) 1278 return res 1279 readliner = Readliner(r) 1280 self._verify_readline(readliner.readline, self.lines_expected) 1281 1282 def test_read1_unbounded(self): 1283 resp = self.resp 1284 all = [] 1285 while True: 1286 data = resp.read1() 1287 if not data: 1288 break 1289 all.append(data) 1290 self.assertEqual(b"".join(all), self.lines_expected) 1291 1292 def test_read1_bounded(self): 1293 resp = self.resp 1294 all = [] 1295 while True: 1296 data = resp.read1(10) 1297 if not data: 1298 break 1299 self.assertLessEqual(len(data), 10) 1300 all.append(data) 1301 self.assertEqual(b"".join(all), self.lines_expected) 1302 1303 def test_read1_0(self): 1304 self.assertEqual(self.resp.read1(0), b"") 1305 1306 def test_peek_0(self): 1307 p = self.resp.peek(0) 1308 self.assertLessEqual(0, len(p)) 1309 1310 1311class ExtendedReadTestChunked(ExtendedReadTest): 1312 """ 1313 Test peek(), read1(), readline() in chunked mode 1314 """ 1315 lines = ( 1316 'HTTP/1.1 200 OK\r\n' 1317 'Transfer-Encoding: chunked\r\n\r\n' 1318 'a\r\n' 1319 'hello worl\r\n' 1320 '3\r\n' 1321 'd!\n\r\n' 1322 '9\r\n' 1323 'and now \n\r\n' 1324 '23\r\n' 1325 'for something completely different\n\r\n' 1326 '3\r\n' 1327 'foo\r\n' 1328 '0\r\n' # terminating chunk 1329 '\r\n' # end of trailers 1330 ) 1331 1332 1333class Readliner: 1334 """ 1335 a simple readline class that uses an arbitrary read function and buffering 1336 """ 1337 def __init__(self, readfunc): 1338 self.readfunc = readfunc 1339 self.remainder = b"" 1340 1341 def readline(self, limit): 1342 data = [] 1343 datalen = 0 1344 read = self.remainder 1345 try: 1346 while True: 1347 idx = read.find(b'\n') 1348 if idx != -1: 1349 break 1350 if datalen + len(read) >= limit: 1351 idx = limit - datalen - 1 1352 # read more data 1353 data.append(read) 1354 read = self.readfunc() 1355 if not read: 1356 idx = 0 #eof condition 1357 break 1358 idx += 1 1359 data.append(read[:idx]) 1360 self.remainder = read[idx:] 1361 return b"".join(data) 1362 except: 1363 self.remainder = b"".join(data) 1364 raise 1365 1366 1367class OfflineTest(TestCase): 1368 def test_all(self): 1369 # Documented objects defined in the module should be in __all__ 1370 expected = {"responses"} # White-list documented dict() object 1371 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1372 # intentionally omitted for simplicity 1373 blacklist = {"HTTPMessage", "parse_headers"} 1374 for name in dir(client): 1375 if name.startswith("_") or name in blacklist: 1376 continue 1377 module_object = getattr(client, name) 1378 if getattr(module_object, "__module__", None) == "http.client": 1379 expected.add(name) 1380 self.assertCountEqual(client.__all__, expected) 1381 1382 def test_responses(self): 1383 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1384 1385 def test_client_constants(self): 1386 # Make sure we don't break backward compatibility with 3.4 1387 expected = [ 1388 'CONTINUE', 1389 'SWITCHING_PROTOCOLS', 1390 'PROCESSING', 1391 'OK', 1392 'CREATED', 1393 'ACCEPTED', 1394 'NON_AUTHORITATIVE_INFORMATION', 1395 'NO_CONTENT', 1396 'RESET_CONTENT', 1397 'PARTIAL_CONTENT', 1398 'MULTI_STATUS', 1399 'IM_USED', 1400 'MULTIPLE_CHOICES', 1401 'MOVED_PERMANENTLY', 1402 'FOUND', 1403 'SEE_OTHER', 1404 'NOT_MODIFIED', 1405 'USE_PROXY', 1406 'TEMPORARY_REDIRECT', 1407 'BAD_REQUEST', 1408 'UNAUTHORIZED', 1409 'PAYMENT_REQUIRED', 1410 'FORBIDDEN', 1411 'NOT_FOUND', 1412 'METHOD_NOT_ALLOWED', 1413 'NOT_ACCEPTABLE', 1414 'PROXY_AUTHENTICATION_REQUIRED', 1415 'REQUEST_TIMEOUT', 1416 'CONFLICT', 1417 'GONE', 1418 'LENGTH_REQUIRED', 1419 'PRECONDITION_FAILED', 1420 'REQUEST_ENTITY_TOO_LARGE', 1421 'REQUEST_URI_TOO_LONG', 1422 'UNSUPPORTED_MEDIA_TYPE', 1423 'REQUESTED_RANGE_NOT_SATISFIABLE', 1424 'EXPECTATION_FAILED', 1425 'MISDIRECTED_REQUEST', 1426 'UNPROCESSABLE_ENTITY', 1427 'LOCKED', 1428 'FAILED_DEPENDENCY', 1429 'UPGRADE_REQUIRED', 1430 'PRECONDITION_REQUIRED', 1431 'TOO_MANY_REQUESTS', 1432 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1433 'UNAVAILABLE_FOR_LEGAL_REASONS', 1434 'INTERNAL_SERVER_ERROR', 1435 'NOT_IMPLEMENTED', 1436 'BAD_GATEWAY', 1437 'SERVICE_UNAVAILABLE', 1438 'GATEWAY_TIMEOUT', 1439 'HTTP_VERSION_NOT_SUPPORTED', 1440 'INSUFFICIENT_STORAGE', 1441 'NOT_EXTENDED', 1442 'NETWORK_AUTHENTICATION_REQUIRED', 1443 ] 1444 for const in expected: 1445 with self.subTest(constant=const): 1446 self.assertTrue(hasattr(client, const)) 1447 1448 1449class SourceAddressTest(TestCase): 1450 def setUp(self): 1451 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1452 self.port = support.bind_port(self.serv) 1453 self.source_port = support.find_unused_port() 1454 self.serv.listen() 1455 self.conn = None 1456 1457 def tearDown(self): 1458 if self.conn: 1459 self.conn.close() 1460 self.conn = None 1461 self.serv.close() 1462 self.serv = None 1463 1464 def testHTTPConnectionSourceAddress(self): 1465 self.conn = client.HTTPConnection(HOST, self.port, 1466 source_address=('', self.source_port)) 1467 self.conn.connect() 1468 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1469 1470 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1471 'http.client.HTTPSConnection not defined') 1472 def testHTTPSConnectionSourceAddress(self): 1473 self.conn = client.HTTPSConnection(HOST, self.port, 1474 source_address=('', self.source_port)) 1475 # We don't test anything here other than the constructor not barfing as 1476 # this code doesn't deal with setting up an active running SSL server 1477 # for an ssl_wrapped connect() to actually return from. 1478 1479 1480class TimeoutTest(TestCase): 1481 PORT = None 1482 1483 def setUp(self): 1484 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1485 TimeoutTest.PORT = support.bind_port(self.serv) 1486 self.serv.listen() 1487 1488 def tearDown(self): 1489 self.serv.close() 1490 self.serv = None 1491 1492 def testTimeoutAttribute(self): 1493 # This will prove that the timeout gets through HTTPConnection 1494 # and into the socket. 1495 1496 # default -- use global socket timeout 1497 self.assertIsNone(socket.getdefaulttimeout()) 1498 socket.setdefaulttimeout(30) 1499 try: 1500 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1501 httpConn.connect() 1502 finally: 1503 socket.setdefaulttimeout(None) 1504 self.assertEqual(httpConn.sock.gettimeout(), 30) 1505 httpConn.close() 1506 1507 # no timeout -- do not use global socket default 1508 self.assertIsNone(socket.getdefaulttimeout()) 1509 socket.setdefaulttimeout(30) 1510 try: 1511 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1512 timeout=None) 1513 httpConn.connect() 1514 finally: 1515 socket.setdefaulttimeout(None) 1516 self.assertEqual(httpConn.sock.gettimeout(), None) 1517 httpConn.close() 1518 1519 # a value 1520 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1521 httpConn.connect() 1522 self.assertEqual(httpConn.sock.gettimeout(), 30) 1523 httpConn.close() 1524 1525 1526class PersistenceTest(TestCase): 1527 1528 def test_reuse_reconnect(self): 1529 # Should reuse or reconnect depending on header from server 1530 tests = ( 1531 ('1.0', '', False), 1532 ('1.0', 'Connection: keep-alive\r\n', True), 1533 ('1.1', '', True), 1534 ('1.1', 'Connection: close\r\n', False), 1535 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1536 ('1.1', 'Connection: cloSE\r\n', False), 1537 ) 1538 for version, header, reuse in tests: 1539 with self.subTest(version=version, header=header): 1540 msg = ( 1541 'HTTP/{} 200 OK\r\n' 1542 '{}' 1543 'Content-Length: 12\r\n' 1544 '\r\n' 1545 'Dummy body\r\n' 1546 ).format(version, header) 1547 conn = FakeSocketHTTPConnection(msg) 1548 self.assertIsNone(conn.sock) 1549 conn.request('GET', '/open-connection') 1550 with conn.getresponse() as response: 1551 self.assertEqual(conn.sock is None, not reuse) 1552 response.read() 1553 self.assertEqual(conn.sock is None, not reuse) 1554 self.assertEqual(conn.connections, 1) 1555 conn.request('GET', '/subsequent-request') 1556 self.assertEqual(conn.connections, 1 if reuse else 2) 1557 1558 def test_disconnected(self): 1559 1560 def make_reset_reader(text): 1561 """Return BufferedReader that raises ECONNRESET at EOF""" 1562 stream = io.BytesIO(text) 1563 def readinto(buffer): 1564 size = io.BytesIO.readinto(stream, buffer) 1565 if size == 0: 1566 raise ConnectionResetError() 1567 return size 1568 stream.readinto = readinto 1569 return io.BufferedReader(stream) 1570 1571 tests = ( 1572 (io.BytesIO, client.RemoteDisconnected), 1573 (make_reset_reader, ConnectionResetError), 1574 ) 1575 for stream_factory, exception in tests: 1576 with self.subTest(exception=exception): 1577 conn = FakeSocketHTTPConnection(b'', stream_factory) 1578 conn.request('GET', '/eof-response') 1579 self.assertRaises(exception, conn.getresponse) 1580 self.assertIsNone(conn.sock) 1581 # HTTPConnection.connect() should be automatically invoked 1582 conn.request('GET', '/reconnect') 1583 self.assertEqual(conn.connections, 2) 1584 1585 def test_100_close(self): 1586 conn = FakeSocketHTTPConnection( 1587 b'HTTP/1.1 100 Continue\r\n' 1588 b'\r\n' 1589 # Missing final response 1590 ) 1591 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1592 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1593 self.assertIsNone(conn.sock) 1594 conn.request('GET', '/reconnect') 1595 self.assertEqual(conn.connections, 2) 1596 1597 1598class HTTPSTest(TestCase): 1599 1600 def setUp(self): 1601 if not hasattr(client, 'HTTPSConnection'): 1602 self.skipTest('ssl support required') 1603 1604 def make_server(self, certfile): 1605 from test.ssl_servers import make_https_server 1606 return make_https_server(self, certfile=certfile) 1607 1608 def test_attributes(self): 1609 # simple test to check it's storing the timeout 1610 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1611 self.assertEqual(h.timeout, 30) 1612 1613 def test_networked(self): 1614 # Default settings: requires a valid cert from a trusted CA 1615 import ssl 1616 support.requires('network') 1617 with support.transient_internet('self-signed.pythontest.net'): 1618 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1619 with self.assertRaises(ssl.SSLError) as exc_info: 1620 h.request('GET', '/') 1621 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1622 1623 def test_networked_noverification(self): 1624 # Switch off cert verification 1625 import ssl 1626 support.requires('network') 1627 with support.transient_internet('self-signed.pythontest.net'): 1628 context = ssl._create_unverified_context() 1629 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1630 context=context) 1631 h.request('GET', '/') 1632 resp = h.getresponse() 1633 h.close() 1634 self.assertIn('nginx', resp.getheader('server')) 1635 resp.close() 1636 1637 @support.system_must_validate_cert 1638 def test_networked_trusted_by_default_cert(self): 1639 # Default settings: requires a valid cert from a trusted CA 1640 support.requires('network') 1641 with support.transient_internet('www.python.org'): 1642 h = client.HTTPSConnection('www.python.org', 443) 1643 h.request('GET', '/') 1644 resp = h.getresponse() 1645 content_type = resp.getheader('content-type') 1646 resp.close() 1647 h.close() 1648 self.assertIn('text/html', content_type) 1649 1650 def test_networked_good_cert(self): 1651 # We feed the server's cert as a validating cert 1652 import ssl 1653 support.requires('network') 1654 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1655 with support.transient_internet(selfsigned_pythontestdotnet): 1656 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1657 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1658 self.assertEqual(context.check_hostname, True) 1659 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1660 try: 1661 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1662 context=context) 1663 h.request('GET', '/') 1664 resp = h.getresponse() 1665 except ssl.SSLError as ssl_err: 1666 ssl_err_str = str(ssl_err) 1667 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1668 # modern Linux distros (Debian Buster, etc) default OpenSSL 1669 # configurations it'll fail saying "key too weak" until we 1670 # address https://bugs.python.org/issue36816 to use a proper 1671 # key size on self-signed.pythontest.net. 1672 if re.search(r'(?i)key.too.weak', ssl_err_str): 1673 raise unittest.SkipTest( 1674 f'Got {ssl_err_str} trying to connect ' 1675 f'to {selfsigned_pythontestdotnet}. ' 1676 'See https://bugs.python.org/issue36816.') 1677 raise 1678 server_string = resp.getheader('server') 1679 resp.close() 1680 h.close() 1681 self.assertIn('nginx', server_string) 1682 1683 def test_networked_bad_cert(self): 1684 # We feed a "CA" cert that is unrelated to the server's cert 1685 import ssl 1686 support.requires('network') 1687 with support.transient_internet('self-signed.pythontest.net'): 1688 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1689 context.load_verify_locations(CERT_localhost) 1690 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1691 with self.assertRaises(ssl.SSLError) as exc_info: 1692 h.request('GET', '/') 1693 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1694 1695 def test_local_unknown_cert(self): 1696 # The custom cert isn't known to the default trust bundle 1697 import ssl 1698 server = self.make_server(CERT_localhost) 1699 h = client.HTTPSConnection('localhost', server.port) 1700 with self.assertRaises(ssl.SSLError) as exc_info: 1701 h.request('GET', '/') 1702 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1703 1704 def test_local_good_hostname(self): 1705 # The (valid) cert validates the HTTP hostname 1706 import ssl 1707 server = self.make_server(CERT_localhost) 1708 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1709 context.load_verify_locations(CERT_localhost) 1710 h = client.HTTPSConnection('localhost', server.port, context=context) 1711 self.addCleanup(h.close) 1712 h.request('GET', '/nonexistent') 1713 resp = h.getresponse() 1714 self.addCleanup(resp.close) 1715 self.assertEqual(resp.status, 404) 1716 1717 def test_local_bad_hostname(self): 1718 # The (valid) cert doesn't validate the HTTP hostname 1719 import ssl 1720 server = self.make_server(CERT_fakehostname) 1721 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1722 context.load_verify_locations(CERT_fakehostname) 1723 h = client.HTTPSConnection('localhost', server.port, context=context) 1724 with self.assertRaises(ssl.CertificateError): 1725 h.request('GET', '/') 1726 # Same with explicit check_hostname=True 1727 with support.check_warnings(('', DeprecationWarning)): 1728 h = client.HTTPSConnection('localhost', server.port, 1729 context=context, check_hostname=True) 1730 with self.assertRaises(ssl.CertificateError): 1731 h.request('GET', '/') 1732 # With check_hostname=False, the mismatching is ignored 1733 context.check_hostname = False 1734 with support.check_warnings(('', DeprecationWarning)): 1735 h = client.HTTPSConnection('localhost', server.port, 1736 context=context, check_hostname=False) 1737 h.request('GET', '/nonexistent') 1738 resp = h.getresponse() 1739 resp.close() 1740 h.close() 1741 self.assertEqual(resp.status, 404) 1742 # The context's check_hostname setting is used if one isn't passed to 1743 # HTTPSConnection. 1744 context.check_hostname = False 1745 h = client.HTTPSConnection('localhost', server.port, context=context) 1746 h.request('GET', '/nonexistent') 1747 resp = h.getresponse() 1748 self.assertEqual(resp.status, 404) 1749 resp.close() 1750 h.close() 1751 # Passing check_hostname to HTTPSConnection should override the 1752 # context's setting. 1753 with support.check_warnings(('', DeprecationWarning)): 1754 h = client.HTTPSConnection('localhost', server.port, 1755 context=context, check_hostname=True) 1756 with self.assertRaises(ssl.CertificateError): 1757 h.request('GET', '/') 1758 1759 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1760 'http.client.HTTPSConnection not available') 1761 def test_host_port(self): 1762 # Check invalid host_port 1763 1764 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1765 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1766 1767 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1768 "fe80::207:e9ff:fe9b", 8000), 1769 ("www.python.org:443", "www.python.org", 443), 1770 ("www.python.org:", "www.python.org", 443), 1771 ("www.python.org", "www.python.org", 443), 1772 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1773 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1774 443)): 1775 c = client.HTTPSConnection(hp) 1776 self.assertEqual(h, c.host) 1777 self.assertEqual(p, c.port) 1778 1779 def test_tls13_pha(self): 1780 import ssl 1781 if not ssl.HAS_TLSv1_3: 1782 self.skipTest('TLS 1.3 support required') 1783 # just check status of PHA flag 1784 h = client.HTTPSConnection('localhost', 443) 1785 self.assertTrue(h._context.post_handshake_auth) 1786 1787 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1788 self.assertFalse(context.post_handshake_auth) 1789 h = client.HTTPSConnection('localhost', 443, context=context) 1790 self.assertIs(h._context, context) 1791 self.assertFalse(h._context.post_handshake_auth) 1792 1793 with warnings.catch_warnings(): 1794 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1795 DeprecationWarning) 1796 h = client.HTTPSConnection('localhost', 443, context=context, 1797 cert_file=CERT_localhost) 1798 self.assertTrue(h._context.post_handshake_auth) 1799 1800 1801class RequestBodyTest(TestCase): 1802 """Test cases where a request includes a message body.""" 1803 1804 def setUp(self): 1805 self.conn = client.HTTPConnection('example.com') 1806 self.conn.sock = self.sock = FakeSocket("") 1807 self.conn.sock = self.sock 1808 1809 def get_headers_and_fp(self): 1810 f = io.BytesIO(self.sock.data) 1811 f.readline() # read the request line 1812 message = client.parse_headers(f) 1813 return message, f 1814 1815 def test_list_body(self): 1816 # Note that no content-length is automatically calculated for 1817 # an iterable. The request will fall back to send chunked 1818 # transfer encoding. 1819 cases = ( 1820 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1821 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1822 ) 1823 for body, expected in cases: 1824 with self.subTest(body): 1825 self.conn = client.HTTPConnection('example.com') 1826 self.conn.sock = self.sock = FakeSocket('') 1827 1828 self.conn.request('PUT', '/url', body) 1829 msg, f = self.get_headers_and_fp() 1830 self.assertNotIn('Content-Type', msg) 1831 self.assertNotIn('Content-Length', msg) 1832 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1833 self.assertEqual(expected, f.read()) 1834 1835 def test_manual_content_length(self): 1836 # Set an incorrect content-length so that we can verify that 1837 # it will not be over-ridden by the library. 1838 self.conn.request("PUT", "/url", "body", 1839 {"Content-Length": "42"}) 1840 message, f = self.get_headers_and_fp() 1841 self.assertEqual("42", message.get("content-length")) 1842 self.assertEqual(4, len(f.read())) 1843 1844 def test_ascii_body(self): 1845 self.conn.request("PUT", "/url", "body") 1846 message, f = self.get_headers_and_fp() 1847 self.assertEqual("text/plain", message.get_content_type()) 1848 self.assertIsNone(message.get_charset()) 1849 self.assertEqual("4", message.get("content-length")) 1850 self.assertEqual(b'body', f.read()) 1851 1852 def test_latin1_body(self): 1853 self.conn.request("PUT", "/url", "body\xc1") 1854 message, f = self.get_headers_and_fp() 1855 self.assertEqual("text/plain", message.get_content_type()) 1856 self.assertIsNone(message.get_charset()) 1857 self.assertEqual("5", message.get("content-length")) 1858 self.assertEqual(b'body\xc1', f.read()) 1859 1860 def test_bytes_body(self): 1861 self.conn.request("PUT", "/url", b"body\xc1") 1862 message, f = self.get_headers_and_fp() 1863 self.assertEqual("text/plain", message.get_content_type()) 1864 self.assertIsNone(message.get_charset()) 1865 self.assertEqual("5", message.get("content-length")) 1866 self.assertEqual(b'body\xc1', f.read()) 1867 1868 def test_text_file_body(self): 1869 self.addCleanup(support.unlink, support.TESTFN) 1870 with open(support.TESTFN, "w") as f: 1871 f.write("body") 1872 with open(support.TESTFN) as f: 1873 self.conn.request("PUT", "/url", f) 1874 message, f = self.get_headers_and_fp() 1875 self.assertEqual("text/plain", message.get_content_type()) 1876 self.assertIsNone(message.get_charset()) 1877 # No content-length will be determined for files; the body 1878 # will be sent using chunked transfer encoding instead. 1879 self.assertIsNone(message.get("content-length")) 1880 self.assertEqual("chunked", message.get("transfer-encoding")) 1881 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1882 1883 def test_binary_file_body(self): 1884 self.addCleanup(support.unlink, support.TESTFN) 1885 with open(support.TESTFN, "wb") as f: 1886 f.write(b"body\xc1") 1887 with open(support.TESTFN, "rb") as f: 1888 self.conn.request("PUT", "/url", f) 1889 message, f = self.get_headers_and_fp() 1890 self.assertEqual("text/plain", message.get_content_type()) 1891 self.assertIsNone(message.get_charset()) 1892 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1893 self.assertNotIn("Content-Length", message) 1894 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1895 1896 1897class HTTPResponseTest(TestCase): 1898 1899 def setUp(self): 1900 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1901 second-value\r\n\r\nText" 1902 sock = FakeSocket(body) 1903 self.resp = client.HTTPResponse(sock) 1904 self.resp.begin() 1905 1906 def test_getting_header(self): 1907 header = self.resp.getheader('My-Header') 1908 self.assertEqual(header, 'first-value, second-value') 1909 1910 header = self.resp.getheader('My-Header', 'some default') 1911 self.assertEqual(header, 'first-value, second-value') 1912 1913 def test_getting_nonexistent_header_with_string_default(self): 1914 header = self.resp.getheader('No-Such-Header', 'default-value') 1915 self.assertEqual(header, 'default-value') 1916 1917 def test_getting_nonexistent_header_with_iterable_default(self): 1918 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1919 self.assertEqual(header, 'default, values') 1920 1921 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1922 self.assertEqual(header, 'default, values') 1923 1924 def test_getting_nonexistent_header_without_default(self): 1925 header = self.resp.getheader('No-Such-Header') 1926 self.assertEqual(header, None) 1927 1928 def test_getting_header_defaultint(self): 1929 header = self.resp.getheader('No-Such-Header',default=42) 1930 self.assertEqual(header, 42) 1931 1932class TunnelTests(TestCase): 1933 def setUp(self): 1934 response_text = ( 1935 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1936 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1937 'Content-Length: 42\r\n\r\n' 1938 ) 1939 self.host = 'proxy.com' 1940 self.conn = client.HTTPConnection(self.host) 1941 self.conn._create_connection = self._create_connection(response_text) 1942 1943 def tearDown(self): 1944 self.conn.close() 1945 1946 def _create_connection(self, response_text): 1947 def create_connection(address, timeout=None, source_address=None): 1948 return FakeSocket(response_text, host=address[0], port=address[1]) 1949 return create_connection 1950 1951 def test_set_tunnel_host_port_headers(self): 1952 tunnel_host = 'destination.com' 1953 tunnel_port = 8888 1954 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 1955 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 1956 headers=tunnel_headers) 1957 self.conn.request('HEAD', '/', '') 1958 self.assertEqual(self.conn.sock.host, self.host) 1959 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1960 self.assertEqual(self.conn._tunnel_host, tunnel_host) 1961 self.assertEqual(self.conn._tunnel_port, tunnel_port) 1962 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 1963 1964 def test_disallow_set_tunnel_after_connect(self): 1965 # Once connected, we shouldn't be able to tunnel anymore 1966 self.conn.connect() 1967 self.assertRaises(RuntimeError, self.conn.set_tunnel, 1968 'destination.com') 1969 1970 def test_connect_with_tunnel(self): 1971 self.conn.set_tunnel('destination.com') 1972 self.conn.request('HEAD', '/', '') 1973 self.assertEqual(self.conn.sock.host, self.host) 1974 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1975 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1976 # issue22095 1977 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 1978 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1979 1980 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 1981 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 1982 1983 def test_connect_put_request(self): 1984 self.conn.set_tunnel('destination.com') 1985 self.conn.request('PUT', '/', '') 1986 self.assertEqual(self.conn.sock.host, self.host) 1987 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1988 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1989 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1990 1991 def test_tunnel_debuglog(self): 1992 expected_header = 'X-Dummy: 1' 1993 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 1994 1995 self.conn.set_debuglevel(1) 1996 self.conn._create_connection = self._create_connection(response_text) 1997 self.conn.set_tunnel('destination.com') 1998 1999 with support.captured_stdout() as output: 2000 self.conn.request('PUT', '/', '') 2001 lines = output.getvalue().splitlines() 2002 self.assertIn('header: {}'.format(expected_header), lines) 2003 2004 2005if __name__ == '__main__': 2006 unittest.main(verbosity=2) 2007