"""
Tests for fileinput module.
Nick Mathewson
"""
import os
import sys
import re
import fileinput
import collections
import builtins
import unittest

try:
    import bz2
except ImportError:
    bz2 = None
try:
    import gzip
except ImportError:
    gzip = None

from io import BytesIO, StringIO
from fileinput import FileInput, hook_encoded

from test.support import verbose, TESTFN, check_warnings
from test.support import unlink as safe_unlink
from test import support
from unittest import mock


# The fileinput module has 2 interfaces: the FileInput class which does
# all the work, and a few functions (input, etc.) that use a global _state
# variable.

def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    *lines* is a list of str (or of bytes when *mode* is binary); the
    items are written back to back, with no separator added.
    """
    name = TESTFN + str(i)
    # The with-statement closes the file even if a write fails.
    with open(name, mode) as f:
        f.writelines(lines)
    return name


def remove_tempfiles(*names):
    """Unlink every truthy entry of *names*; falsy entries (e.g. None)
    are skipped so callers can pass names that were never assigned."""
    for name in names:
        if name:
            safe_unlink(name)


class LineReader:
    """File-like stub that records every line handed out by readline().

    openhook() treats the "filename" it is given as the file's content,
    so tests can feed FileInput in-memory text and then inspect which
    reads were performed on the underlying object.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Lines read since the previous access; reading this property
        resets the record."""
        try:
            return self._linesread[:]
        finally:
            self._linesread = []

    def openhook(self, filename, mode):
        """Open hook for FileInput: iterate over the lines of *filename*
        itself (*mode* is ignored) and return this object as the file."""
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        """Return the next line, or '' at the end; every returned value
        (including the final '') is recorded.  *size* is ignored."""
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Return lines until the source is exhausted or their total
        length reaches *hint*.

        Note that with the default hint of -1 the length check succeeds
        after the first line, so at most one line is returned.
        """
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            if size >= hint:
                return lines

    def close(self):
        pass


class BufferSizesTests(unittest.TestCase):
    """Runs the basic FileInput scenarios with and without a bufsize."""

    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        for round_no, bs in ((0, 0), (1, 30)):
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                if bs:
                    # A non-zero bufsize is expected to emit a
                    # DeprecationWarning.
                    with self.assertWarns(DeprecationWarning):
                        self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
                else:
                    self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Exercise FileInput over the four temp files with bufsize *bs*.

        *round* only affects the step numbers printed in verbose mode.
        The in-place step rewrites the files in upper case, which the
        final step then verifies.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        lines = list(fi)
        fi.close()
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        s = "x"
        while s and s != 'Line 6 of file 2\n':
            s = fi.readline()
        self.assertEqual(fi.filename(), t2)
        self.assertEqual(fi.lineno(), 21)
        self.assertEqual(fi.filelineno(), 6)
        self.assertFalse(fi.isfirstline())
        self.assertFalse(fi.isstdin())

        if verbose:
            print('%s. Nextfile (bs=%s)' % (start+2, bs))
        fi.nextfile()
        self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
        self.assertEqual(fi.lineno(), 22)
        fi.close()

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        fi = FileInput(files=(t1, t2, t3, t4, '-'), bufsize=bs)
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            lines = list(fi)
            self.assertEqual(len(lines), 33)
            self.assertEqual(lines[32], 'Line 2 of stdin\n')
            self.assertEqual(fi.filename(), '<stdin>')
            fi.nextfile()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())
        fi.nextfile()
        self.assertEqual(fi.lineno(), 0)
        self.assertIsNone(fi.filename())

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            fi = FileInput(files=(t1, t2, t3, t4), inplace=1, bufsize=bs)
            for line in fi:
                line = line[:-1].upper()
                print(line)
            fi.close()
        finally:
            sys.stdout = savestdout

        fi = FileInput(files=(t1, t2, t3, t4), bufsize=bs)
        for line in fi:
            self.assertEqual(line[-1], '\n')
            m = pat.match(line[:-1])
            self.assertIsNotNone(m)
            self.assertEqual(int(m.group(1)), fi.filelineno())
        fi.close()


class UnconditionallyRaise:
    """Callable that records that it was invoked and then raises an
    instance of the exception type given to the constructor."""

    def __init__(self, exception_type):
        self.exception_type = exception_type
        self.invoked = False

    def __call__(self, *args, **kwargs):
        self.invoked = True
        raise self.exception_type()


class FileInputTests(unittest.TestCase):
    """Tests for the FileInput class itself."""

    def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            fi = FileInput(files=(t1, t2))
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)

    def test_fileno(self):
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            fi = FileInput(files=(t1, t2))
            self.assertEqual(fi.fileno(), -1)
            next(fi)  # reading a line opens the first file
            self.assertNotEqual(fi.fileno(), -1)
            fi.nextfile()
            self.assertEqual(fi.fileno(), -1)
            list(fi)  # consume the remaining input
            self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_stdin_binary_mode(self):
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False

            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        # Pre-bind the names so the cleanup in ``finally`` cannot fail
        # with an unbound local if writeTmp() itself raises.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        # Pre-bind t1 so the cleanup in ``finally`` cannot fail with an
        # unbound local if writeTmp() itself raises.
        t1 = None
        try:
            t1 = writeTmp(1, [""])
            with self.assertRaises(OSError):
                with FileInput(files=t1) as fi:
                    raise OSError
            # Leaving the with-block through an exception must still
            # have closed the FileInput.
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1)

    def test_empty_files_list_specified_to_constructor(self):
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
        line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
        the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
            self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
        end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
            self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
        the backup file would raise OSError.  This error is expected to be
        silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                next(fi)  # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
        This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipUnless(hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
        This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)

            def fileno(self):
                self()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup  # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])


class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests"""

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.bufsize = bufsize
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Maps method name -> number of calls; a key only appears once
        # the corresponding method has been invoked.
        self.invocation_counts = collections.defaultdict(int)
        # Maps method name -> canned value that the method returns.
        self.return_values = {}

    def close(self):
        self.invocation_counts["close"] += 1

    def nextfile(self):
        self.invocation_counts["nextfile"] += 1
        return self.return_values["nextfile"]

    def filename(self):
        self.invocation_counts["filename"] += 1
        return self.return_values["filename"]

    def lineno(self):
        self.invocation_counts["lineno"] += 1
        return self.return_values["lineno"]

    def filelineno(self):
        self.invocation_counts["filelineno"] += 1
        return self.return_values["filelineno"]

    def fileno(self):
        self.invocation_counts["fileno"] += 1
        return self.return_values["fileno"]

    def isfirstline(self):
        self.invocation_counts["isfirstline"] += 1
        return self.return_values["isfirstline"]

    def isstdin(self):
        self.invocation_counts["isstdin"] += 1
        return self.return_values["isstdin"]


class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Base class for unit tests for the global function of
    the fileinput module."""

    def setUp(self):
        self._orig_state = fileinput._state
        self._orig_FileInput = fileinput.FileInput
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        # assert that the method with the given name was invoked once
        actual_count = mock_file_input.invocation_counts[method_name]
        self.assertEqual(actual_count, 1, method_name)
        # assert that no other unexpected methods were invoked
        actual_total_count = len(mock_file_input.invocation_counts)
        self.assertEqual(actual_total_count, 1)


class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
        and its _file attribute is also not None.  Expect RuntimeError to
        be raised with a meaningful error message and for fileinput._state
        to *not* be modified."""
        instance = MockFileInput()
        instance._file = object()
        fileinput._state = instance
        with self.assertRaises(RuntimeError) as cm:
            fileinput.input()
        self.assertEqual(("input() already active",), cm.exception.args)
        self.assertIs(instance, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
        but its _file attribute *is* None.  Expect it to create and return
        a new fileinput.FileInput object with all method parameters passed
        explicitly to the __init__() method; also ensure that
        fileinput._state is set to the returned instance."""
        instance = MockFileInput()
        instance._file = None
        fileinput._state = instance
        self.do_test_call_input()

    def test_state_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is None
        Expect it to create and return a new fileinput.FileInput object
        with all method parameters passed explicitly to the __init__()
        method; also ensure that fileinput._state is set to the returned
        instance."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Tests that fileinput.input() creates a new fileinput.FileInput
        object, passing the given parameters unmodified to
        fileinput.FileInput.__init__().  Note that this test depends on the
        monkey patching of fileinput.FileInput done by setUp()."""
        files = object()
        inplace = object()
        backup = object()
        bufsize = object()
        mode = object()
        openhook = object()

        # call fileinput.input() with different values for each argument
        result = fileinput.input(files=files, inplace=inplace, backup=backup,
                                 bufsize=bufsize,
                                 mode=mode, openhook=openhook)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        self.assertIs(files, result.files, "files")
        self.assertIs(inplace, result.inplace, "inplace")
        self.assertIs(backup, result.backup, "backup")
        self.assertIs(bufsize, result.bufsize, "bufsize")
        self.assertIs(mode, result.mode, "mode")
        self.assertIs(openhook, result.openhook, "openhook")


class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """Tests that fileinput.close() does nothing if fileinput._state
        is None"""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests that fileinput.close() invokes close() on fileinput._state
        and sets _state=None"""
        instance = MockFileInput()
        fileinput._state = instance
        fileinput.close()
        self.assertExactlyOneInvocation(instance, "close")
        self.assertIsNone(fileinput._state)


class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """Tests fileinput.nextfile() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.nextfile() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.nextfile() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        nextfile_retval = object()
        instance = MockFileInput()
        instance.return_values["nextfile"] = nextfile_retval
        fileinput._state = instance
        retval = fileinput.nextfile()
        self.assertExactlyOneInvocation(instance, "nextfile")
        self.assertIs(retval, nextfile_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """Tests fileinput.filename() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filename()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filename() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.filename() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        filename_retval = object()
        instance = MockFileInput()
        instance.return_values["filename"] = filename_retval
        fileinput._state = instance
        retval = fileinput.filename()
        self.assertExactlyOneInvocation(instance, "filename")
        self.assertIs(retval, filename_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """Tests fileinput.lineno() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.lineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.lineno() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.lineno() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        lineno_retval = object()
        instance = MockFileInput()
        instance.return_values["lineno"] = lineno_retval
        fileinput._state = instance
        retval = fileinput.lineno()
        self.assertExactlyOneInvocation(instance, "lineno")
        self.assertIs(retval, lineno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """Tests fileinput.filelineno() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.filelineno() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.filelineno() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        filelineno_retval = object()
        instance = MockFileInput()
        instance.return_values["filelineno"] = filelineno_retval
        fileinput._state = instance
        retval = fileinput.filelineno()
        self.assertExactlyOneInvocation(instance, "filelineno")
        self.assertIs(retval, filelineno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.fileno() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isfirstline()"""

    def test_state_is_None(self):
        """Tests fileinput.isfirstline() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.isfirstline()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.isfirstline() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.isfirstline() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        isfirstline_retval = object()
        instance = MockFileInput()
        instance.return_values["isfirstline"] = isfirstline_retval
        fileinput._state = instance
        retval = fileinput.isfirstline()
        self.assertExactlyOneInvocation(instance, "isfirstline")
        self.assertIs(retval, isfirstline_retval)
        self.assertIs(fileinput._state, instance)


class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.isstdin()"""

    def test_state_is_None(self):
        """Tests fileinput.isstdin() when fileinput._state is None.
        Ensure that it raises RuntimeError with a meaningful error message
        and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.isstdin()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.isstdin() when fileinput._state is not None.
        Ensure that it invokes fileinput._state.isstdin() exactly once,
        returns whatever it returns, and does not modify fileinput._state
        to point to a different object."""
        isstdin_retval = object()
        instance = MockFileInput()
        instance.return_values["isstdin"] = isstdin_retval
        fileinput._state = instance
        retval = fileinput.isstdin()
        self.assertExactlyOneInvocation(instance, "isstdin")
        self.assertIs(retval, isstdin_retval)
        self.assertIs(fileinput._state, instance)


class InvocationRecorder:
    """Callable that counts its invocations and remembers the positional
    and keyword arguments of the most recent one."""

    def __init__(self):
        self.invocation_count = 0

    def __call__(self, *args, **kwargs):
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)


class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()"""

    def setUp(self):
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        original_open = gzip.open
        gzip.open = self.fake_open
        try:
            fileinput.hook_compressed("test.gz", 3)
        finally:
            gzip.open = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        original_open = bz2.BZ2File
        bz2.BZ2File = self.fake_open
        try:
            fileinput.hook_compressed("test.bz2", 4)
        finally:
            bz2.BZ2File = original_open

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {}))

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed(filename, mode) calls the builtin
        open() exactly once with exactly those two arguments."""
        original_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(original_open)

        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install *new_open_func* as builtins.open and return the
        function it replaced."""
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open


class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        encoding = object()
        errors = object()
        result = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        original_open = builtins.open
        builtins.open = fake_open
        try:
            filename = object()
            mode = object()
            result(filename, mode)
        finally:
            builtins.open = original_open

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        self.assertIs(kwargs.pop('errors'), errors)
        self.assertFalse(kwargs)

    def test_errors(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            with FileInput(files=TESTFN, mode='r',
                           openhook=hook_encoded('utf-8', errors=errors)) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])


class MiscTest(unittest.TestCase):

    def test_all(self):
        support.check__all__(self, fileinput)


if __name__ == "__main__":
    unittest.main()