1''' 2Tests for fileinput module. 3Nick Mathewson 4''' 5import io 6import os 7import sys 8import re 9import fileinput 10import collections 11import builtins 12import tempfile 13import unittest 14 15try: 16 import bz2 17except ImportError: 18 bz2 = None 19try: 20 import gzip 21except ImportError: 22 gzip = None 23 24from io import BytesIO, StringIO 25from fileinput import FileInput, hook_encoded 26from pathlib import Path 27 28from test.support import verbose 29from test.support.os_helper import TESTFN 30from test.support.os_helper import unlink as safe_unlink 31from test.support import os_helper 32from test.support import warnings_helper 33from test import support 34from unittest import mock 35 36 37# The fileinput module has 2 interfaces: the FileInput class which does 38# all the work, and a few functions (input, etc.) that use a global _state 39# variable. 40 41class BaseTests: 42 # Write a content (str or bytes) to temp file, and return the 43 # temp file's name. 44 def writeTmp(self, content, *, mode='w'): # opening in text mode is the default 45 fd, name = tempfile.mkstemp() 46 self.addCleanup(os_helper.unlink, name) 47 encoding = None if "b" in mode else "utf-8" 48 with open(fd, mode, encoding=encoding) as f: 49 f.write(content) 50 return name 51 52class LineReader: 53 54 def __init__(self): 55 self._linesread = [] 56 57 @property 58 def linesread(self): 59 try: 60 return self._linesread[:] 61 finally: 62 self._linesread = [] 63 64 def openhook(self, filename, mode): 65 self.it = iter(filename.splitlines(True)) 66 return self 67 68 def readline(self, size=None): 69 line = next(self.it, '') 70 self._linesread.append(line) 71 return line 72 73 def readlines(self, hint=-1): 74 lines = [] 75 size = 0 76 while True: 77 line = self.readline() 78 if not line: 79 return lines 80 lines.append(line) 81 size += len(line) 82 if size >= hint: 83 return lines 84 85 def close(self): 86 pass 87 88class BufferSizesTests(BaseTests, unittest.TestCase): 89 def test_buffer_sizes(self): 90 91 t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15))) 92 t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10))) 93 t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5))) 94 t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1))) 95 96 pat = re.compile(r'LINE (\d+) OF FILE (\d+)') 97 98 if verbose: 99 print('1. Simple iteration') 100 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 101 lines = list(fi) 102 fi.close() 103 self.assertEqual(len(lines), 31) 104 self.assertEqual(lines[4], 'Line 5 of file 1\n') 105 self.assertEqual(lines[30], 'Line 1 of file 4\n') 106 self.assertEqual(fi.lineno(), 31) 107 self.assertEqual(fi.filename(), t4) 108 109 if verbose: 110 print('2. Status variables') 111 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 112 s = "x" 113 while s and s != 'Line 6 of file 2\n': 114 s = fi.readline() 115 self.assertEqual(fi.filename(), t2) 116 self.assertEqual(fi.lineno(), 21) 117 self.assertEqual(fi.filelineno(), 6) 118 self.assertFalse(fi.isfirstline()) 119 self.assertFalse(fi.isstdin()) 120 121 if verbose: 122 print('3. Nextfile') 123 fi.nextfile() 124 self.assertEqual(fi.readline(), 'Line 1 of file 3\n') 125 self.assertEqual(fi.lineno(), 22) 126 fi.close() 127 128 if verbose: 129 print('4. Stdin') 130 fi = FileInput(files=(t1, t2, t3, t4, '-'), encoding="utf-8") 131 savestdin = sys.stdin 132 try: 133 sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n") 134 lines = list(fi) 135 self.assertEqual(len(lines), 33) 136 self.assertEqual(lines[32], 'Line 2 of stdin\n') 137 self.assertEqual(fi.filename(), '<stdin>') 138 fi.nextfile() 139 finally: 140 sys.stdin = savestdin 141 142 if verbose: 143 print('5. Boundary conditions') 144 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 145 self.assertEqual(fi.lineno(), 0) 146 self.assertEqual(fi.filename(), None) 147 fi.nextfile() 148 self.assertEqual(fi.lineno(), 0) 149 self.assertEqual(fi.filename(), None) 150 151 if verbose: 152 print('6. Inplace') 153 savestdout = sys.stdout 154 try: 155 fi = FileInput(files=(t1, t2, t3, t4), inplace=1, encoding="utf-8") 156 for line in fi: 157 line = line[:-1].upper() 158 print(line) 159 fi.close() 160 finally: 161 sys.stdout = savestdout 162 163 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 164 for line in fi: 165 self.assertEqual(line[-1], '\n') 166 m = pat.match(line[:-1]) 167 self.assertNotEqual(m, None) 168 self.assertEqual(int(m.group(1)), fi.filelineno()) 169 fi.close() 170 171class UnconditionallyRaise: 172 def __init__(self, exception_type): 173 self.exception_type = exception_type 174 self.invoked = False 175 def __call__(self, *args, **kwargs): 176 self.invoked = True 177 raise self.exception_type() 178 179class FileInputTests(BaseTests, unittest.TestCase): 180 181 def test_zero_byte_files(self): 182 t1 = self.writeTmp("") 183 t2 = self.writeTmp("") 184 t3 = self.writeTmp("The only line there is.\n") 185 t4 = self.writeTmp("") 186 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 187 188 line = fi.readline() 189 self.assertEqual(line, 'The only line there is.\n') 190 self.assertEqual(fi.lineno(), 1) 191 self.assertEqual(fi.filelineno(), 1) 192 self.assertEqual(fi.filename(), t3) 193 194 line = fi.readline() 195 self.assertFalse(line) 196 self.assertEqual(fi.lineno(), 1) 197 self.assertEqual(fi.filelineno(), 0) 198 self.assertEqual(fi.filename(), t4) 199 fi.close() 200 201 def test_files_that_dont_end_with_newline(self): 202 t1 = self.writeTmp("A\nB\nC") 203 t2 = self.writeTmp("D\nE\nF") 204 fi = FileInput(files=(t1, t2), encoding="utf-8") 205 lines = list(fi) 206 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) 207 self.assertEqual(fi.filelineno(), 3) 208 self.assertEqual(fi.lineno(), 6) 209 210## def test_unicode_filenames(self): 211## # XXX A unicode string is always returned by writeTmp. 212## # So is this needed? 213## t1 = self.writeTmp("A\nB") 214## encoding = sys.getfilesystemencoding() 215## if encoding is None: 216## encoding = 'ascii' 217## fi = FileInput(files=str(t1, encoding), encoding="utf-8") 218## lines = list(fi) 219## self.assertEqual(lines, ["A\n", "B"]) 220 221 def test_fileno(self): 222 t1 = self.writeTmp("A\nB") 223 t2 = self.writeTmp("C\nD") 224 fi = FileInput(files=(t1, t2), encoding="utf-8") 225 self.assertEqual(fi.fileno(), -1) 226 line = next(fi) 227 self.assertNotEqual(fi.fileno(), -1) 228 fi.nextfile() 229 self.assertEqual(fi.fileno(), -1) 230 line = list(fi) 231 self.assertEqual(fi.fileno(), -1) 232 233 def test_opening_mode(self): 234 try: 235 # invalid mode, should raise ValueError 236 fi = FileInput(mode="w", encoding="utf-8") 237 self.fail("FileInput should reject invalid mode argument") 238 except ValueError: 239 pass 240 # try opening in universal newline mode 241 t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb") 242 with warnings_helper.check_warnings(('', DeprecationWarning)): 243 fi = FileInput(files=t1, mode="U", encoding="utf-8") 244 with warnings_helper.check_warnings(('', DeprecationWarning)): 245 lines = list(fi) 246 self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"]) 247 248 def test_stdin_binary_mode(self): 249 with mock.patch('sys.stdin') as m_stdin: 250 m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam') 251 fi = FileInput(files=['-'], mode='rb') 252 lines = list(fi) 253 self.assertEqual(lines, [b'spam, bacon, sausage, and spam']) 254 255 def test_detached_stdin_binary_mode(self): 256 orig_stdin = sys.stdin 257 try: 258 sys.stdin = BytesIO(b'spam, bacon, sausage, and spam') 259 self.assertFalse(hasattr(sys.stdin, 'buffer')) 260 fi = FileInput(files=['-'], mode='rb') 261 lines = list(fi) 262 self.assertEqual(lines, [b'spam, bacon, sausage, and spam']) 263 finally: 264 sys.stdin = orig_stdin 265 266 def test_file_opening_hook(self): 267 try: 268 # cannot use openhook and inplace mode 269 fi = FileInput(inplace=1, openhook=lambda f, m: None) 270 self.fail("FileInput should raise if both inplace " 271 "and openhook arguments are given") 272 except ValueError: 273 pass 274 try: 275 fi = FileInput(openhook=1) 276 self.fail("FileInput should check openhook for being callable") 277 except ValueError: 278 pass 279 280 class CustomOpenHook: 281 def __init__(self): 282 self.invoked = False 283 def __call__(self, *args, **kargs): 284 self.invoked = True 285 return open(*args, encoding="utf-8") 286 287 t = self.writeTmp("\n") 288 custom_open_hook = CustomOpenHook() 289 with FileInput([t], openhook=custom_open_hook) as fi: 290 fi.readline() 291 self.assertTrue(custom_open_hook.invoked, "openhook not invoked") 292 293 def test_readline(self): 294 with open(TESTFN, 'wb') as f: 295 f.write(b'A\nB\r\nC\r') 296 # Fill TextIOWrapper buffer. 297 f.write(b'123456789\n' * 1000) 298 # Issue #20501: readline() shouldn't read whole file. 299 f.write(b'\x80') 300 self.addCleanup(safe_unlink, TESTFN) 301 302 with FileInput(files=TESTFN, 303 openhook=hook_encoded('ascii')) as fi: 304 try: 305 self.assertEqual(fi.readline(), 'A\n') 306 self.assertEqual(fi.readline(), 'B\n') 307 self.assertEqual(fi.readline(), 'C\n') 308 except UnicodeDecodeError: 309 self.fail('Read to end of file') 310 with self.assertRaises(UnicodeDecodeError): 311 # Read to the end of file. 312 list(fi) 313 self.assertEqual(fi.readline(), '') 314 self.assertEqual(fi.readline(), '') 315 316 def test_readline_binary_mode(self): 317 with open(TESTFN, 'wb') as f: 318 f.write(b'A\nB\r\nC\rD') 319 self.addCleanup(safe_unlink, TESTFN) 320 321 with FileInput(files=TESTFN, mode='rb') as fi: 322 self.assertEqual(fi.readline(), b'A\n') 323 self.assertEqual(fi.readline(), b'B\r\n') 324 self.assertEqual(fi.readline(), b'C\rD') 325 # Read to the end of file. 326 self.assertEqual(fi.readline(), b'') 327 self.assertEqual(fi.readline(), b'') 328 329 def test_inplace_binary_write_mode(self): 330 temp_file = self.writeTmp(b'Initial text.', mode='wb') 331 with FileInput(temp_file, mode='rb', inplace=True) as fobj: 332 line = fobj.readline() 333 self.assertEqual(line, b'Initial text.') 334 # print() cannot be used with files opened in binary mode. 335 sys.stdout.write(b'New line.') 336 with open(temp_file, 'rb') as f: 337 self.assertEqual(f.read(), b'New line.') 338 339 def test_file_hook_backward_compatibility(self): 340 def old_hook(filename, mode): 341 return io.StringIO("I used to receive only filename and mode") 342 t = self.writeTmp("\n") 343 with FileInput([t], openhook=old_hook) as fi: 344 result = fi.readline() 345 self.assertEqual(result, "I used to receive only filename and mode") 346 347 def test_context_manager(self): 348 t1 = self.writeTmp("A\nB\nC") 349 t2 = self.writeTmp("D\nE\nF") 350 with FileInput(files=(t1, t2), encoding="utf-8") as fi: 351 lines = list(fi) 352 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) 353 self.assertEqual(fi.filelineno(), 3) 354 self.assertEqual(fi.lineno(), 6) 355 self.assertEqual(fi._files, ()) 356 357 def test_close_on_exception(self): 358 t1 = self.writeTmp("") 359 try: 360 with FileInput(files=t1, encoding="utf-8") as fi: 361 raise OSError 362 except OSError: 363 self.assertEqual(fi._files, ()) 364 365 def test_empty_files_list_specified_to_constructor(self): 366 with FileInput(files=[], encoding="utf-8") as fi: 367 self.assertEqual(fi._files, ('-',)) 368 369 @warnings_helper.ignore_warnings(category=DeprecationWarning) 370 def test__getitem__(self): 371 """Tests invoking FileInput.__getitem__() with the current 372 line number""" 373 t = self.writeTmp("line1\nline2\n") 374 with FileInput(files=[t], encoding="utf-8") as fi: 375 retval1 = fi[0] 376 self.assertEqual(retval1, "line1\n") 377 retval2 = fi[1] 378 self.assertEqual(retval2, "line2\n") 379 380 def test__getitem___deprecation(self): 381 t = self.writeTmp("line1\nline2\n") 382 with self.assertWarnsRegex(DeprecationWarning, 383 r'Use iterator protocol instead'): 384 with FileInput(files=[t]) as fi: 385 self.assertEqual(fi[0], "line1\n") 386 387 @warnings_helper.ignore_warnings(category=DeprecationWarning) 388 def test__getitem__invalid_key(self): 389 """Tests invoking FileInput.__getitem__() with an index unequal to 390 the line number""" 391 t = self.writeTmp("line1\nline2\n") 392 with FileInput(files=[t], encoding="utf-8") as fi: 393 with self.assertRaises(RuntimeError) as cm: 394 fi[1] 395 self.assertEqual(cm.exception.args, ("accessing lines out of order",)) 396 397 @warnings_helper.ignore_warnings(category=DeprecationWarning) 398 def test__getitem__eof(self): 399 """Tests invoking FileInput.__getitem__() with the line number but at 400 end-of-input""" 401 t = self.writeTmp('') 402 with FileInput(files=[t], encoding="utf-8") as fi: 403 with self.assertRaises(IndexError) as cm: 404 fi[0] 405 self.assertEqual(cm.exception.args, ("end of input reached",)) 406 407 def test_nextfile_oserror_deleting_backup(self): 408 """Tests invoking FileInput.nextfile() when the attempt to delete 409 the backup file would raise OSError. This error is expected to be 410 silently ignored""" 411 412 os_unlink_orig = os.unlink 413 os_unlink_replacement = UnconditionallyRaise(OSError) 414 try: 415 t = self.writeTmp("\n") 416 self.addCleanup(safe_unlink, t + '.bak') 417 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 418 next(fi) # make sure the file is opened 419 os.unlink = os_unlink_replacement 420 fi.nextfile() 421 finally: 422 os.unlink = os_unlink_orig 423 424 # sanity check to make sure that our test scenario was actually hit 425 self.assertTrue(os_unlink_replacement.invoked, 426 "os.unlink() was not invoked") 427 428 def test_readline_os_fstat_raises_OSError(self): 429 """Tests invoking FileInput.readline() when os.fstat() raises OSError. 430 This exception should be silently discarded.""" 431 432 os_fstat_orig = os.fstat 433 os_fstat_replacement = UnconditionallyRaise(OSError) 434 try: 435 t = self.writeTmp("\n") 436 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 437 os.fstat = os_fstat_replacement 438 fi.readline() 439 finally: 440 os.fstat = os_fstat_orig 441 442 # sanity check to make sure that our test scenario was actually hit 443 self.assertTrue(os_fstat_replacement.invoked, 444 "os.fstat() was not invoked") 445 446 def test_readline_os_chmod_raises_OSError(self): 447 """Tests invoking FileInput.readline() when os.chmod() raises OSError. 448 This exception should be silently discarded.""" 449 450 os_chmod_orig = os.chmod 451 os_chmod_replacement = UnconditionallyRaise(OSError) 452 try: 453 t = self.writeTmp("\n") 454 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 455 os.chmod = os_chmod_replacement 456 fi.readline() 457 finally: 458 os.chmod = os_chmod_orig 459 460 # sanity check to make sure that our test scenario was actually hit 461 self.assertTrue(os_chmod_replacement.invoked, 462 "os.fstat() was not invoked") 463 464 def test_fileno_when_ValueError_raised(self): 465 class FilenoRaisesValueError(UnconditionallyRaise): 466 def __init__(self): 467 UnconditionallyRaise.__init__(self, ValueError) 468 def fileno(self): 469 self.__call__() 470 471 unconditionally_raise_ValueError = FilenoRaisesValueError() 472 t = self.writeTmp("\n") 473 with FileInput(files=[t], encoding="utf-8") as fi: 474 file_backup = fi._file 475 try: 476 fi._file = unconditionally_raise_ValueError 477 result = fi.fileno() 478 finally: 479 fi._file = file_backup # make sure the file gets cleaned up 480 481 # sanity check to make sure that our test scenario was actually hit 482 self.assertTrue(unconditionally_raise_ValueError.invoked, 483 "_file.fileno() was not invoked") 484 485 self.assertEqual(result, -1, "fileno() should return -1") 486 487 def test_readline_buffering(self): 488 src = LineReader() 489 with FileInput(files=['line1\nline2', 'line3\n'], 490 openhook=src.openhook) as fi: 491 self.assertEqual(src.linesread, []) 492 self.assertEqual(fi.readline(), 'line1\n') 493 self.assertEqual(src.linesread, ['line1\n']) 494 self.assertEqual(fi.readline(), 'line2') 495 self.assertEqual(src.linesread, ['line2']) 496 self.assertEqual(fi.readline(), 'line3\n') 497 self.assertEqual(src.linesread, ['', 'line3\n']) 498 self.assertEqual(fi.readline(), '') 499 self.assertEqual(src.linesread, ['']) 500 self.assertEqual(fi.readline(), '') 501 self.assertEqual(src.linesread, []) 502 503 def test_iteration_buffering(self): 504 src = LineReader() 505 with FileInput(files=['line1\nline2', 'line3\n'], 506 openhook=src.openhook) as fi: 507 self.assertEqual(src.linesread, []) 508 self.assertEqual(next(fi), 'line1\n') 509 self.assertEqual(src.linesread, ['line1\n']) 510 self.assertEqual(next(fi), 'line2') 511 self.assertEqual(src.linesread, ['line2']) 512 self.assertEqual(next(fi), 'line3\n') 513 self.assertEqual(src.linesread, ['', 'line3\n']) 514 self.assertRaises(StopIteration, next, fi) 515 self.assertEqual(src.linesread, ['']) 516 self.assertRaises(StopIteration, next, fi) 517 self.assertEqual(src.linesread, []) 518 519 def test_pathlib_file(self): 520 t1 = Path(self.writeTmp("Pathlib file.")) 521 with FileInput(t1, encoding="utf-8") as fi: 522 line = fi.readline() 523 self.assertEqual(line, 'Pathlib file.') 524 self.assertEqual(fi.lineno(), 1) 525 self.assertEqual(fi.filelineno(), 1) 526 self.assertEqual(fi.filename(), os.fspath(t1)) 527 528 def test_pathlib_file_inplace(self): 529 t1 = Path(self.writeTmp('Pathlib file.')) 530 with FileInput(t1, inplace=True, encoding="utf-8") as fi: 531 line = fi.readline() 532 self.assertEqual(line, 'Pathlib file.') 533 print('Modified %s' % line) 534 with open(t1, encoding="utf-8") as f: 535 self.assertEqual(f.read(), 'Modified Pathlib file.\n') 536 537 538class MockFileInput: 539 """A class that mocks out fileinput.FileInput for use during unit tests""" 540 541 def __init__(self, files=None, inplace=False, backup="", *, 542 mode="r", openhook=None, encoding=None, errors=None): 543 self.files = files 544 self.inplace = inplace 545 self.backup = backup 546 self.mode = mode 547 self.openhook = openhook 548 self.encoding = encoding 549 self.errors = errors 550 self._file = None 551 self.invocation_counts = collections.defaultdict(lambda: 0) 552 self.return_values = {} 553 554 def close(self): 555 self.invocation_counts["close"] += 1 556 557 def nextfile(self): 558 self.invocation_counts["nextfile"] += 1 559 return self.return_values["nextfile"] 560 561 def filename(self): 562 self.invocation_counts["filename"] += 1 563 return self.return_values["filename"] 564 565 def lineno(self): 566 self.invocation_counts["lineno"] += 1 567 return self.return_values["lineno"] 568 569 def filelineno(self): 570 self.invocation_counts["filelineno"] += 1 571 return self.return_values["filelineno"] 572 573 def fileno(self): 574 self.invocation_counts["fileno"] += 1 575 return self.return_values["fileno"] 576 577 def isfirstline(self): 578 self.invocation_counts["isfirstline"] += 1 579 return self.return_values["isfirstline"] 580 581 def isstdin(self): 582 self.invocation_counts["isstdin"] += 1 583 return self.return_values["isstdin"] 584 585class BaseFileInputGlobalMethodsTest(unittest.TestCase): 586 """Base class for unit tests for the global function of 587 the fileinput module.""" 588 589 def setUp(self): 590 self._orig_state = fileinput._state 591 self._orig_FileInput = fileinput.FileInput 592 fileinput.FileInput = MockFileInput 593 594 def tearDown(self): 595 fileinput.FileInput = self._orig_FileInput 596 fileinput._state = self._orig_state 597 598 def assertExactlyOneInvocation(self, mock_file_input, method_name): 599 # assert that the method with the given name was invoked once 600 actual_count = mock_file_input.invocation_counts[method_name] 601 self.assertEqual(actual_count, 1, method_name) 602 # assert that no other unexpected methods were invoked 603 actual_total_count = len(mock_file_input.invocation_counts) 604 self.assertEqual(actual_total_count, 1) 605 606class Test_fileinput_input(BaseFileInputGlobalMethodsTest): 607 """Unit tests for fileinput.input()""" 608 609 def test_state_is_not_None_and_state_file_is_not_None(self): 610 """Tests invoking fileinput.input() when fileinput._state is not None 611 and its _file attribute is also not None. Expect RuntimeError to 612 be raised with a meaningful error message and for fileinput._state 613 to *not* be modified.""" 614 instance = MockFileInput() 615 instance._file = object() 616 fileinput._state = instance 617 with self.assertRaises(RuntimeError) as cm: 618 fileinput.input() 619 self.assertEqual(("input() already active",), cm.exception.args) 620 self.assertIs(instance, fileinput._state, "fileinput._state") 621 622 def test_state_is_not_None_and_state_file_is_None(self): 623 """Tests invoking fileinput.input() when fileinput._state is not None 624 but its _file attribute *is* None. Expect it to create and return 625 a new fileinput.FileInput object with all method parameters passed 626 explicitly to the __init__() method; also ensure that 627 fileinput._state is set to the returned instance.""" 628 instance = MockFileInput() 629 instance._file = None 630 fileinput._state = instance 631 self.do_test_call_input() 632 633 def test_state_is_None(self): 634 """Tests invoking fileinput.input() when fileinput._state is None 635 Expect it to create and return a new fileinput.FileInput object 636 with all method parameters passed explicitly to the __init__() 637 method; also ensure that fileinput._state is set to the returned 638 instance.""" 639 fileinput._state = None 640 self.do_test_call_input() 641 642 def do_test_call_input(self): 643 """Tests that fileinput.input() creates a new fileinput.FileInput 644 object, passing the given parameters unmodified to 645 fileinput.FileInput.__init__(). Note that this test depends on the 646 monkey patching of fileinput.FileInput done by setUp().""" 647 files = object() 648 inplace = object() 649 backup = object() 650 mode = object() 651 openhook = object() 652 encoding = object() 653 654 # call fileinput.input() with different values for each argument 655 result = fileinput.input(files=files, inplace=inplace, backup=backup, 656 mode=mode, openhook=openhook, encoding=encoding) 657 658 # ensure fileinput._state was set to the returned object 659 self.assertIs(result, fileinput._state, "fileinput._state") 660 661 # ensure the parameters to fileinput.input() were passed directly 662 # to FileInput.__init__() 663 self.assertIs(files, result.files, "files") 664 self.assertIs(inplace, result.inplace, "inplace") 665 self.assertIs(backup, result.backup, "backup") 666 self.assertIs(mode, result.mode, "mode") 667 self.assertIs(openhook, result.openhook, "openhook") 668 669class Test_fileinput_close(BaseFileInputGlobalMethodsTest): 670 """Unit tests for fileinput.close()""" 671 672 def test_state_is_None(self): 673 """Tests that fileinput.close() does nothing if fileinput._state 674 is None""" 675 fileinput._state = None 676 fileinput.close() 677 self.assertIsNone(fileinput._state) 678 679 def test_state_is_not_None(self): 680 """Tests that fileinput.close() invokes close() on fileinput._state 681 and sets _state=None""" 682 instance = MockFileInput() 683 fileinput._state = instance 684 fileinput.close() 685 self.assertExactlyOneInvocation(instance, "close") 686 self.assertIsNone(fileinput._state) 687 688class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest): 689 """Unit tests for fileinput.nextfile()""" 690 691 def test_state_is_None(self): 692 """Tests fileinput.nextfile() when fileinput._state is None. 693 Ensure that it raises RuntimeError with a meaningful error message 694 and does not modify fileinput._state""" 695 fileinput._state = None 696 with self.assertRaises(RuntimeError) as cm: 697 fileinput.nextfile() 698 self.assertEqual(("no active input()",), cm.exception.args) 699 self.assertIsNone(fileinput._state) 700 701 def test_state_is_not_None(self): 702 """Tests fileinput.nextfile() when fileinput._state is not None. 703 Ensure that it invokes fileinput._state.nextfile() exactly once, 704 returns whatever it returns, and does not modify fileinput._state 705 to point to a different object.""" 706 nextfile_retval = object() 707 instance = MockFileInput() 708 instance.return_values["nextfile"] = nextfile_retval 709 fileinput._state = instance 710 retval = fileinput.nextfile() 711 self.assertExactlyOneInvocation(instance, "nextfile") 712 self.assertIs(retval, nextfile_retval) 713 self.assertIs(fileinput._state, instance) 714 715class Test_fileinput_filename(BaseFileInputGlobalMethodsTest): 716 """Unit tests for fileinput.filename()""" 717 718 def test_state_is_None(self): 719 """Tests fileinput.filename() when fileinput._state is None. 720 Ensure that it raises RuntimeError with a meaningful error message 721 and does not modify fileinput._state""" 722 fileinput._state = None 723 with self.assertRaises(RuntimeError) as cm: 724 fileinput.filename() 725 self.assertEqual(("no active input()",), cm.exception.args) 726 self.assertIsNone(fileinput._state) 727 728 def test_state_is_not_None(self): 729 """Tests fileinput.filename() when fileinput._state is not None. 730 Ensure that it invokes fileinput._state.filename() exactly once, 731 returns whatever it returns, and does not modify fileinput._state 732 to point to a different object.""" 733 filename_retval = object() 734 instance = MockFileInput() 735 instance.return_values["filename"] = filename_retval 736 fileinput._state = instance 737 retval = fileinput.filename() 738 self.assertExactlyOneInvocation(instance, "filename") 739 self.assertIs(retval, filename_retval) 740 self.assertIs(fileinput._state, instance) 741 742class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest): 743 """Unit tests for fileinput.lineno()""" 744 745 def test_state_is_None(self): 746 """Tests fileinput.lineno() when fileinput._state is None. 747 Ensure that it raises RuntimeError with a meaningful error message 748 and does not modify fileinput._state""" 749 fileinput._state = None 750 with self.assertRaises(RuntimeError) as cm: 751 fileinput.lineno() 752 self.assertEqual(("no active input()",), cm.exception.args) 753 self.assertIsNone(fileinput._state) 754 755 def test_state_is_not_None(self): 756 """Tests fileinput.lineno() when fileinput._state is not None. 757 Ensure that it invokes fileinput._state.lineno() exactly once, 758 returns whatever it returns, and does not modify fileinput._state 759 to point to a different object.""" 760 lineno_retval = object() 761 instance = MockFileInput() 762 instance.return_values["lineno"] = lineno_retval 763 fileinput._state = instance 764 retval = fileinput.lineno() 765 self.assertExactlyOneInvocation(instance, "lineno") 766 self.assertIs(retval, lineno_retval) 767 self.assertIs(fileinput._state, instance) 768 769class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest): 770 """Unit tests for fileinput.filelineno()""" 771 772 def test_state_is_None(self): 773 """Tests fileinput.filelineno() when fileinput._state is None. 774 Ensure that it raises RuntimeError with a meaningful error message 775 and does not modify fileinput._state""" 776 fileinput._state = None 777 with self.assertRaises(RuntimeError) as cm: 778 fileinput.filelineno() 779 self.assertEqual(("no active input()",), cm.exception.args) 780 self.assertIsNone(fileinput._state) 781 782 def test_state_is_not_None(self): 783 """Tests fileinput.filelineno() when fileinput._state is not None. 784 Ensure that it invokes fileinput._state.filelineno() exactly once, 785 returns whatever it returns, and does not modify fileinput._state 786 to point to a different object.""" 787 filelineno_retval = object() 788 instance = MockFileInput() 789 instance.return_values["filelineno"] = filelineno_retval 790 fileinput._state = instance 791 retval = fileinput.filelineno() 792 self.assertExactlyOneInvocation(instance, "filelineno") 793 self.assertIs(retval, filelineno_retval) 794 self.assertIs(fileinput._state, instance) 795 796class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest): 797 """Unit tests for fileinput.fileno()""" 798 799 def test_state_is_None(self): 800 """Tests fileinput.fileno() when fileinput._state is None. 801 Ensure that it raises RuntimeError with a meaningful error message 802 and does not modify fileinput._state""" 803 fileinput._state = None 804 with self.assertRaises(RuntimeError) as cm: 805 fileinput.fileno() 806 self.assertEqual(("no active input()",), cm.exception.args) 807 self.assertIsNone(fileinput._state) 808 809 def test_state_is_not_None(self): 810 """Tests fileinput.fileno() when fileinput._state is not None. 811 Ensure that it invokes fileinput._state.fileno() exactly once, 812 returns whatever it returns, and does not modify fileinput._state 813 to point to a different object.""" 814 fileno_retval = object() 815 instance = MockFileInput() 816 instance.return_values["fileno"] = fileno_retval 817 instance.fileno_retval = fileno_retval 818 fileinput._state = instance 819 retval = fileinput.fileno() 820 self.assertExactlyOneInvocation(instance, "fileno") 821 self.assertIs(retval, fileno_retval) 822 self.assertIs(fileinput._state, instance) 823 824class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest): 825 """Unit tests for fileinput.isfirstline()""" 826 827 def test_state_is_None(self): 828 """Tests fileinput.isfirstline() when fileinput._state is None. 829 Ensure that it raises RuntimeError with a meaningful error message 830 and does not modify fileinput._state""" 831 fileinput._state = None 832 with self.assertRaises(RuntimeError) as cm: 833 fileinput.isfirstline() 834 self.assertEqual(("no active input()",), cm.exception.args) 835 self.assertIsNone(fileinput._state) 836 837 def test_state_is_not_None(self): 838 """Tests fileinput.isfirstline() when fileinput._state is not None. 839 Ensure that it invokes fileinput._state.isfirstline() exactly once, 840 returns whatever it returns, and does not modify fileinput._state 841 to point to a different object.""" 842 isfirstline_retval = object() 843 instance = MockFileInput() 844 instance.return_values["isfirstline"] = isfirstline_retval 845 fileinput._state = instance 846 retval = fileinput.isfirstline() 847 self.assertExactlyOneInvocation(instance, "isfirstline") 848 self.assertIs(retval, isfirstline_retval) 849 self.assertIs(fileinput._state, instance) 850 851class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest): 852 """Unit tests for fileinput.isstdin()""" 853 854 def test_state_is_None(self): 855 """Tests fileinput.isstdin() when fileinput._state is None. 856 Ensure that it raises RuntimeError with a meaningful error message 857 and does not modify fileinput._state""" 858 fileinput._state = None 859 with self.assertRaises(RuntimeError) as cm: 860 fileinput.isstdin() 861 self.assertEqual(("no active input()",), cm.exception.args) 862 self.assertIsNone(fileinput._state) 863 864 def test_state_is_not_None(self): 865 """Tests fileinput.isstdin() when fileinput._state is not None. 866 Ensure that it invokes fileinput._state.isstdin() exactly once, 867 returns whatever it returns, and does not modify fileinput._state 868 to point to a different object.""" 869 isstdin_retval = object() 870 instance = MockFileInput() 871 instance.return_values["isstdin"] = isstdin_retval 872 fileinput._state = instance 873 retval = fileinput.isstdin() 874 self.assertExactlyOneInvocation(instance, "isstdin") 875 self.assertIs(retval, isstdin_retval) 876 self.assertIs(fileinput._state, instance) 877 878class InvocationRecorder: 879 880 def __init__(self): 881 self.invocation_count = 0 882 883 def __call__(self, *args, **kwargs): 884 self.invocation_count += 1 885 self.last_invocation = (args, kwargs) 886 return io.BytesIO(b'some bytes') 887 888 889class Test_hook_compressed(unittest.TestCase): 890 """Unit tests for fileinput.hook_compressed()""" 891 892 def setUp(self): 893 self.fake_open = InvocationRecorder() 894 895 def test_empty_string(self): 896 self.do_test_use_builtin_open("", 1) 897 898 def test_no_ext(self): 899 self.do_test_use_builtin_open("abcd", 2) 900 901 @unittest.skipUnless(gzip, "Requires gzip and zlib") 902 def test_gz_ext_fake(self): 903 original_open = gzip.open 904 gzip.open = self.fake_open 905 try: 906 result = fileinput.hook_compressed("test.gz", "3") 907 finally: 908 gzip.open = original_open 909 910 self.assertEqual(self.fake_open.invocation_count, 1) 911 self.assertEqual(self.fake_open.last_invocation, (("test.gz", "3"), {})) 912 913 @unittest.skipUnless(gzip, "Requires gzip and zlib") 914 def test_gz_with_encoding_fake(self): 915 original_open = gzip.open 916 gzip.open = lambda filename, mode: io.BytesIO(b'Ex-binary string') 917 try: 918 result = fileinput.hook_compressed("test.gz", "3", encoding="utf-8") 919 finally: 920 gzip.open = original_open 921 self.assertEqual(list(result), ['Ex-binary string']) 922 923 @unittest.skipUnless(bz2, "Requires bz2") 924 def test_bz2_ext_fake(self): 925 original_open = bz2.BZ2File 926 bz2.BZ2File = self.fake_open 927 try: 928 result = fileinput.hook_compressed("test.bz2", "4") 929 finally: 930 bz2.BZ2File = original_open 931 932 self.assertEqual(self.fake_open.invocation_count, 1) 933 self.assertEqual(self.fake_open.last_invocation, (("test.bz2", "4"), {})) 934 935 def test_blah_ext(self): 936 self.do_test_use_builtin_open("abcd.blah", "5") 937 938 def test_gz_ext_builtin(self): 939 self.do_test_use_builtin_open("abcd.Gz", "6") 940 941 def test_bz2_ext_builtin(self): 942 self.do_test_use_builtin_open("abcd.Bz2", "7") 943 944 def do_test_use_builtin_open(self, filename, mode): 945 original_open = self.replace_builtin_open(self.fake_open) 946 try: 947 result = fileinput.hook_compressed(filename, mode) 948 finally: 949 self.replace_builtin_open(original_open) 950 951 self.assertEqual(self.fake_open.invocation_count, 1) 952 self.assertEqual(self.fake_open.last_invocation, 953 ((filename, mode), {'encoding': 'locale', 'errors': None})) 954 955 @staticmethod 956 def replace_builtin_open(new_open_func): 957 original_open = builtins.open 958 builtins.open = new_open_func 959 return original_open 960 961class Test_hook_encoded(unittest.TestCase): 962 """Unit tests for fileinput.hook_encoded()""" 963 964 def test(self): 965 encoding = object() 966 errors = object() 967 result = fileinput.hook_encoded(encoding, errors=errors) 968 969 fake_open = InvocationRecorder() 970 original_open = builtins.open 971 builtins.open = fake_open 972 try: 973 filename = object() 974 mode = object() 975 open_result = result(filename, mode) 976 finally: 977 builtins.open = original_open 978 979 self.assertEqual(fake_open.invocation_count, 1) 980 981 args, kwargs = fake_open.last_invocation 982 self.assertIs(args[0], filename) 983 self.assertIs(args[1], mode) 984 self.assertIs(kwargs.pop('encoding'), encoding) 985 self.assertIs(kwargs.pop('errors'), errors) 986 self.assertFalse(kwargs) 987 988 def test_errors(self): 989 with open(TESTFN, 'wb') as f: 990 f.write(b'\x80abc') 991 self.addCleanup(safe_unlink, TESTFN) 992 993 def check(errors, expected_lines): 994 with FileInput(files=TESTFN, mode='r', 995 openhook=hook_encoded('utf-8', errors=errors)) as fi: 996 lines = list(fi) 997 self.assertEqual(lines, expected_lines) 998 999 check('ignore', ['abc']) 1000 with self.assertRaises(UnicodeDecodeError): 1001 check('strict', ['abc']) 1002 check('replace', ['\ufffdabc']) 1003 check('backslashreplace', ['\\x80abc']) 1004 1005 def test_modes(self): 1006 with open(TESTFN, 'wb') as f: 1007 # UTF-7 is a convenient, seldom used encoding 1008 f.write(b'A\nB\r\nC\rD+IKw-') 1009 self.addCleanup(safe_unlink, TESTFN) 1010 1011 def check(mode, expected_lines): 1012 with FileInput(files=TESTFN, mode=mode, 1013 openhook=hook_encoded('utf-7')) as fi: 1014 lines = list(fi) 1015 self.assertEqual(lines, expected_lines) 1016 1017 check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac']) 1018 with self.assertWarns(DeprecationWarning): 1019 check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac']) 1020 with self.assertWarns(DeprecationWarning): 1021 check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac']) 1022 with self.assertRaises(ValueError): 1023 check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac']) 1024 1025 1026class MiscTest(unittest.TestCase): 1027 1028 def test_all(self): 1029 support.check__all__(self, fileinput) 1030 1031 1032if __name__ == "__main__": 1033 unittest.main() 1034