'''
Tests for fileinput module.
Nick Mathewson
'''
import os
import sys
import re
import fileinput
import collections
import builtins
import tempfile
import unittest

try:
    import bz2
except ImportError:
    bz2 = None
try:
    import gzip
except ImportError:
    gzip = None

from io import BytesIO, StringIO
from fileinput import FileInput, hook_encoded
from pathlib import Path

from test.support import verbose, TESTFN, check_warnings
from test.support import unlink as safe_unlink
from test import support
from unittest import mock


# The fileinput module has 2 interfaces: the FileInput class which does
# all the work, and a few functions (input, etc.) that use a global _state
# variable.

class BaseTests:
    """Mixin providing a temp-file helper for the TestCase classes below."""

    # Write a content (str or bytes) to temp file, and return the
    # temp file's name.
    def writeTmp(self, content, *, mode='w'):  # opening in text mode is the default
        fd, name = tempfile.mkstemp()
        # Register the cleanup before writing so the file is removed even
        # if the write below fails.
        self.addCleanup(support.unlink, name)
        with open(fd, mode) as f:
            f.write(content)
        return name


class LineReader:
    """File-like object used as an openhook target.

    The "filename" handed to openhook() is treated as the file *content*;
    every line handed out by readline() is recorded so that tests can check
    exactly how much FileInput has consumed from the underlying file.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Return the lines read since the last access and reset the log."""
        try:
            return self._linesread[:]
        finally:
            self._linesread = []

    def openhook(self, filename, mode):
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        # An exhausted iterator yields '' which is the end-of-file marker.
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Return a list of lines, stopping once *hint* characters are read.

        As for io.IOBase.readlines(), a *hint* of None, zero or a negative
        number means "no limit": read until end of file.
        """
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            # Only a positive hint limits the amount of data read.
            if hint is not None and 0 < hint <= size:
                return lines

    def close(self):
        pass


class BufferSizesTests(BaseTests, unittest.TestCase):
    def test_buffer_sizes(self):

        t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15)))
        t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10)))
        t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5)))
        t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1)))

        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        if verbose:
            print('1. Simple iteration')
        fi = FileInput(files=(t1, t2, t3, t4))
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('2. Status variables')
        fi = FileInput(files=(t1, t2, t3, t4))
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('3. Nextfile')
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('4. Stdin')
        fi = FileInput(files=(t1, t2, t3, t4, '-'))
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('5. Boundary conditions')
        fi = FileInput(files=(t1, t2, t3, t4))
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.nextfile()
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())

        if verbose:
            print('6. Inplace')
        savestdout = sys.stdout
        try:
            fi = FileInput(files=(t1, t2, t3, t4), inplace=1)
            for line in fi:
                line = line[:-1].upper()
                print(line)
            fi.close()
        finally:
            # Restore stdout even if the in-place rewrite failed midway.
            sys.stdout = savestdout

        fi = FileInput(files=(t1, t2, t3, t4))
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()


class UnconditionallyRaise:
    """Callable that records that it was invoked and then raises."""

    def __init__(self, exception_type):
        self.exception_type = exception_type
        self.invoked = False

    def __call__(self, *args, **kwargs):
        self.invoked = True
        raise self.exception_type()


class FileInputTests(BaseTests, unittest.TestCase):

    def test_zero_byte_files(self):
        t1 = self.writeTmp("")
        t2 = self.writeTmp("")
        t3 = self.writeTmp("The only line there is.\n")
        t4 = self.writeTmp("")
        fi = FileInput(files=(t1, t2, t3, t4))

        line = fi.readline()
        self.assertEqual(line, 'The only line there is.\n')
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 1)
        self.assertEqual(fi.filename(), t3)

        line = fi.readline()
        self.assertFalse(line)
        self.assertEqual(fi.lineno(), 1)
        self.assertEqual(fi.filelineno(), 0)
        self.assertEqual(fi.filename(), t4)
        fi.close()

    def test_files_that_dont_end_with_newline(self):
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        fi = FileInput(files=(t1, t2))
        lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         t1 = self.writeTmp("A\nB")
##         encoding = sys.getfilesystemencoding()
##         if encoding is None:
##             encoding = 'ascii'
##         fi = FileInput(files=str(t1, encoding))
##         lines = list(fi)
##         self.assertEqual(lines, ["A\n", "B"])

    def test_fileno(self):
        t1 = self.writeTmp("A\nB")
        t2 = self.writeTmp("C\nD")
        fi = FileInput(files=(t1, t2))
        self.assertEqual(fi.fileno(), -1)
        next(fi)
        self.assertNotEqual(fi.fileno(), -1)
        fi.nextfile()
        self.assertEqual(fi.fileno(), -1)
        list(fi)
        self.assertEqual(fi.fileno(), -1)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        # try opening in universal newline mode
        t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb")
        with check_warnings(('', DeprecationWarning)):
            fi = FileInput(files=t1, mode="U")
        with check_warnings(('', DeprecationWarning)):
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])

    def test_stdin_binary_mode(self):
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False

            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = self.writeTmp("\n")
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_inplace_binary_write_mode(self):
        temp_file = self.writeTmp(b'Initial text.', mode='wb')
        with FileInput(temp_file, mode='rb', inplace=True) as fobj:
            line = fobj.readline()
            self.assertEqual(line, b'Initial text.')
            # print() cannot be used with files opened in binary mode.
            sys.stdout.write(b'New line.')
        with open(temp_file, 'rb') as f:
            self.assertEqual(f.read(), b'New line.')

    def test_context_manager(self):
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        with FileInput(files=(t1, t2)) as fi:
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)
        self.assertEqual(fi._files, ())

    def test_close_on_exception(self):
        t1 = self.writeTmp("")
        try:
            with FileInput(files=t1) as fi:
                raise OSError
        except OSError:
            self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    @support.ignore_warnings(category=DeprecationWarning)
    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = self.writeTmp("line1\nline2\n")
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem___deprecation(self):
        t = self.writeTmp("line1\nline2\n")
        with self.assertWarnsRegex(DeprecationWarning,
                                   r'Use iterator protocol instead'):
            with FileInput(files=[t]) as fi:
                self.assertEqual(fi[0], "line1\n")

    @support.ignore_warnings(category=DeprecationWarning)
    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = self.writeTmp("line1\nline2\n")
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    @support.ignore_warnings(category=DeprecationWarning)
    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = self.writeTmp('')
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            self.addCleanup(support.unlink, t + '.bak')
            with FileInput(files=[t], inplace=True) as fi:
                next(fi)  # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)

            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = self.writeTmp("\n")
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup  # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])

    def test_pathlib_file(self):
        t1 = Path(self.writeTmp("Pathlib file."))
        with FileInput(t1) as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), os.fspath(t1))

    def test_pathlib_file_inplace(self):
        t1 = Path(self.writeTmp('Pathlib file.'))
        with FileInput(t1, inplace=True) as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            print('Modified %s' % line)
        with open(t1) as f:
            self.assertEqual(f.read(), 'Modified Pathlib file.\n')


class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests"""

    def __init__(self, files=None, inplace=False, backup="", *,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Maps method name -> number of calls; only methods that were
        # actually invoked get a key, which assertExactlyOneInvocation()
        # relies on.
        self.invocation_counts = collections.defaultdict(int)
        # Maps method name -> canned value that the method returns.
        self.return_values = {}

    def close(self):
        self.invocation_counts["close"] += 1

    def nextfile(self):
        self.invocation_counts["nextfile"] += 1
        return self.return_values["nextfile"]

    def filename(self):
        self.invocation_counts["filename"] += 1
        return self.return_values["filename"]

    def lineno(self):
        self.invocation_counts["lineno"] += 1
        return self.return_values["lineno"]

    def filelineno(self):
        self.invocation_counts["filelineno"] += 1
        return self.return_values["filelineno"]

    def fileno(self):
        self.invocation_counts["fileno"] += 1
        return self.return_values["fileno"]

    def isfirstline(self):
        self.invocation_counts["isfirstline"] += 1
        return self.return_values["isfirstline"]

    def isstdin(self):
        self.invocation_counts["isstdin"] += 1
        return self.return_values["isstdin"]


class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
       the fileinput module."""

    def setUp(self):
        self._orig_state = fileinput._state
        self._orig_FileInput = fileinput.FileInput
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        # assert that the method with the given name was invoked once
        actual_count = mock_file_input.invocation_counts[method_name]
        self.assertEqual(actual_count, 1, method_name)
        # assert that no other unexpected methods were invoked
        actual_total_count = len(mock_file_input.invocation_counts)
        self.assertEqual(actual_total_count, 1)


class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           and its _file attribute is also not None.  Expect RuntimeError to
           be raised with a meaningful error message and for fileinput._state
           to *not* be modified."""
        instance = MockFileInput()
        instance._file = object()
        fileinput._state = instance
        with self.assertRaises(RuntimeError) as cm:
            fileinput.input()
        self.assertEqual(("input() already active",), cm.exception.args)
        self.assertIs(instance, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           but its _file attribute *is* None.  Expect it to create and return
           a new fileinput.FileInput object with all method parameters passed
           explicitly to the __init__() method; also ensure that
           fileinput._state is set to the returned instance."""
        instance = MockFileInput()
        instance._file = None
        fileinput._state = instance
        self.do_test_call_input()

    def test_state_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is None
           Expect it to create and return a new fileinput.FileInput object
           with all method parameters passed explicitly to the __init__()
           method; also ensure that fileinput._state is set to the returned
           instance."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Tests that fileinput.input() creates a new fileinput.FileInput
           object, passing the given parameters unmodified to
           fileinput.FileInput.__init__().  Note that this test depends on the
           monkey patching of fileinput.FileInput done by setUp()."""
        files = object()
        inplace = object()
        backup = object()
        mode = object()
        openhook = object()

        # call fileinput.input() with different values for each argument
        result = fileinput.input(files=files, inplace=inplace, backup=backup,
                                 mode=mode, openhook=openhook)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        self.assertIs(files, result.files, "files")
        self.assertIs(inplace, result.inplace, "inplace")
        self.assertIs(backup, result.backup, "backup")
        self.assertIs(mode, result.mode, "mode")
        self.assertIs(openhook, result.openhook, "openhook")


class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """Tests that fileinput.close() does nothing if fileinput._state
           is None"""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests that fileinput.close() invokes close() on fileinput._state
           and sets _state=None"""
        instance = MockFileInput()
        fileinput._state = instance
        fileinput.close()
        self.assertExactlyOneInvocation(instance, "close")
        self.assertIsNone(fileinput._state)


class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """Tests fileinput.nextfile() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.nextfile() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.nextfile() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        nextfile_retval = object()
        instance = MockFileInput()
        instance.return_values["nextfile"] = nextfile_retval
        fileinput._state = instance
        retval = fileinput.nextfile()
        self.assertExactlyOneInvocation(instance, "nextfile")
        self.assertIs(retval, nextfile_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """Tests fileinput.filename() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filename()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filename() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filename() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filename_retval = object()
        instance = MockFileInput()
        instance.return_values["filename"] = filename_retval
        fileinput._state = instance
        retval = fileinput.filename()
        self.assertExactlyOneInvocation(instance, "filename")
        self.assertIs(retval, filename_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """Tests fileinput.lineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.lineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.lineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.lineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        lineno_retval = object()
        instance = MockFileInput()
        instance.return_values["lineno"] = lineno_retval
        fileinput._state = instance
        retval = fileinput.lineno()
        self.assertExactlyOneInvocation(instance, "lineno")
        self.assertIs(retval, lineno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """Tests fileinput.filelineno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filelineno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.filelineno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        filelineno_retval = object()
        instance = MockFileInput()
        instance.return_values["filelineno"] = filelineno_retval
        fileinput._state = instance
        retval = fileinput.filelineno()
        self.assertExactlyOneInvocation(instance, "filelineno")
        self.assertIs(retval, filelineno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        instance.return_values["fileno"] = fileno_retval
        instance.fileno_retval = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isfirstline()"""

    def test_state_is_None(self):
        """Tests fileinput.isfirstline() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.isfirstline()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.isfirstline() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.isfirstline() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        isfirstline_retval = object()
        instance = MockFileInput()
        instance.return_values["isfirstline"] = isfirstline_retval
        fileinput._state = instance
        retval = fileinput.isfirstline()
        self.assertExactlyOneInvocation(instance, "isfirstline")
        self.assertIs(retval, isfirstline_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isstdin()"""

    def test_state_is_None(self):
        """Tests fileinput.isstdin() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.isstdin()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.isstdin() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.isstdin() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        isstdin_retval = object()
        instance = MockFileInput()
        instance.return_values["isstdin"] = isstdin_retval
        fileinput._state = instance
        retval = fileinput.isstdin()
        self.assertExactlyOneInvocation(instance, "isstdin")
        self.assertIs(retval, isstdin_retval)
        self.assertIs(fileinput._state, instance)


class InvocationRecorder:
    """Callable that counts its calls and remembers the last arguments."""

    def __init__(self):
        self.invocation_count = 0

    def __call__(self, *args, **kwargs):
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)


class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()"""

    def setUp(self):
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        original_open = gzip.open
        gzip.open = self.fake_open
        try:
            fileinput.hook_compressed("test.gz", 3)
        finally:
            gzip.open = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        original_open = bz2.BZ2File
        bz2.BZ2File = self.fake_open
        try:
            fileinput.hook_compressed("test.bz2", 4)
        finally:
            bz2.BZ2File = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed() falls back to the builtin open()."""
        original_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(original_open)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install *new_open_func* as builtins.open; return the old one."""
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open


class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        encoding = object()
        errors = object()
        result = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        original_open = builtins.open
        builtins.open = fake_open
        try:
            filename = object()
            mode = object()
            result(filename, mode)
        finally:
            builtins.open = original_open

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        self.assertIs(kwargs.pop('errors'), errors)
        self.assertFalse(kwargs)

    def test_errors(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            with FileInput(files=TESTFN, mode='r',
                           openhook=hook_encoded('utf-8', errors=errors)) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])


class MiscTest(unittest.TestCase):

    def test_all(self):
        support.check__all__(self, fileinput)


if __name__ == "__main__":
    unittest.main()