1import errno 2from http import client 3import io 4import itertools 5import os 6import array 7import socket 8import threading 9 10import unittest 11TestCase = unittest.TestCase 12 13from test import support 14 15here = os.path.dirname(__file__) 16# Self-signed cert file for 'localhost' 17CERT_localhost = os.path.join(here, 'keycert.pem') 18# Self-signed cert file for 'fakehostname' 19CERT_fakehostname = os.path.join(here, 'keycert2.pem') 20# Self-signed cert file for self-signed.pythontest.net 21CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 22 23# constants for testing chunked encoding 24chunked_start = ( 25 'HTTP/1.1 200 OK\r\n' 26 'Transfer-Encoding: chunked\r\n\r\n' 27 'a\r\n' 28 'hello worl\r\n' 29 '3\r\n' 30 'd! \r\n' 31 '8\r\n' 32 'and now \r\n' 33 '22\r\n' 34 'for something completely different\r\n' 35) 36chunked_expected = b'hello world! and now for something completely different' 37chunk_extension = ";foo=bar" 38last_chunk = "0\r\n" 39last_chunk_extended = "0" + chunk_extension + "\r\n" 40trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 41chunked_end = "\r\n" 42 43HOST = support.HOST 44 45class FakeSocket: 46 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 47 if isinstance(text, str): 48 text = text.encode("ascii") 49 self.text = text 50 self.fileclass = fileclass 51 self.data = b'' 52 self.sendall_calls = 0 53 self.file_closed = False 54 self.host = host 55 self.port = port 56 57 def sendall(self, data): 58 self.sendall_calls += 1 59 self.data += data 60 61 def makefile(self, mode, bufsize=None): 62 if mode != 'r' and mode != 'rb': 63 raise client.UnimplementedFileMode() 64 # keep the file around so we can check how much was read from it 65 self.file = self.fileclass(self.text) 66 self.file.close = self.file_close #nerf close () 67 return self.file 68 69 def file_close(self): 70 self.file_closed = True 71 72 def close(self): 73 pass 74 75 def setsockopt(self, level, optname, value): 76 pass 77 78class EPipeSocket(FakeSocket): 79 80 def __init__(self, text, pipe_trigger): 81 # When sendall() is called with pipe_trigger, raise EPIPE. 82 FakeSocket.__init__(self, text) 83 self.pipe_trigger = pipe_trigger 84 85 def sendall(self, data): 86 if self.pipe_trigger in data: 87 raise OSError(errno.EPIPE, "gotcha") 88 self.data += data 89 90 def close(self): 91 pass 92 93class NoEOFBytesIO(io.BytesIO): 94 """Like BytesIO, but raises AssertionError on EOF. 95 96 This is used below to test that http.client doesn't try to read 97 more from the underlying file than it should. 98 """ 99 def read(self, n=-1): 100 data = io.BytesIO.read(self, n) 101 if data == b'': 102 raise AssertionError('caller tried to read past EOF') 103 return data 104 105 def readline(self, length=None): 106 data = io.BytesIO.readline(self, length) 107 if data == b'': 108 raise AssertionError('caller tried to read past EOF') 109 return data 110 111class FakeSocketHTTPConnection(client.HTTPConnection): 112 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 113 114 def __init__(self, *args): 115 self.connections = 0 116 super().__init__('example.com') 117 self.fake_socket_args = args 118 self._create_connection = self.create_connection 119 120 def connect(self): 121 """Count the number of times connect() is invoked""" 122 self.connections += 1 123 return super().connect() 124 125 def create_connection(self, *pos, **kw): 126 return FakeSocket(*self.fake_socket_args) 127 128class HeaderTests(TestCase): 129 def test_auto_headers(self): 130 # Some headers are added automatically, but should not be added by 131 # .request() if they are explicitly set. 132 133 class HeaderCountingBuffer(list): 134 def __init__(self): 135 self.count = {} 136 def append(self, item): 137 kv = item.split(b':') 138 if len(kv) > 1: 139 # item is a 'Key: Value' header string 140 lcKey = kv[0].decode('ascii').lower() 141 self.count.setdefault(lcKey, 0) 142 self.count[lcKey] += 1 143 list.append(self, item) 144 145 for explicit_header in True, False: 146 for header in 'Content-length', 'Host', 'Accept-encoding': 147 conn = client.HTTPConnection('example.com') 148 conn.sock = FakeSocket('blahblahblah') 149 conn._buffer = HeaderCountingBuffer() 150 151 body = 'spamspamspam' 152 headers = {} 153 if explicit_header: 154 headers[header] = str(len(body)) 155 conn.request('POST', '/', body, headers) 156 self.assertEqual(conn._buffer.count[header.lower()], 1) 157 158 def test_content_length_0(self): 159 160 class ContentLengthChecker(list): 161 def __init__(self): 162 list.__init__(self) 163 self.content_length = None 164 def append(self, item): 165 kv = item.split(b':', 1) 166 if len(kv) > 1 and kv[0].lower() == b'content-length': 167 self.content_length = kv[1].strip() 168 list.append(self, item) 169 170 # Here, we're testing that methods expecting a body get a 171 # content-length set to zero if the body is empty (either None or '') 172 bodies = (None, '') 173 methods_with_body = ('PUT', 'POST', 'PATCH') 174 for method, body in itertools.product(methods_with_body, bodies): 175 conn = client.HTTPConnection('example.com') 176 conn.sock = FakeSocket(None) 177 conn._buffer = ContentLengthChecker() 178 conn.request(method, '/', body) 179 self.assertEqual( 180 conn._buffer.content_length, b'0', 181 'Header Content-Length incorrect on {}'.format(method) 182 ) 183 184 # For these methods, we make sure that content-length is not set when 185 # the body is None because it might cause unexpected behaviour on the 186 # server. 187 methods_without_body = ( 188 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 189 ) 190 for method in methods_without_body: 191 conn = client.HTTPConnection('example.com') 192 conn.sock = FakeSocket(None) 193 conn._buffer = ContentLengthChecker() 194 conn.request(method, '/', None) 195 self.assertEqual( 196 conn._buffer.content_length, None, 197 'Header Content-Length set for empty body on {}'.format(method) 198 ) 199 200 # If the body is set to '', that's considered to be "present but 201 # empty" rather than "missing", so content length would be set, even 202 # for methods that don't expect a body. 203 for method in methods_without_body: 204 conn = client.HTTPConnection('example.com') 205 conn.sock = FakeSocket(None) 206 conn._buffer = ContentLengthChecker() 207 conn.request(method, '/', '') 208 self.assertEqual( 209 conn._buffer.content_length, b'0', 210 'Header Content-Length incorrect on {}'.format(method) 211 ) 212 213 # If the body is set, make sure Content-Length is set. 214 for method in itertools.chain(methods_without_body, methods_with_body): 215 conn = client.HTTPConnection('example.com') 216 conn.sock = FakeSocket(None) 217 conn._buffer = ContentLengthChecker() 218 conn.request(method, '/', ' ') 219 self.assertEqual( 220 conn._buffer.content_length, b'1', 221 'Header Content-Length incorrect on {}'.format(method) 222 ) 223 224 def test_putheader(self): 225 conn = client.HTTPConnection('example.com') 226 conn.sock = FakeSocket(None) 227 conn.putrequest('GET','/') 228 conn.putheader('Content-length', 42) 229 self.assertIn(b'Content-length: 42', conn._buffer) 230 231 conn.putheader('Foo', ' bar ') 232 self.assertIn(b'Foo: bar ', conn._buffer) 233 conn.putheader('Bar', '\tbaz\t') 234 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 235 conn.putheader('Authorization', 'Bearer mytoken') 236 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 237 conn.putheader('IterHeader', 'IterA', 'IterB') 238 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 239 conn.putheader('LatinHeader', b'\xFF') 240 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 241 conn.putheader('Utf8Header', b'\xc3\x80') 242 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 243 conn.putheader('C1-Control', b'next\x85line') 244 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 245 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 246 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 247 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 248 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 249 conn.putheader('Key Space', 'value') 250 self.assertIn(b'Key Space: value', conn._buffer) 251 conn.putheader('KeySpace ', 'value') 252 self.assertIn(b'KeySpace : value', conn._buffer) 253 conn.putheader(b'Nonbreak\xa0Space', 'value') 254 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 255 conn.putheader(b'\xa0NonbreakSpace', 'value') 256 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 257 258 def test_ipv6host_header(self): 259 # Default host header on IPv6 transaction should be wrapped by [] if 260 # it is an IPv6 address 261 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 262 b'Accept-Encoding: identity\r\n\r\n' 263 conn = client.HTTPConnection('[2001::]:81') 264 sock = FakeSocket('') 265 conn.sock = sock 266 conn.request('GET', '/foo') 267 self.assertTrue(sock.data.startswith(expected)) 268 269 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 270 b'Accept-Encoding: identity\r\n\r\n' 271 conn = client.HTTPConnection('[2001:102A::]') 272 sock = FakeSocket('') 273 conn.sock = sock 274 conn.request('GET', '/foo') 275 self.assertTrue(sock.data.startswith(expected)) 276 277 def test_malformed_headers_coped_with(self): 278 # Issue 19996 279 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 280 sock = FakeSocket(body) 281 resp = client.HTTPResponse(sock) 282 resp.begin() 283 284 self.assertEqual(resp.getheader('First'), 'val') 285 self.assertEqual(resp.getheader('Second'), 'val') 286 287 def test_parse_all_octets(self): 288 # Ensure no valid header field octet breaks the parser 289 body = ( 290 b'HTTP/1.1 200 OK\r\n' 291 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 292 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 293 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 294 b'obs-fold: text\r\n' 295 b' folded with space\r\n' 296 b'\tfolded with tab\r\n' 297 b'Content-Length: 0\r\n' 298 b'\r\n' 299 ) 300 sock = FakeSocket(body) 301 resp = client.HTTPResponse(sock) 302 resp.begin() 303 self.assertEqual(resp.getheader('Content-Length'), '0') 304 self.assertEqual(resp.msg['Content-Length'], '0') 305 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 306 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 307 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 308 self.assertEqual(resp.getheader('VCHAR'), vchar) 309 self.assertEqual(resp.msg['VCHAR'], vchar) 310 self.assertIsNotNone(resp.getheader('obs-text')) 311 self.assertIn('obs-text', resp.msg) 312 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 313 self.assertTrue(folded.startswith('text')) 314 self.assertIn(' folded with space', folded) 315 self.assertTrue(folded.endswith('folded with tab')) 316 317 def test_invalid_headers(self): 318 conn = client.HTTPConnection('example.com') 319 conn.sock = FakeSocket('') 320 conn.putrequest('GET', '/') 321 322 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 323 # longer allowed in header names 324 cases = ( 325 (b'Invalid\r\nName', b'ValidValue'), 326 (b'Invalid\rName', b'ValidValue'), 327 (b'Invalid\nName', b'ValidValue'), 328 (b'\r\nInvalidName', b'ValidValue'), 329 (b'\rInvalidName', b'ValidValue'), 330 (b'\nInvalidName', b'ValidValue'), 331 (b' InvalidName', b'ValidValue'), 332 (b'\tInvalidName', b'ValidValue'), 333 (b'Invalid:Name', b'ValidValue'), 334 (b':InvalidName', b'ValidValue'), 335 (b'ValidName', b'Invalid\r\nValue'), 336 (b'ValidName', b'Invalid\rValue'), 337 (b'ValidName', b'Invalid\nValue'), 338 (b'ValidName', b'InvalidValue\r\n'), 339 (b'ValidName', b'InvalidValue\r'), 340 (b'ValidName', b'InvalidValue\n'), 341 ) 342 for name, value in cases: 343 with self.subTest((name, value)): 344 with self.assertRaisesRegex(ValueError, 'Invalid header'): 345 conn.putheader(name, value) 346 347 def test_headers_debuglevel(self): 348 body = ( 349 b'HTTP/1.1 200 OK\r\n' 350 b'First: val\r\n' 351 b'Second: val\r\n' 352 ) 353 sock = FakeSocket(body) 354 resp = client.HTTPResponse(sock, debuglevel=1) 355 with support.captured_stdout() as output: 356 resp.begin() 357 lines = output.getvalue().splitlines() 358 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 359 self.assertEqual(lines[1], "header: First: val") 360 self.assertEqual(lines[2], "header: Second: val") 361 362 363class TransferEncodingTest(TestCase): 364 expected_body = b"It's just a flesh wound" 365 366 def test_endheaders_chunked(self): 367 conn = client.HTTPConnection('example.com') 368 conn.sock = FakeSocket(b'') 369 conn.putrequest('POST', '/') 370 conn.endheaders(self._make_body(), encode_chunked=True) 371 372 _, _, body = self._parse_request(conn.sock.data) 373 body = self._parse_chunked(body) 374 self.assertEqual(body, self.expected_body) 375 376 def test_explicit_headers(self): 377 # explicit chunked 378 conn = client.HTTPConnection('example.com') 379 conn.sock = FakeSocket(b'') 380 # this shouldn't actually be automatically chunk-encoded because the 381 # calling code has explicitly stated that it's taking care of it 382 conn.request( 383 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 384 385 _, headers, body = self._parse_request(conn.sock.data) 386 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 387 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 388 self.assertEqual(body, self.expected_body) 389 390 # explicit chunked, string body 391 conn = client.HTTPConnection('example.com') 392 conn.sock = FakeSocket(b'') 393 conn.request( 394 'POST', '/', self.expected_body.decode('latin-1'), 395 {'Transfer-Encoding': 'chunked'}) 396 397 _, headers, body = self._parse_request(conn.sock.data) 398 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 399 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 400 self.assertEqual(body, self.expected_body) 401 402 # User-specified TE, but request() does the chunk encoding 403 conn = client.HTTPConnection('example.com') 404 conn.sock = FakeSocket(b'') 405 conn.request('POST', '/', 406 headers={'Transfer-Encoding': 'gzip, chunked'}, 407 encode_chunked=True, 408 body=self._make_body()) 409 _, headers, body = self._parse_request(conn.sock.data) 410 self.assertNotIn('content-length', [k.lower() for k in headers]) 411 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 412 self.assertEqual(self._parse_chunked(body), self.expected_body) 413 414 def test_request(self): 415 for empty_lines in (False, True,): 416 conn = client.HTTPConnection('example.com') 417 conn.sock = FakeSocket(b'') 418 conn.request( 419 'POST', '/', self._make_body(empty_lines=empty_lines)) 420 421 _, headers, body = self._parse_request(conn.sock.data) 422 body = self._parse_chunked(body) 423 self.assertEqual(body, self.expected_body) 424 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 425 426 # Content-Length and Transfer-Encoding SHOULD not be sent in the 427 # same request 428 self.assertNotIn('content-length', [k.lower() for k in headers]) 429 430 def test_empty_body(self): 431 # Zero-length iterable should be treated like any other iterable 432 conn = client.HTTPConnection('example.com') 433 conn.sock = FakeSocket(b'') 434 conn.request('POST', '/', ()) 435 _, headers, body = self._parse_request(conn.sock.data) 436 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 437 self.assertNotIn('content-length', [k.lower() for k in headers]) 438 self.assertEqual(body, b"0\r\n\r\n") 439 440 def _make_body(self, empty_lines=False): 441 lines = self.expected_body.split(b' ') 442 for idx, line in enumerate(lines): 443 # for testing handling empty lines 444 if empty_lines and idx % 2: 445 yield b'' 446 if idx < len(lines) - 1: 447 yield line + b' ' 448 else: 449 yield line 450 451 def _parse_request(self, data): 452 lines = data.split(b'\r\n') 453 request = lines[0] 454 headers = {} 455 n = 1 456 while n < len(lines) and len(lines[n]) > 0: 457 key, val = lines[n].split(b':') 458 key = key.decode('latin-1').strip() 459 headers[key] = val.decode('latin-1').strip() 460 n += 1 461 462 return request, headers, b'\r\n'.join(lines[n + 1:]) 463 464 def _parse_chunked(self, data): 465 body = [] 466 trailers = {} 467 n = 0 468 lines = data.split(b'\r\n') 469 # parse body 470 while True: 471 size, chunk = lines[n:n+2] 472 size = int(size, 16) 473 474 if size == 0: 475 n += 1 476 break 477 478 self.assertEqual(size, len(chunk)) 479 body.append(chunk) 480 481 n += 2 482 # we /should/ hit the end chunk, but check against the size of 483 # lines so we're not stuck in an infinite loop should we get 484 # malformed data 485 if n > len(lines): 486 break 487 488 return b''.join(body) 489 490 491class BasicTest(TestCase): 492 def test_status_lines(self): 493 # Test HTTP status lines 494 495 body = "HTTP/1.1 200 Ok\r\n\r\nText" 496 sock = FakeSocket(body) 497 resp = client.HTTPResponse(sock) 498 resp.begin() 499 self.assertEqual(resp.read(0), b'') # Issue #20007 500 self.assertFalse(resp.isclosed()) 501 self.assertFalse(resp.closed) 502 self.assertEqual(resp.read(), b"Text") 503 self.assertTrue(resp.isclosed()) 504 self.assertFalse(resp.closed) 505 resp.close() 506 self.assertTrue(resp.closed) 507 508 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 509 sock = FakeSocket(body) 510 resp = client.HTTPResponse(sock) 511 self.assertRaises(client.BadStatusLine, resp.begin) 512 513 def test_bad_status_repr(self): 514 exc = client.BadStatusLine('') 515 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 516 517 def test_partial_reads(self): 518 # if we have Content-Length, HTTPResponse knows when to close itself, 519 # the same behaviour as when we read the whole thing with read() 520 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 521 sock = FakeSocket(body) 522 resp = client.HTTPResponse(sock) 523 resp.begin() 524 self.assertEqual(resp.read(2), b'Te') 525 self.assertFalse(resp.isclosed()) 526 self.assertEqual(resp.read(2), b'xt') 527 self.assertTrue(resp.isclosed()) 528 self.assertFalse(resp.closed) 529 resp.close() 530 self.assertTrue(resp.closed) 531 532 def test_mixed_reads(self): 533 # readline() should update the remaining length, so that read() knows 534 # how much data is left and does not raise IncompleteRead 535 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 536 sock = FakeSocket(body) 537 resp = client.HTTPResponse(sock) 538 resp.begin() 539 self.assertEqual(resp.readline(), b'Text\r\n') 540 self.assertFalse(resp.isclosed()) 541 self.assertEqual(resp.read(), b'Another') 542 self.assertTrue(resp.isclosed()) 543 self.assertFalse(resp.closed) 544 resp.close() 545 self.assertTrue(resp.closed) 546 547 def test_partial_readintos(self): 548 # if we have Content-Length, HTTPResponse knows when to close itself, 549 # the same behaviour as when we read the whole thing with read() 550 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 551 sock = FakeSocket(body) 552 resp = client.HTTPResponse(sock) 553 resp.begin() 554 b = bytearray(2) 555 n = resp.readinto(b) 556 self.assertEqual(n, 2) 557 self.assertEqual(bytes(b), b'Te') 558 self.assertFalse(resp.isclosed()) 559 n = resp.readinto(b) 560 self.assertEqual(n, 2) 561 self.assertEqual(bytes(b), b'xt') 562 self.assertTrue(resp.isclosed()) 563 self.assertFalse(resp.closed) 564 resp.close() 565 self.assertTrue(resp.closed) 566 567 def test_partial_reads_no_content_length(self): 568 # when no length is present, the socket should be gracefully closed when 569 # all data was read 570 body = "HTTP/1.1 200 Ok\r\n\r\nText" 571 sock = FakeSocket(body) 572 resp = client.HTTPResponse(sock) 573 resp.begin() 574 self.assertEqual(resp.read(2), b'Te') 575 self.assertFalse(resp.isclosed()) 576 self.assertEqual(resp.read(2), b'xt') 577 self.assertEqual(resp.read(1), b'') 578 self.assertTrue(resp.isclosed()) 579 self.assertFalse(resp.closed) 580 resp.close() 581 self.assertTrue(resp.closed) 582 583 def test_partial_readintos_no_content_length(self): 584 # when no length is present, the socket should be gracefully closed when 585 # all data was read 586 body = "HTTP/1.1 200 Ok\r\n\r\nText" 587 sock = FakeSocket(body) 588 resp = client.HTTPResponse(sock) 589 resp.begin() 590 b = bytearray(2) 591 n = resp.readinto(b) 592 self.assertEqual(n, 2) 593 self.assertEqual(bytes(b), b'Te') 594 self.assertFalse(resp.isclosed()) 595 n = resp.readinto(b) 596 self.assertEqual(n, 2) 597 self.assertEqual(bytes(b), b'xt') 598 n = resp.readinto(b) 599 self.assertEqual(n, 0) 600 self.assertTrue(resp.isclosed()) 601 602 def test_partial_reads_incomplete_body(self): 603 # if the server shuts down the connection before the whole 604 # content-length is delivered, the socket is gracefully closed 605 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 606 sock = FakeSocket(body) 607 resp = client.HTTPResponse(sock) 608 resp.begin() 609 self.assertEqual(resp.read(2), b'Te') 610 self.assertFalse(resp.isclosed()) 611 self.assertEqual(resp.read(2), b'xt') 612 self.assertEqual(resp.read(1), b'') 613 self.assertTrue(resp.isclosed()) 614 615 def test_partial_readintos_incomplete_body(self): 616 # if the server shuts down the connection before the whole 617 # content-length is delivered, the socket is gracefully closed 618 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 619 sock = FakeSocket(body) 620 resp = client.HTTPResponse(sock) 621 resp.begin() 622 b = bytearray(2) 623 n = resp.readinto(b) 624 self.assertEqual(n, 2) 625 self.assertEqual(bytes(b), b'Te') 626 self.assertFalse(resp.isclosed()) 627 n = resp.readinto(b) 628 self.assertEqual(n, 2) 629 self.assertEqual(bytes(b), b'xt') 630 n = resp.readinto(b) 631 self.assertEqual(n, 0) 632 self.assertTrue(resp.isclosed()) 633 self.assertFalse(resp.closed) 634 resp.close() 635 self.assertTrue(resp.closed) 636 637 def test_host_port(self): 638 # Check invalid host_port 639 640 for hp in ("www.python.org:abc", "user:password@www.python.org"): 641 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 642 643 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 644 "fe80::207:e9ff:fe9b", 8000), 645 ("www.python.org:80", "www.python.org", 80), 646 ("www.python.org:", "www.python.org", 80), 647 ("www.python.org", "www.python.org", 80), 648 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 649 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 650 c = client.HTTPConnection(hp) 651 self.assertEqual(h, c.host) 652 self.assertEqual(p, c.port) 653 654 def test_response_headers(self): 655 # test response with multiple message headers with the same field name. 656 text = ('HTTP/1.1 200 OK\r\n' 657 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 658 'Version="1"; Path="/acme"\r\n' 659 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 660 ' Path="/acme"\r\n' 661 '\r\n' 662 'No body\r\n') 663 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 664 ', ' 665 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 666 s = FakeSocket(text) 667 r = client.HTTPResponse(s) 668 r.begin() 669 cookies = r.getheader("Set-Cookie") 670 self.assertEqual(cookies, hdr) 671 672 def test_read_head(self): 673 # Test that the library doesn't attempt to read any data 674 # from a HEAD request. (Tickles SF bug #622042.) 675 sock = FakeSocket( 676 'HTTP/1.1 200 OK\r\n' 677 'Content-Length: 14432\r\n' 678 '\r\n', 679 NoEOFBytesIO) 680 resp = client.HTTPResponse(sock, method="HEAD") 681 resp.begin() 682 if resp.read(): 683 self.fail("Did not expect response from HEAD request") 684 685 def test_readinto_head(self): 686 # Test that the library doesn't attempt to read any data 687 # from a HEAD request. (Tickles SF bug #622042.) 688 sock = FakeSocket( 689 'HTTP/1.1 200 OK\r\n' 690 'Content-Length: 14432\r\n' 691 '\r\n', 692 NoEOFBytesIO) 693 resp = client.HTTPResponse(sock, method="HEAD") 694 resp.begin() 695 b = bytearray(5) 696 if resp.readinto(b) != 0: 697 self.fail("Did not expect response from HEAD request") 698 self.assertEqual(bytes(b), b'\x00'*5) 699 700 def test_too_many_headers(self): 701 headers = '\r\n'.join('Header%d: foo' % i 702 for i in range(client._MAXHEADERS + 1)) + '\r\n' 703 text = ('HTTP/1.1 200 OK\r\n' + headers) 704 s = FakeSocket(text) 705 r = client.HTTPResponse(s) 706 self.assertRaisesRegex(client.HTTPException, 707 r"got more than \d+ headers", r.begin) 708 709 def test_send_file(self): 710 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 711 b'Accept-Encoding: identity\r\n' 712 b'Transfer-Encoding: chunked\r\n' 713 b'\r\n') 714 715 with open(__file__, 'rb') as body: 716 conn = client.HTTPConnection('example.com') 717 sock = FakeSocket(body) 718 conn.sock = sock 719 conn.request('GET', '/foo', body) 720 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 721 (sock.data[:len(expected)], expected)) 722 723 def test_send(self): 724 expected = b'this is a test this is only a test' 725 conn = client.HTTPConnection('example.com') 726 sock = FakeSocket(None) 727 conn.sock = sock 728 conn.send(expected) 729 self.assertEqual(expected, sock.data) 730 sock.data = b'' 731 conn.send(array.array('b', expected)) 732 self.assertEqual(expected, sock.data) 733 sock.data = b'' 734 conn.send(io.BytesIO(expected)) 735 self.assertEqual(expected, sock.data) 736 737 def test_send_updating_file(self): 738 def data(): 739 yield 'data' 740 yield None 741 yield 'data_two' 742 743 class UpdatingFile(io.TextIOBase): 744 mode = 'r' 745 d = data() 746 def read(self, blocksize=-1): 747 return next(self.d) 748 749 expected = b'data' 750 751 conn = client.HTTPConnection('example.com') 752 sock = FakeSocket("") 753 conn.sock = sock 754 conn.send(UpdatingFile()) 755 self.assertEqual(sock.data, expected) 756 757 758 def test_send_iter(self): 759 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 760 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 761 b'\r\nonetwothree' 762 763 def body(): 764 yield b"one" 765 yield b"two" 766 yield b"three" 767 768 conn = client.HTTPConnection('example.com') 769 sock = FakeSocket("") 770 conn.sock = sock 771 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 772 self.assertEqual(sock.data, expected) 773 774 def test_blocksize_request(self): 775 """Check that request() respects the configured block size.""" 776 blocksize = 8 # For easy debugging. 777 conn = client.HTTPConnection('example.com', blocksize=blocksize) 778 sock = FakeSocket(None) 779 conn.sock = sock 780 expected = b"a" * blocksize + b"b" 781 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 782 self.assertEqual(sock.sendall_calls, 3) 783 body = sock.data.split(b"\r\n\r\n", 1)[1] 784 self.assertEqual(body, expected) 785 786 def test_blocksize_send(self): 787 """Check that send() respects the configured block size.""" 788 blocksize = 8 # For easy debugging. 789 conn = client.HTTPConnection('example.com', blocksize=blocksize) 790 sock = FakeSocket(None) 791 conn.sock = sock 792 expected = b"a" * blocksize + b"b" 793 conn.send(io.BytesIO(expected)) 794 self.assertEqual(sock.sendall_calls, 2) 795 self.assertEqual(sock.data, expected) 796 797 def test_send_type_error(self): 798 # See: Issue #12676 799 conn = client.HTTPConnection('example.com') 800 conn.sock = FakeSocket('') 801 with self.assertRaises(TypeError): 802 conn.request('POST', 'test', conn) 803 804 def test_chunked(self): 805 expected = chunked_expected 806 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 807 resp = client.HTTPResponse(sock, method="GET") 808 resp.begin() 809 self.assertEqual(resp.read(), expected) 810 resp.close() 811 812 # Various read sizes 813 for n in range(1, 12): 814 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 815 resp = client.HTTPResponse(sock, method="GET") 816 resp.begin() 817 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 818 resp.close() 819 820 for x in ('', 'foo\r\n'): 821 sock = FakeSocket(chunked_start + x) 822 resp = client.HTTPResponse(sock, method="GET") 823 resp.begin() 824 try: 825 resp.read() 826 except client.IncompleteRead as i: 827 self.assertEqual(i.partial, expected) 828 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 829 self.assertEqual(repr(i), expected_message) 830 self.assertEqual(str(i), expected_message) 831 else: 832 self.fail('IncompleteRead expected') 833 finally: 834 resp.close() 835 836 def test_readinto_chunked(self): 837 838 expected = chunked_expected 839 nexpected = len(expected) 840 b = bytearray(128) 841 842 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 843 resp = client.HTTPResponse(sock, method="GET") 844 resp.begin() 845 n = resp.readinto(b) 846 self.assertEqual(b[:nexpected], expected) 847 self.assertEqual(n, nexpected) 848 resp.close() 849 850 # Various read sizes 851 for n in range(1, 12): 852 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 853 resp = client.HTTPResponse(sock, method="GET") 854 resp.begin() 855 m = memoryview(b) 856 i = resp.readinto(m[0:n]) 857 i += resp.readinto(m[i:n + i]) 858 i += resp.readinto(m[i:]) 859 self.assertEqual(b[:nexpected], expected) 860 self.assertEqual(i, nexpected) 861 resp.close() 862 863 for x in ('', 'foo\r\n'): 864 sock = FakeSocket(chunked_start + x) 865 resp = client.HTTPResponse(sock, method="GET") 866 resp.begin() 867 try: 868 n = resp.readinto(b) 869 except client.IncompleteRead as i: 870 self.assertEqual(i.partial, expected) 871 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 872 self.assertEqual(repr(i), expected_message) 873 self.assertEqual(str(i), expected_message) 874 else: 875 self.fail('IncompleteRead expected') 876 finally: 877 resp.close() 878 879 def test_chunked_head(self): 880 chunked_start = ( 881 'HTTP/1.1 200 OK\r\n' 882 'Transfer-Encoding: chunked\r\n\r\n' 883 'a\r\n' 884 'hello world\r\n' 885 '1\r\n' 886 'd\r\n' 887 ) 888 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 889 resp = client.HTTPResponse(sock, method="HEAD") 890 resp.begin() 891 self.assertEqual(resp.read(), b'') 892 self.assertEqual(resp.status, 200) 893 self.assertEqual(resp.reason, 'OK') 894 self.assertTrue(resp.isclosed()) 895 self.assertFalse(resp.closed) 896 resp.close() 897 self.assertTrue(resp.closed) 898 899 def test_readinto_chunked_head(self): 900 chunked_start = ( 901 'HTTP/1.1 200 OK\r\n' 902 'Transfer-Encoding: chunked\r\n\r\n' 903 'a\r\n' 904 'hello world\r\n' 905 '1\r\n' 906 'd\r\n' 907 ) 908 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 909 resp = client.HTTPResponse(sock, method="HEAD") 910 resp.begin() 911 b = bytearray(5) 912 n = resp.readinto(b) 913 self.assertEqual(n, 0) 914 self.assertEqual(bytes(b), b'\x00'*5) 915 self.assertEqual(resp.status, 200) 916 self.assertEqual(resp.reason, 'OK') 917 self.assertTrue(resp.isclosed()) 918 self.assertFalse(resp.closed) 919 resp.close() 920 self.assertTrue(resp.closed) 921 922 def test_negative_content_length(self): 923 sock = FakeSocket( 924 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 925 resp = client.HTTPResponse(sock, method="GET") 926 resp.begin() 927 self.assertEqual(resp.read(), b'Hello\r\n') 928 self.assertTrue(resp.isclosed()) 929 930 def test_incomplete_read(self): 931 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 932 resp = client.HTTPResponse(sock, method="GET") 933 resp.begin() 934 try: 935 resp.read() 936 except client.IncompleteRead as i: 937 self.assertEqual(i.partial, b'Hello\r\n') 938 self.assertEqual(repr(i), 939 "IncompleteRead(7 bytes read, 3 more expected)") 940 self.assertEqual(str(i), 941 "IncompleteRead(7 bytes read, 3 more expected)") 942 self.assertTrue(resp.isclosed()) 943 else: 944 self.fail('IncompleteRead expected') 945 946 def test_epipe(self): 947 sock = EPipeSocket( 948 "HTTP/1.0 401 Authorization Required\r\n" 949 "Content-type: text/html\r\n" 950 "WWW-Authenticate: Basic realm=\"example\"\r\n", 951 b"Content-Length") 952 conn = client.HTTPConnection("example.com") 953 conn.sock = sock 954 self.assertRaises(OSError, 955 lambda: conn.request("PUT", "/url", "body")) 956 resp = conn.getresponse() 957 self.assertEqual(401, resp.status) 958 self.assertEqual("Basic realm=\"example\"", 959 resp.getheader("www-authenticate")) 960 961 # Test lines overflowing the max line size (_MAXLINE in http.client) 962 963 def test_overflowing_status_line(self): 964 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 965 resp = client.HTTPResponse(FakeSocket(body)) 966 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 967 968 def test_overflowing_header_line(self): 969 body = ( 970 'HTTP/1.1 200 OK\r\n' 971 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 972 ) 973 resp = client.HTTPResponse(FakeSocket(body)) 974 self.assertRaises(client.LineTooLong, resp.begin) 975 976 def test_overflowing_chunked_line(self): 977 body = ( 978 'HTTP/1.1 200 OK\r\n' 979 'Transfer-Encoding: chunked\r\n\r\n' 980 + '0' * 65536 + 'a\r\n' 981 'hello world\r\n' 982 '0\r\n' 983 '\r\n' 984 ) 985 resp = client.HTTPResponse(FakeSocket(body)) 986 resp.begin() 987 self.assertRaises(client.LineTooLong, resp.read) 988 989 def test_early_eof(self): 990 # Test httpresponse with no \r\n termination, 991 body = "HTTP/1.1 200 Ok" 992 sock = FakeSocket(body) 993 resp = client.HTTPResponse(sock) 994 resp.begin() 995 self.assertEqual(resp.read(), b'') 996 self.assertTrue(resp.isclosed()) 997 self.assertFalse(resp.closed) 998 resp.close() 999 self.assertTrue(resp.closed) 1000 1001 def test_error_leak(self): 1002 # Test that the socket is not leaked if getresponse() fails 1003 conn = client.HTTPConnection('example.com') 1004 response = None 1005 class Response(client.HTTPResponse): 1006 def __init__(self, *pos, **kw): 1007 nonlocal response 1008 response = self # Avoid garbage collector closing the socket 1009 client.HTTPResponse.__init__(self, *pos, **kw) 1010 conn.response_class = Response 1011 conn.sock = FakeSocket('Invalid status line') 1012 conn.request('GET', '/') 1013 self.assertRaises(client.BadStatusLine, conn.getresponse) 1014 self.assertTrue(response.closed) 1015 self.assertTrue(conn.sock.file_closed) 1016 1017 def test_chunked_extension(self): 1018 extra = '3;foo=bar\r\n' + 'abc\r\n' 1019 expected = chunked_expected + b'abc' 1020 1021 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1022 resp = client.HTTPResponse(sock, method="GET") 1023 resp.begin() 1024 self.assertEqual(resp.read(), expected) 1025 resp.close() 1026 1027 def test_chunked_missing_end(self): 1028 """some servers may serve up a short chunked encoding stream""" 1029 expected = chunked_expected 1030 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1031 resp = client.HTTPResponse(sock, method="GET") 1032 resp.begin() 1033 self.assertEqual(resp.read(), expected) 1034 resp.close() 1035 1036 def test_chunked_trailers(self): 1037 """See that trailers are read and ignored""" 1038 expected = chunked_expected 1039 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1040 resp = client.HTTPResponse(sock, method="GET") 1041 resp.begin() 1042 self.assertEqual(resp.read(), expected) 1043 # we should have reached the end of the file 1044 self.assertEqual(sock.file.read(), b"") #we read to the end 1045 resp.close() 1046 1047 def test_chunked_sync(self): 1048 """Check that we don't read past the end of the chunked-encoding stream""" 1049 expected = chunked_expected 1050 extradata = "extradata" 1051 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1052 resp = client.HTTPResponse(sock, method="GET") 1053 resp.begin() 1054 self.assertEqual(resp.read(), expected) 1055 # the file should now have our extradata ready to be read 1056 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1057 resp.close() 1058 1059 def test_content_length_sync(self): 1060 """Check that we don't read past the end of the Content-Length stream""" 1061 extradata = b"extradata" 1062 expected = b"Hello123\r\n" 1063 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1064 resp = client.HTTPResponse(sock, method="GET") 1065 resp.begin() 1066 self.assertEqual(resp.read(), expected) 1067 # the file should now have our extradata ready to be read 1068 self.assertEqual(sock.file.read(), extradata) #we read to the end 1069 resp.close() 1070 1071 def test_readlines_content_length(self): 1072 extradata = b"extradata" 1073 expected = b"Hello123\r\n" 1074 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1075 resp = client.HTTPResponse(sock, method="GET") 1076 resp.begin() 1077 self.assertEqual(resp.readlines(2000), [expected]) 1078 # the file should now have our extradata ready to be read 1079 self.assertEqual(sock.file.read(), extradata) #we read to the end 1080 resp.close() 1081 1082 def test_read1_content_length(self): 1083 extradata = b"extradata" 1084 expected = b"Hello123\r\n" 1085 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1086 resp = client.HTTPResponse(sock, method="GET") 1087 resp.begin() 1088 self.assertEqual(resp.read1(2000), expected) 1089 # the file should now have our extradata ready to be read 1090 self.assertEqual(sock.file.read(), extradata) #we read to the end 1091 resp.close() 1092 1093 def test_readline_bound_content_length(self): 1094 extradata = b"extradata" 1095 expected = b"Hello123\r\n" 1096 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1097 resp = client.HTTPResponse(sock, method="GET") 1098 resp.begin() 1099 self.assertEqual(resp.readline(10), expected) 1100 self.assertEqual(resp.readline(10), b"") 1101 # the file should now have our extradata ready to be read 1102 self.assertEqual(sock.file.read(), extradata) #we read to the end 1103 resp.close() 1104 1105 def test_read1_bound_content_length(self): 1106 extradata = b"extradata" 1107 expected = b"Hello123\r\n" 1108 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1109 resp = client.HTTPResponse(sock, method="GET") 1110 resp.begin() 1111 self.assertEqual(resp.read1(20), expected*2) 1112 self.assertEqual(resp.read(), expected) 1113 # the file should now have our extradata ready to be read 1114 self.assertEqual(sock.file.read(), extradata) #we read to the end 1115 resp.close() 1116 1117 def test_response_fileno(self): 1118 # Make sure fd returned by fileno is valid. 1119 serv = socket.socket( 1120 socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) 1121 self.addCleanup(serv.close) 1122 serv.bind((HOST, 0)) 1123 serv.listen() 1124 1125 result = None 1126 def run_server(): 1127 [conn, address] = serv.accept() 1128 with conn, conn.makefile("rb") as reader: 1129 # Read the request header until a blank line 1130 while True: 1131 line = reader.readline() 1132 if not line.rstrip(b"\r\n"): 1133 break 1134 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1135 nonlocal result 1136 result = reader.read() 1137 1138 thread = threading.Thread(target=run_server) 1139 thread.start() 1140 self.addCleanup(thread.join, float(1)) 1141 conn = client.HTTPConnection(*serv.getsockname()) 1142 conn.request("CONNECT", "dummy:1234") 1143 response = conn.getresponse() 1144 try: 1145 self.assertEqual(response.status, client.OK) 1146 s = socket.socket(fileno=response.fileno()) 1147 try: 1148 s.sendall(b"proxied data\n") 1149 finally: 1150 s.detach() 1151 finally: 1152 response.close() 1153 conn.close() 1154 thread.join() 1155 self.assertEqual(result, b"proxied data\n") 1156 1157class ExtendedReadTest(TestCase): 1158 """ 1159 Test peek(), read1(), readline() 1160 """ 1161 lines = ( 1162 'HTTP/1.1 200 OK\r\n' 1163 '\r\n' 1164 'hello world!\n' 1165 'and now \n' 1166 'for something completely different\n' 1167 'foo' 1168 ) 1169 lines_expected = lines[lines.find('hello'):].encode("ascii") 1170 lines_chunked = ( 1171 'HTTP/1.1 200 OK\r\n' 1172 'Transfer-Encoding: chunked\r\n\r\n' 1173 'a\r\n' 1174 'hello worl\r\n' 1175 '3\r\n' 1176 'd!\n\r\n' 1177 '9\r\n' 1178 'and now \n\r\n' 1179 '23\r\n' 1180 'for something completely different\n\r\n' 1181 '3\r\n' 1182 'foo\r\n' 1183 '0\r\n' # terminating chunk 1184 '\r\n' # end of trailers 1185 ) 1186 1187 def setUp(self): 1188 sock = FakeSocket(self.lines) 1189 resp = client.HTTPResponse(sock, method="GET") 1190 resp.begin() 1191 resp.fp = io.BufferedReader(resp.fp) 1192 self.resp = resp 1193 1194 1195 1196 def test_peek(self): 1197 resp = self.resp 1198 # patch up the buffered peek so that it returns not too much stuff 1199 oldpeek = resp.fp.peek 1200 def mypeek(n=-1): 1201 p = oldpeek(n) 1202 if n >= 0: 1203 return p[:n] 1204 return p[:10] 1205 resp.fp.peek = mypeek 1206 1207 all = [] 1208 while True: 1209 # try a short peek 1210 p = resp.peek(3) 1211 if p: 1212 self.assertGreater(len(p), 0) 1213 # then unbounded peek 1214 p2 = resp.peek() 1215 self.assertGreaterEqual(len(p2), len(p)) 1216 self.assertTrue(p2.startswith(p)) 1217 next = resp.read(len(p2)) 1218 self.assertEqual(next, p2) 1219 else: 1220 next = resp.read() 1221 self.assertFalse(next) 1222 all.append(next) 1223 if not next: 1224 break 1225 self.assertEqual(b"".join(all), self.lines_expected) 1226 1227 def test_readline(self): 1228 resp = self.resp 1229 self._verify_readline(self.resp.readline, self.lines_expected) 1230 1231 def _verify_readline(self, readline, expected): 1232 all = [] 1233 while True: 1234 # short readlines 1235 line = readline(5) 1236 if line and line != b"foo": 1237 if len(line) < 5: 1238 self.assertTrue(line.endswith(b"\n")) 1239 all.append(line) 1240 if not line: 1241 break 1242 self.assertEqual(b"".join(all), expected) 1243 1244 def test_read1(self): 1245 resp = self.resp 1246 def r(): 1247 res = resp.read1(4) 1248 self.assertLessEqual(len(res), 4) 1249 return res 1250 readliner = Readliner(r) 1251 self._verify_readline(readliner.readline, self.lines_expected) 1252 1253 def test_read1_unbounded(self): 1254 resp = self.resp 1255 all = [] 1256 while True: 1257 data = resp.read1() 1258 if not data: 1259 break 1260 all.append(data) 1261 self.assertEqual(b"".join(all), self.lines_expected) 1262 1263 def test_read1_bounded(self): 1264 resp = self.resp 1265 all = [] 1266 while True: 1267 data = resp.read1(10) 1268 if not data: 1269 break 1270 self.assertLessEqual(len(data), 10) 1271 all.append(data) 1272 self.assertEqual(b"".join(all), self.lines_expected) 1273 1274 def test_read1_0(self): 1275 self.assertEqual(self.resp.read1(0), b"") 1276 1277 def test_peek_0(self): 1278 p = self.resp.peek(0) 1279 self.assertLessEqual(0, len(p)) 1280 1281class ExtendedReadTestChunked(ExtendedReadTest): 1282 """ 1283 Test peek(), read1(), readline() in chunked mode 1284 """ 1285 lines = ( 1286 'HTTP/1.1 200 OK\r\n' 1287 'Transfer-Encoding: chunked\r\n\r\n' 1288 'a\r\n' 1289 'hello worl\r\n' 1290 '3\r\n' 1291 'd!\n\r\n' 1292 '9\r\n' 1293 'and now \n\r\n' 1294 '23\r\n' 1295 'for something completely different\n\r\n' 1296 '3\r\n' 1297 'foo\r\n' 1298 '0\r\n' # terminating chunk 1299 '\r\n' # end of trailers 1300 ) 1301 1302 1303class Readliner: 1304 """ 1305 a simple readline class that uses an arbitrary read function and buffering 1306 """ 1307 def __init__(self, readfunc): 1308 self.readfunc = readfunc 1309 self.remainder = b"" 1310 1311 def readline(self, limit): 1312 data = [] 1313 datalen = 0 1314 read = self.remainder 1315 try: 1316 while True: 1317 idx = read.find(b'\n') 1318 if idx != -1: 1319 break 1320 if datalen + len(read) >= limit: 1321 idx = limit - datalen - 1 1322 # read more data 1323 data.append(read) 1324 read = self.readfunc() 1325 if not read: 1326 idx = 0 #eof condition 1327 break 1328 idx += 1 1329 data.append(read[:idx]) 1330 self.remainder = read[idx:] 1331 return b"".join(data) 1332 except: 1333 self.remainder = b"".join(data) 1334 raise 1335 1336 1337class OfflineTest(TestCase): 1338 def test_all(self): 1339 # Documented objects defined in the module should be in __all__ 1340 expected = {"responses"} # White-list documented dict() object 1341 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1342 # intentionally omitted for simplicity 1343 blacklist = {"HTTPMessage", "parse_headers"} 1344 for name in dir(client): 1345 if name.startswith("_") or name in blacklist: 1346 continue 1347 module_object = getattr(client, name) 1348 if getattr(module_object, "__module__", None) == "http.client": 1349 expected.add(name) 1350 self.assertCountEqual(client.__all__, expected) 1351 1352 def test_responses(self): 1353 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1354 1355 def test_client_constants(self): 1356 # Make sure we don't break backward compatibility with 3.4 1357 expected = [ 1358 'CONTINUE', 1359 'SWITCHING_PROTOCOLS', 1360 'PROCESSING', 1361 'OK', 1362 'CREATED', 1363 'ACCEPTED', 1364 'NON_AUTHORITATIVE_INFORMATION', 1365 'NO_CONTENT', 1366 'RESET_CONTENT', 1367 'PARTIAL_CONTENT', 1368 'MULTI_STATUS', 1369 'IM_USED', 1370 'MULTIPLE_CHOICES', 1371 'MOVED_PERMANENTLY', 1372 'FOUND', 1373 'SEE_OTHER', 1374 'NOT_MODIFIED', 1375 'USE_PROXY', 1376 'TEMPORARY_REDIRECT', 1377 'BAD_REQUEST', 1378 'UNAUTHORIZED', 1379 'PAYMENT_REQUIRED', 1380 'FORBIDDEN', 1381 'NOT_FOUND', 1382 'METHOD_NOT_ALLOWED', 1383 'NOT_ACCEPTABLE', 1384 'PROXY_AUTHENTICATION_REQUIRED', 1385 'REQUEST_TIMEOUT', 1386 'CONFLICT', 1387 'GONE', 1388 'LENGTH_REQUIRED', 1389 'PRECONDITION_FAILED', 1390 'REQUEST_ENTITY_TOO_LARGE', 1391 'REQUEST_URI_TOO_LONG', 1392 'UNSUPPORTED_MEDIA_TYPE', 1393 'REQUESTED_RANGE_NOT_SATISFIABLE', 1394 'EXPECTATION_FAILED', 1395 'MISDIRECTED_REQUEST', 1396 'UNPROCESSABLE_ENTITY', 1397 'LOCKED', 1398 'FAILED_DEPENDENCY', 1399 'UPGRADE_REQUIRED', 1400 'PRECONDITION_REQUIRED', 1401 'TOO_MANY_REQUESTS', 1402 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1403 'INTERNAL_SERVER_ERROR', 1404 'NOT_IMPLEMENTED', 1405 'BAD_GATEWAY', 1406 'SERVICE_UNAVAILABLE', 1407 'GATEWAY_TIMEOUT', 1408 'HTTP_VERSION_NOT_SUPPORTED', 1409 'INSUFFICIENT_STORAGE', 1410 'NOT_EXTENDED', 1411 'NETWORK_AUTHENTICATION_REQUIRED', 1412 ] 1413 for const in expected: 1414 with self.subTest(constant=const): 1415 self.assertTrue(hasattr(client, const)) 1416 1417 1418class SourceAddressTest(TestCase): 1419 def setUp(self): 1420 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1421 self.port = support.bind_port(self.serv) 1422 self.source_port = support.find_unused_port() 1423 self.serv.listen() 1424 self.conn = None 1425 1426 def tearDown(self): 1427 if self.conn: 1428 self.conn.close() 1429 self.conn = None 1430 self.serv.close() 1431 self.serv = None 1432 1433 def testHTTPConnectionSourceAddress(self): 1434 self.conn = client.HTTPConnection(HOST, self.port, 1435 source_address=('', self.source_port)) 1436 self.conn.connect() 1437 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1438 1439 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1440 'http.client.HTTPSConnection not defined') 1441 def testHTTPSConnectionSourceAddress(self): 1442 self.conn = client.HTTPSConnection(HOST, self.port, 1443 source_address=('', self.source_port)) 1444 # We don't test anything here other than the constructor not barfing as 1445 # this code doesn't deal with setting up an active running SSL server 1446 # for an ssl_wrapped connect() to actually return from. 1447 1448 1449class TimeoutTest(TestCase): 1450 PORT = None 1451 1452 def setUp(self): 1453 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1454 TimeoutTest.PORT = support.bind_port(self.serv) 1455 self.serv.listen() 1456 1457 def tearDown(self): 1458 self.serv.close() 1459 self.serv = None 1460 1461 def testTimeoutAttribute(self): 1462 # This will prove that the timeout gets through HTTPConnection 1463 # and into the socket. 1464 1465 # default -- use global socket timeout 1466 self.assertIsNone(socket.getdefaulttimeout()) 1467 socket.setdefaulttimeout(30) 1468 try: 1469 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1470 httpConn.connect() 1471 finally: 1472 socket.setdefaulttimeout(None) 1473 self.assertEqual(httpConn.sock.gettimeout(), 30) 1474 httpConn.close() 1475 1476 # no timeout -- do not use global socket default 1477 self.assertIsNone(socket.getdefaulttimeout()) 1478 socket.setdefaulttimeout(30) 1479 try: 1480 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1481 timeout=None) 1482 httpConn.connect() 1483 finally: 1484 socket.setdefaulttimeout(None) 1485 self.assertEqual(httpConn.sock.gettimeout(), None) 1486 httpConn.close() 1487 1488 # a value 1489 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1490 httpConn.connect() 1491 self.assertEqual(httpConn.sock.gettimeout(), 30) 1492 httpConn.close() 1493 1494 1495class PersistenceTest(TestCase): 1496 1497 def test_reuse_reconnect(self): 1498 # Should reuse or reconnect depending on header from server 1499 tests = ( 1500 ('1.0', '', False), 1501 ('1.0', 'Connection: keep-alive\r\n', True), 1502 ('1.1', '', True), 1503 ('1.1', 'Connection: close\r\n', False), 1504 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1505 ('1.1', 'Connection: cloSE\r\n', False), 1506 ) 1507 for version, header, reuse in tests: 1508 with self.subTest(version=version, header=header): 1509 msg = ( 1510 'HTTP/{} 200 OK\r\n' 1511 '{}' 1512 'Content-Length: 12\r\n' 1513 '\r\n' 1514 'Dummy body\r\n' 1515 ).format(version, header) 1516 conn = FakeSocketHTTPConnection(msg) 1517 self.assertIsNone(conn.sock) 1518 conn.request('GET', '/open-connection') 1519 with conn.getresponse() as response: 1520 self.assertEqual(conn.sock is None, not reuse) 1521 response.read() 1522 self.assertEqual(conn.sock is None, not reuse) 1523 self.assertEqual(conn.connections, 1) 1524 conn.request('GET', '/subsequent-request') 1525 self.assertEqual(conn.connections, 1 if reuse else 2) 1526 1527 def test_disconnected(self): 1528 1529 def make_reset_reader(text): 1530 """Return BufferedReader that raises ECONNRESET at EOF""" 1531 stream = io.BytesIO(text) 1532 def readinto(buffer): 1533 size = io.BytesIO.readinto(stream, buffer) 1534 if size == 0: 1535 raise ConnectionResetError() 1536 return size 1537 stream.readinto = readinto 1538 return io.BufferedReader(stream) 1539 1540 tests = ( 1541 (io.BytesIO, client.RemoteDisconnected), 1542 (make_reset_reader, ConnectionResetError), 1543 ) 1544 for stream_factory, exception in tests: 1545 with self.subTest(exception=exception): 1546 conn = FakeSocketHTTPConnection(b'', stream_factory) 1547 conn.request('GET', '/eof-response') 1548 self.assertRaises(exception, conn.getresponse) 1549 self.assertIsNone(conn.sock) 1550 # HTTPConnection.connect() should be automatically invoked 1551 conn.request('GET', '/reconnect') 1552 self.assertEqual(conn.connections, 2) 1553 1554 def test_100_close(self): 1555 conn = FakeSocketHTTPConnection( 1556 b'HTTP/1.1 100 Continue\r\n' 1557 b'\r\n' 1558 # Missing final response 1559 ) 1560 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1561 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1562 self.assertIsNone(conn.sock) 1563 conn.request('GET', '/reconnect') 1564 self.assertEqual(conn.connections, 2) 1565 1566 1567class HTTPSTest(TestCase): 1568 1569 def setUp(self): 1570 if not hasattr(client, 'HTTPSConnection'): 1571 self.skipTest('ssl support required') 1572 1573 def make_server(self, certfile): 1574 from test.ssl_servers import make_https_server 1575 return make_https_server(self, certfile=certfile) 1576 1577 def test_attributes(self): 1578 # simple test to check it's storing the timeout 1579 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1580 self.assertEqual(h.timeout, 30) 1581 1582 def test_networked(self): 1583 # Default settings: requires a valid cert from a trusted CA 1584 import ssl 1585 support.requires('network') 1586 with support.transient_internet('self-signed.pythontest.net'): 1587 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1588 with self.assertRaises(ssl.SSLError) as exc_info: 1589 h.request('GET', '/') 1590 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1591 1592 def test_networked_noverification(self): 1593 # Switch off cert verification 1594 import ssl 1595 support.requires('network') 1596 with support.transient_internet('self-signed.pythontest.net'): 1597 context = ssl._create_unverified_context() 1598 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1599 context=context) 1600 h.request('GET', '/') 1601 resp = h.getresponse() 1602 h.close() 1603 self.assertIn('nginx', resp.getheader('server')) 1604 resp.close() 1605 1606 @support.system_must_validate_cert 1607 def test_networked_trusted_by_default_cert(self): 1608 # Default settings: requires a valid cert from a trusted CA 1609 support.requires('network') 1610 with support.transient_internet('www.python.org'): 1611 h = client.HTTPSConnection('www.python.org', 443) 1612 h.request('GET', '/') 1613 resp = h.getresponse() 1614 content_type = resp.getheader('content-type') 1615 resp.close() 1616 h.close() 1617 self.assertIn('text/html', content_type) 1618 1619 def test_networked_good_cert(self): 1620 # We feed the server's cert as a validating cert 1621 import ssl 1622 support.requires('network') 1623 with support.transient_internet('self-signed.pythontest.net'): 1624 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1625 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1626 self.assertEqual(context.check_hostname, True) 1627 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1628 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1629 h.request('GET', '/') 1630 resp = h.getresponse() 1631 server_string = resp.getheader('server') 1632 resp.close() 1633 h.close() 1634 self.assertIn('nginx', server_string) 1635 1636 def test_networked_bad_cert(self): 1637 # We feed a "CA" cert that is unrelated to the server's cert 1638 import ssl 1639 support.requires('network') 1640 with support.transient_internet('self-signed.pythontest.net'): 1641 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1642 context.load_verify_locations(CERT_localhost) 1643 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1644 with self.assertRaises(ssl.SSLError) as exc_info: 1645 h.request('GET', '/') 1646 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1647 1648 def test_local_unknown_cert(self): 1649 # The custom cert isn't known to the default trust bundle 1650 import ssl 1651 server = self.make_server(CERT_localhost) 1652 h = client.HTTPSConnection('localhost', server.port) 1653 with self.assertRaises(ssl.SSLError) as exc_info: 1654 h.request('GET', '/') 1655 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1656 1657 def test_local_good_hostname(self): 1658 # The (valid) cert validates the HTTP hostname 1659 import ssl 1660 server = self.make_server(CERT_localhost) 1661 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1662 context.load_verify_locations(CERT_localhost) 1663 h = client.HTTPSConnection('localhost', server.port, context=context) 1664 self.addCleanup(h.close) 1665 h.request('GET', '/nonexistent') 1666 resp = h.getresponse() 1667 self.addCleanup(resp.close) 1668 self.assertEqual(resp.status, 404) 1669 1670 def test_local_bad_hostname(self): 1671 # The (valid) cert doesn't validate the HTTP hostname 1672 import ssl 1673 server = self.make_server(CERT_fakehostname) 1674 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1675 context.load_verify_locations(CERT_fakehostname) 1676 h = client.HTTPSConnection('localhost', server.port, context=context) 1677 with self.assertRaises(ssl.CertificateError): 1678 h.request('GET', '/') 1679 # Same with explicit check_hostname=True 1680 with support.check_warnings(('', DeprecationWarning)): 1681 h = client.HTTPSConnection('localhost', server.port, 1682 context=context, check_hostname=True) 1683 with self.assertRaises(ssl.CertificateError): 1684 h.request('GET', '/') 1685 # With check_hostname=False, the mismatching is ignored 1686 context.check_hostname = False 1687 with support.check_warnings(('', DeprecationWarning)): 1688 h = client.HTTPSConnection('localhost', server.port, 1689 context=context, check_hostname=False) 1690 h.request('GET', '/nonexistent') 1691 resp = h.getresponse() 1692 resp.close() 1693 h.close() 1694 self.assertEqual(resp.status, 404) 1695 # The context's check_hostname setting is used if one isn't passed to 1696 # HTTPSConnection. 1697 context.check_hostname = False 1698 h = client.HTTPSConnection('localhost', server.port, context=context) 1699 h.request('GET', '/nonexistent') 1700 resp = h.getresponse() 1701 self.assertEqual(resp.status, 404) 1702 resp.close() 1703 h.close() 1704 # Passing check_hostname to HTTPSConnection should override the 1705 # context's setting. 1706 with support.check_warnings(('', DeprecationWarning)): 1707 h = client.HTTPSConnection('localhost', server.port, 1708 context=context, check_hostname=True) 1709 with self.assertRaises(ssl.CertificateError): 1710 h.request('GET', '/') 1711 1712 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1713 'http.client.HTTPSConnection not available') 1714 def test_host_port(self): 1715 # Check invalid host_port 1716 1717 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1718 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1719 1720 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1721 "fe80::207:e9ff:fe9b", 8000), 1722 ("www.python.org:443", "www.python.org", 443), 1723 ("www.python.org:", "www.python.org", 443), 1724 ("www.python.org", "www.python.org", 443), 1725 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1726 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1727 443)): 1728 c = client.HTTPSConnection(hp) 1729 self.assertEqual(h, c.host) 1730 self.assertEqual(p, c.port) 1731 1732 1733class RequestBodyTest(TestCase): 1734 """Test cases where a request includes a message body.""" 1735 1736 def setUp(self): 1737 self.conn = client.HTTPConnection('example.com') 1738 self.conn.sock = self.sock = FakeSocket("") 1739 self.conn.sock = self.sock 1740 1741 def get_headers_and_fp(self): 1742 f = io.BytesIO(self.sock.data) 1743 f.readline() # read the request line 1744 message = client.parse_headers(f) 1745 return message, f 1746 1747 def test_list_body(self): 1748 # Note that no content-length is automatically calculated for 1749 # an iterable. The request will fall back to send chunked 1750 # transfer encoding. 1751 cases = ( 1752 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1753 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1754 ) 1755 for body, expected in cases: 1756 with self.subTest(body): 1757 self.conn = client.HTTPConnection('example.com') 1758 self.conn.sock = self.sock = FakeSocket('') 1759 1760 self.conn.request('PUT', '/url', body) 1761 msg, f = self.get_headers_and_fp() 1762 self.assertNotIn('Content-Type', msg) 1763 self.assertNotIn('Content-Length', msg) 1764 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1765 self.assertEqual(expected, f.read()) 1766 1767 def test_manual_content_length(self): 1768 # Set an incorrect content-length so that we can verify that 1769 # it will not be over-ridden by the library. 1770 self.conn.request("PUT", "/url", "body", 1771 {"Content-Length": "42"}) 1772 message, f = self.get_headers_and_fp() 1773 self.assertEqual("42", message.get("content-length")) 1774 self.assertEqual(4, len(f.read())) 1775 1776 def test_ascii_body(self): 1777 self.conn.request("PUT", "/url", "body") 1778 message, f = self.get_headers_and_fp() 1779 self.assertEqual("text/plain", message.get_content_type()) 1780 self.assertIsNone(message.get_charset()) 1781 self.assertEqual("4", message.get("content-length")) 1782 self.assertEqual(b'body', f.read()) 1783 1784 def test_latin1_body(self): 1785 self.conn.request("PUT", "/url", "body\xc1") 1786 message, f = self.get_headers_and_fp() 1787 self.assertEqual("text/plain", message.get_content_type()) 1788 self.assertIsNone(message.get_charset()) 1789 self.assertEqual("5", message.get("content-length")) 1790 self.assertEqual(b'body\xc1', f.read()) 1791 1792 def test_bytes_body(self): 1793 self.conn.request("PUT", "/url", b"body\xc1") 1794 message, f = self.get_headers_and_fp() 1795 self.assertEqual("text/plain", message.get_content_type()) 1796 self.assertIsNone(message.get_charset()) 1797 self.assertEqual("5", message.get("content-length")) 1798 self.assertEqual(b'body\xc1', f.read()) 1799 1800 def test_text_file_body(self): 1801 self.addCleanup(support.unlink, support.TESTFN) 1802 with open(support.TESTFN, "w") as f: 1803 f.write("body") 1804 with open(support.TESTFN) as f: 1805 self.conn.request("PUT", "/url", f) 1806 message, f = self.get_headers_and_fp() 1807 self.assertEqual("text/plain", message.get_content_type()) 1808 self.assertIsNone(message.get_charset()) 1809 # No content-length will be determined for files; the body 1810 # will be sent using chunked transfer encoding instead. 1811 self.assertIsNone(message.get("content-length")) 1812 self.assertEqual("chunked", message.get("transfer-encoding")) 1813 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1814 1815 def test_binary_file_body(self): 1816 self.addCleanup(support.unlink, support.TESTFN) 1817 with open(support.TESTFN, "wb") as f: 1818 f.write(b"body\xc1") 1819 with open(support.TESTFN, "rb") as f: 1820 self.conn.request("PUT", "/url", f) 1821 message, f = self.get_headers_and_fp() 1822 self.assertEqual("text/plain", message.get_content_type()) 1823 self.assertIsNone(message.get_charset()) 1824 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1825 self.assertNotIn("Content-Length", message) 1826 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1827 1828 1829class HTTPResponseTest(TestCase): 1830 1831 def setUp(self): 1832 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1833 second-value\r\n\r\nText" 1834 sock = FakeSocket(body) 1835 self.resp = client.HTTPResponse(sock) 1836 self.resp.begin() 1837 1838 def test_getting_header(self): 1839 header = self.resp.getheader('My-Header') 1840 self.assertEqual(header, 'first-value, second-value') 1841 1842 header = self.resp.getheader('My-Header', 'some default') 1843 self.assertEqual(header, 'first-value, second-value') 1844 1845 def test_getting_nonexistent_header_with_string_default(self): 1846 header = self.resp.getheader('No-Such-Header', 'default-value') 1847 self.assertEqual(header, 'default-value') 1848 1849 def test_getting_nonexistent_header_with_iterable_default(self): 1850 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1851 self.assertEqual(header, 'default, values') 1852 1853 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1854 self.assertEqual(header, 'default, values') 1855 1856 def test_getting_nonexistent_header_without_default(self): 1857 header = self.resp.getheader('No-Such-Header') 1858 self.assertEqual(header, None) 1859 1860 def test_getting_header_defaultint(self): 1861 header = self.resp.getheader('No-Such-Header',default=42) 1862 self.assertEqual(header, 42) 1863 1864class TunnelTests(TestCase): 1865 def setUp(self): 1866 response_text = ( 1867 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1868 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1869 'Content-Length: 42\r\n\r\n' 1870 ) 1871 self.host = 'proxy.com' 1872 self.conn = client.HTTPConnection(self.host) 1873 self.conn._create_connection = self._create_connection(response_text) 1874 1875 def tearDown(self): 1876 self.conn.close() 1877 1878 def _create_connection(self, response_text): 1879 def create_connection(address, timeout=None, source_address=None): 1880 return FakeSocket(response_text, host=address[0], port=address[1]) 1881 return create_connection 1882 1883 def test_set_tunnel_host_port_headers(self): 1884 tunnel_host = 'destination.com' 1885 tunnel_port = 8888 1886 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 1887 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 1888 headers=tunnel_headers) 1889 self.conn.request('HEAD', '/', '') 1890 self.assertEqual(self.conn.sock.host, self.host) 1891 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1892 self.assertEqual(self.conn._tunnel_host, tunnel_host) 1893 self.assertEqual(self.conn._tunnel_port, tunnel_port) 1894 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 1895 1896 def test_disallow_set_tunnel_after_connect(self): 1897 # Once connected, we shouldn't be able to tunnel anymore 1898 self.conn.connect() 1899 self.assertRaises(RuntimeError, self.conn.set_tunnel, 1900 'destination.com') 1901 1902 def test_connect_with_tunnel(self): 1903 self.conn.set_tunnel('destination.com') 1904 self.conn.request('HEAD', '/', '') 1905 self.assertEqual(self.conn.sock.host, self.host) 1906 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1907 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1908 # issue22095 1909 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 1910 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1911 1912 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 1913 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 1914 1915 def test_connect_put_request(self): 1916 self.conn.set_tunnel('destination.com') 1917 self.conn.request('PUT', '/', '') 1918 self.assertEqual(self.conn.sock.host, self.host) 1919 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1920 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 1921 self.assertIn(b'Host: destination.com', self.conn.sock.data) 1922 1923 def test_tunnel_debuglog(self): 1924 expected_header = 'X-Dummy: 1' 1925 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 1926 1927 self.conn.set_debuglevel(1) 1928 self.conn._create_connection = self._create_connection(response_text) 1929 self.conn.set_tunnel('destination.com') 1930 1931 with support.captured_stdout() as output: 1932 self.conn.request('PUT', '/', '') 1933 lines = output.getvalue().splitlines() 1934 self.assertIn('header: {}'.format(expected_header), lines) 1935 1936 1937if __name__ == '__main__': 1938 unittest.main(verbosity=2) 1939