• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'''
2Tests for fileinput module.
3Nick Mathewson
4'''
5import os
6import sys
7import re
8import fileinput
9import collections
10import builtins
11import unittest
12
13try:
14    import bz2
15except ImportError:
16    bz2 = None
17try:
18    import gzip
19except ImportError:
20    gzip = None
21
22from io import BytesIO, StringIO
23from fileinput import FileInput, hook_encoded
24
25from test.support import verbose, TESTFN, check_warnings
26from test.support import unlink as safe_unlink
27from test import support
28from unittest import mock
29
30
31# The fileinput module has 2 interfaces: the FileInput class which does
32# all the work, and a few functions (input, etc.) that use a global _state
33# variable.
34
35# Write lines (a list of lines) to temp file number i, and return the
36# temp file's name.
def writeTmp(i, lines, mode='w'):  # opening in text mode is the default
    """Write *lines* to temp file number *i* and return the file's name.

    The file is named ``TESTFN + str(i)`` and is opened with *mode* ('w',
    i.e. text mode, unless the caller passes something else such as 'wb').
    Every item of *lines* is written verbatim; no newline is appended.
    """
    name = TESTFN + str(i)
    # The with-statement closes the file even when a write raises, so a
    # failing write cannot leave an open handle behind.
    with open(name, mode) as f:
        f.writelines(lines)
    return name
44
def remove_tempfiles(*names):
    """Unlink every file named in *names*, ignoring falsy entries.

    Falsy entries (such as ``None``) are skipped, which lets callers pass
    placeholder variables that never received a real file name.
    """
    for path in filter(None, names):
        safe_unlink(path)
49
class LineReader:
    """Minimal file-like object that records every line it hands out.

    An instance doubles as its own open hook: openhook() splits the
    "filename" it is given into lines and serves those lines as the file's
    content.  Each readline() result (including the '' end-of-file marker)
    is remembered and can be collected through the ``linesread`` property.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Lines returned by readline() since this property was last read.

        Reading the property empties the record, so consecutive reads only
        report the lines produced in between.
        """
        try:
            return self._linesread[:]
        finally:
            self._linesread = []

    def openhook(self, filename, mode):
        """Start serving the lines of the string *filename*; return self.

        *mode* is accepted for open-hook compatibility and ignored.
        """
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        """Return the next line, or '' when exhausted; *size* is ignored."""
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Return a list of the remaining lines, honouring *hint*.

        As with io.IOBase.readlines(), a *hint* of None, zero or a negative
        number means "no limit".  A positive *hint* stops reading once the
        total length of the lines collected so far reaches it.
        """
        # The limit only applies to a positive hint; previously the default
        # of -1 made ``size >= hint`` true after the very first line.
        limited = hint is not None and hint > 0
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            if limited and size >= hint:
                return lines

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass
85
class BufferSizesTests(unittest.TestCase):
    """Run the same FileInput scenario with the default and a tiny bufsize."""

    def test_buffer_sizes(self):
        # First, run the tests with default and teeny buffer size.
        for round_no, bs in (0, 0), (1, 30):
            t1 = t2 = t3 = t4 = None
            try:
                t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)])
                t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)])
                t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)])
                t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)])
                if bs:
                    # A non-zero bufsize is expected to trigger a
                    # DeprecationWarning somewhere inside the helper.
                    with self.assertWarns(DeprecationWarning):
                        self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
                else:
                    self.buffer_size_test(t1, t2, t3, t4, bs, round_no)
            finally:
                remove_tempfiles(t1, t2, t3, t4)

    def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
        """Exercise FileInput over the four temp files using bufsize *bs*.

        *t1* .. *t4* name the files written by test_buffer_sizes() (15, 10,
        5 and 1 lines).  *round* only offsets the step numbers printed in
        verbose mode.  The "Inplace" step rewrites the files in upper case,
        which the final loop then verifies.

        Every FileInput is used as a context manager so that it is closed
        even when an assertion in the middle of a step fails.
        """
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
        all_files = (t1, t2, t3, t4)

        start = 1 + round*6
        if verbose:
            print('%s. Simple iteration (bs=%s)' % (start+0, bs))
        with FileInput(files=all_files, bufsize=bs) as fi:
            lines = list(fi)
        # The counters below are checked after the object has been closed.
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('%s. Status variables (bs=%s)' % (start+1, bs))
        with FileInput(files=all_files, bufsize=bs) as fi:
            s = "x"
            while s and s != 'Line 6 of file 2\n':
                s = fi.readline()
            self.assertEqual(fi.filename(), t2)
            self.assertEqual(fi.lineno(), 21)
            self.assertEqual(fi.filelineno(), 6)
            self.assertFalse(fi.isfirstline())
            self.assertFalse(fi.isstdin())

            # The "Nextfile" step continues with the same, partially
            # consumed FileInput object.
            if verbose:
                print('%s. Nextfile (bs=%s)' % (start+2, bs))
            fi.nextfile()
            self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
            self.assertEqual(fi.lineno(), 22)

        if verbose:
            print('%s. Stdin (bs=%s)' % (start+3, bs))
        savestdin = sys.stdin
        try:
            sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
            with FileInput(files=all_files + ('-',), bufsize=bs) as fi:
                lines = list(fi)
                self.assertEqual(len(lines), 33)
                self.assertEqual(lines[32], 'Line 2 of stdin\n')
                self.assertEqual(fi.filename(), '<stdin>')
                fi.nextfile()
        finally:
            sys.stdin = savestdin

        if verbose:
            print('%s. Boundary conditions (bs=%s)' % (start+4, bs))
        with FileInput(files=all_files, bufsize=bs) as fi:
            # Nothing has been read yet, so there is no current file.
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
            fi.nextfile()
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())

        if verbose:
            print('%s. Inplace (bs=%s)' % (start+5, bs))
        savestdout = sys.stdout
        try:
            with FileInput(files=all_files, inplace=1, bufsize=bs) as fi:
                for line in fi:
                    line = line[:-1].upper()
                    print(line)
        finally:
            sys.stdout = savestdout

        # Check that the in-place pass really upper-cased every line.
        with FileInput(files=all_files, bufsize=bs) as fi:
            for line in fi:
                self.assertEqual(line[-1], '\n')
                m = pat.match(line[:-1])
                self.assertIsNotNone(m)
                self.assertEqual(int(m.group(1)), fi.filelineno())
180
class UnconditionallyRaise:
    """Callable stand-in that raises on every call.

    ``exception_type`` is instantiated without arguments and raised each
    time the object is called, whatever arguments it receives.  ``invoked``
    starts out False and becomes True on the first call, so a test can
    verify that the stand-in was actually reached.
    """

    def __init__(self, exception_type):
        self.invoked = False
        self.exception_type = exception_type

    def __call__(self, *args, **kwargs):
        # Record the call before raising so the flag survives the exception.
        self.invoked = True
        error = self.exception_type()
        raise error
188
class FileInputTests(unittest.TestCase):
    """Tests for the FileInput class itself (not the module-level helpers)."""

    def test_zero_byte_files(self):
        # Empty files around the only non-empty one must not disturb the
        # line counters.
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            with FileInput(files=(t1, t2, t3, t4)) as fi:
                line = fi.readline()
                self.assertEqual(line, 'The only line there is.\n')
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 1)
                self.assertEqual(fi.filename(), t3)

                line = fi.readline()
                self.assertFalse(line)
                self.assertEqual(fi.lineno(), 1)
                self.assertEqual(fi.filelineno(), 0)
                self.assertEqual(fi.filename(), t4)
        finally:
            remove_tempfiles(t1, t2, t3, t4)

    def test_files_that_dont_end_with_newline(self):
        # A missing final newline must not glue the last line of one file
        # to the first line of the next.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
                self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
                self.assertEqual(fi.filelineno(), 3)
                self.assertEqual(fi.lineno(), 6)
        finally:
            remove_tempfiles(t1, t2)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         try:
##             t1 = writeTmp(1, ["A\nB"])
##             encoding = sys.getfilesystemencoding()
##             if encoding is None:
##                 encoding = 'ascii'
##             fi = FileInput(files=str(t1, encoding))
##             lines = list(fi)
##             self.assertEqual(lines, ["A\n", "B"])
##         finally:
##             remove_tempfiles(t1)

    def test_fileno(self):
        # fileno() reports -1 unless a file is currently open.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB"])
            t2 = writeTmp(2, ["C\nD"])
            with FileInput(files=(t1, t2)) as fi:
                self.assertEqual(fi.fileno(), -1)
                next(fi)
                self.assertNotEqual(fi.fileno(), -1)
                fi.nextfile()
                self.assertEqual(fi.fileno(), -1)
                list(fi)
                self.assertEqual(fi.fileno(), -1)
        finally:
            remove_tempfiles(t1, t2)

    def test_opening_mode(self):
        # invalid mode, should raise ValueError
        with self.assertRaises(
                ValueError,
                msg="FileInput should reject invalid mode argument"):
            FileInput(mode="w")
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            try:
                with check_warnings(('', DeprecationWarning)):
                    lines = list(fi)
            finally:
                # Close explicitly so the temp file is never removed while
                # still open, even if reading failed.
                fi.close()
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)

    def test_stdin_binary_mode(self):
        # In binary mode the data must come from sys.stdin.buffer.
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            with FileInput(files=['-'], mode='rb') as fi:
                lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        # A stdin replacement without a ``buffer`` attribute is read
        # directly.
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            with FileInput(files=['-'], mode='rb') as fi:
                lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            def __init__(self):
                self.invoked = False
            def __call__(self, *args):
                self.invoked = True
                return open(*args)

        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        # In binary mode line endings are passed through untranslated.
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_context_manager(self):
        # Leaving the with-block closes the object (its _files is emptied).
        # Pre-bind the names so the finally clause cannot hit a NameError
        # when writeTmp() itself fails.
        t1 = t2 = None
        try:
            t1 = writeTmp(1, ["A\nB\nC"])
            t2 = writeTmp(2, ["D\nE\nF"])
            with FileInput(files=(t1, t2)) as fi:
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)
            self.assertEqual(fi._files, ())
        finally:
            remove_tempfiles(t1, t2)

    def test_close_on_exception(self):
        # The object must also be closed when the with-block is left via
        # an exception.  The temp file is created outside the guarded
        # region so that only the deliberately raised OSError is caught.
        t1 = writeTmp(1, [""])
        self.addCleanup(remove_tempfiles, t1)
        with self.assertRaises(OSError):
            with FileInput(files=t1) as fi:
                raise OSError
        self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        # An empty file list falls back to reading standard input.
        with FileInput(files=[]) as fi:
            self.assertEqual(fi._files, ('-',))

    def test__getitem__(self):
        """Tests invoking FileInput.__getitem__() with the current
           line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            retval1 = fi[0]
            self.assertEqual(retval1, "line1\n")
            retval2 = fi[1]
            self.assertEqual(retval2, "line2\n")

    def test__getitem__invalid_key(self):
        """Tests invoking FileInput.__getitem__() with an index unequal to
           the line number"""
        t = writeTmp(1, ["line1\n", "line2\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(RuntimeError) as cm:
                fi[1]
        self.assertEqual(cm.exception.args, ("accessing lines out of order",))

    def test__getitem__eof(self):
        """Tests invoking FileInput.__getitem__() with the line number but at
           end-of-input"""
        t = writeTmp(1, [])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            with self.assertRaises(IndexError) as cm:
                fi[0]
        self.assertEqual(cm.exception.args, ("end of input reached",))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        # The patched os.unlink() prevents FileInput from deleting its
        # backup copy, so remove that file ourselves afterwards.
        self.addCleanup(remove_tempfiles, t + '.bak')
        try:
            with FileInput(files=[t], inplace=True) as fi:
                next(fi) # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        try:
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist")
    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        try:
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        # fileno() must translate a ValueError raised by the underlying
        # file object into the "no file" value -1.
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        # Each FileInput.readline() call may only pull from the underlying
        # file the lines it actually needs; LineReader records them.
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        # Same as test_readline_buffering, but driving FileInput through
        # the iterator protocol.
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])
532
class MockFileInput:
    """Stand-in for fileinput.FileInput used by the global-function tests.

    The constructor just stores its arguments.  Every public method bumps a
    per-method counter in ``invocation_counts``; all of them except close()
    then return the value stored under their own name in ``return_values``
    (a KeyError is raised if the test did not provide one).
    """

    def __init__(self, files=None, inplace=False, backup="", bufsize=0,
                 mode="r", openhook=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.bufsize = bufsize
        self.mode = mode
        self.openhook = openhook
        self._file = None
        # Method name -> number of calls; missing names start at zero.
        self.invocation_counts = collections.defaultdict(int)
        # Method name -> canned return value, filled in by each test.
        self.return_values = {}

    def _count_call(self, method_name):
        # Record one more invocation of *method_name*.
        self.invocation_counts[method_name] += 1

    def _canned_result(self, method_name):
        # Count the call first, then look up the canned value, so that the
        # call is recorded even when no return value was configured.
        self._count_call(method_name)
        return self.return_values[method_name]

    def close(self):
        self._count_call("close")

    def nextfile(self):
        return self._canned_result("nextfile")

    def filename(self):
        return self._canned_result("filename")

    def lineno(self):
        return self._canned_result("lineno")

    def filelineno(self):
        return self._canned_result("filelineno")

    def fileno(self):
        return self._canned_result("fileno")

    def isfirstline(self):
        return self._canned_result("isfirstline")

    def isstdin(self):
        return self._canned_result("isstdin")
578
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Common fixture for the tests of fileinput's module-level functions.

    While a test runs, fileinput.FileInput is replaced by MockFileInput;
    both it and fileinput._state are put back afterwards."""

    def setUp(self):
        # Remember the real objects before swapping in the mock class.
        self._orig_state, self._orig_FileInput = (
            fileinput._state, fileinput.FileInput)
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput.FileInput = self._orig_FileInput
        fileinput._state = self._orig_state

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Assert *method_name* was called once and nothing else was."""
        counts = mock_file_input.invocation_counts
        # the named method must have been invoked exactly once ...
        self.assertEqual(counts[method_name], 1, method_name)
        # ... and it must be the only method with a recorded invocation
        self.assertEqual(len(counts), 1)
599
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """fileinput.input() while a previous input is still active.

           fileinput._state is set and its _file attribute is not None, so
           RuntimeError with a meaningful message is expected and
           fileinput._state must be left untouched."""
        active_state = MockFileInput()
        active_state._file = object()
        fileinput._state = active_state
        with self.assertRaises(RuntimeError) as caught:
            fileinput.input()
        self.assertEqual(("input() already active",), caught.exception.args)
        self.assertIs(active_state, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """fileinput.input() when the previous state has no open file.

           fileinput._state is set but its _file attribute is None, so a
           new FileInput object is expected to be created from the given
           arguments, returned, and stored in fileinput._state."""
        finished_state = MockFileInput()
        finished_state._file = None
        fileinput._state = finished_state
        self.do_test_call_input()

    def test_state_is_None(self):
        """fileinput.input() when fileinput._state is None.

           A new FileInput object is expected to be created from the given
           arguments, returned, and stored in fileinput._state."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Check that fileinput.input() hands its arguments to FileInput.

           Every argument is a unique sentinel object, so an identity check
           proves it arrived unmodified.  This relies on setUp() having
           replaced fileinput.FileInput with MockFileInput, which stores
           each argument under an attribute of the same name."""
        # one distinct sentinel per keyword argument of fileinput.input()
        argument_names = ("files", "inplace", "backup", "bufsize",
                          "mode", "openhook")
        sentinels = {name: object() for name in argument_names}

        result = fileinput.input(**sentinels)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        for name in argument_names:
            self.assertIs(sentinels[name], getattr(result, name), name)
664
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.close()"""

    def test_state_is_None(self):
        """fileinput.close() with no state must leave fileinput._state
           as None"""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.close() must call close() on the current state once
           and then reset fileinput._state to None"""
        fileinput._state = mock_state = MockFileInput()
        fileinput.close()
        self.assertExactlyOneInvocation(mock_state, "close")
        self.assertIsNone(fileinput._state)
683
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.nextfile()"""

    def test_state_is_None(self):
        """fileinput.nextfile() without an active input.

           RuntimeError with a meaningful message is expected, and
           fileinput._state must stay None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.nextfile() with an active input.

           It must call nextfile() on fileinput._state exactly once, pass
           the result through unchanged, and keep fileinput._state pointing
           at the same object."""
        sentinel = object()
        fileinput._state = mock_state = MockFileInput()
        mock_state.return_values["nextfile"] = sentinel
        actual = fileinput.nextfile()
        self.assertExactlyOneInvocation(mock_state, "nextfile")
        self.assertIs(actual, sentinel)
        self.assertIs(fileinput._state, mock_state)
710
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filename()"""

    def test_state_is_None(self):
        """fileinput.filename() without an active input.

           RuntimeError with a meaningful message is expected, and
           fileinput._state must stay None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filename()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.filename() with an active input.

           It must call filename() on fileinput._state exactly once, pass
           the result through unchanged, and keep fileinput._state pointing
           at the same object."""
        sentinel = object()
        fileinput._state = mock_state = MockFileInput()
        mock_state.return_values["filename"] = sentinel
        actual = fileinput.filename()
        self.assertExactlyOneInvocation(mock_state, "filename")
        self.assertIs(actual, sentinel)
        self.assertIs(fileinput._state, mock_state)
737
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.lineno()"""

    def test_state_is_None(self):
        """fileinput.lineno() without an active input.

           RuntimeError with a meaningful message is expected, and
           fileinput._state must stay None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.lineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.lineno() with an active input.

           It must call lineno() on fileinput._state exactly once, pass
           the result through unchanged, and keep fileinput._state pointing
           at the same object."""
        sentinel = object()
        fileinput._state = mock_state = MockFileInput()
        mock_state.return_values["lineno"] = sentinel
        actual = fileinput.lineno()
        self.assertExactlyOneInvocation(mock_state, "lineno")
        self.assertIs(actual, sentinel)
        self.assertIs(fileinput._state, mock_state)
764
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.filelineno()"""

    def test_state_is_None(self):
        """fileinput.filelineno() without an active input.

           RuntimeError with a meaningful message is expected, and
           fileinput._state must stay None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.filelineno() with an active input.

           It must call filelineno() on fileinput._state exactly once, pass
           the result through unchanged, and keep fileinput._state pointing
           at the same object."""
        sentinel = object()
        fileinput._state = mock_state = MockFileInput()
        mock_state.return_values["filelineno"] = sentinel
        actual = fileinput.filelineno()
        self.assertExactlyOneInvocation(mock_state, "filelineno")
        self.assertIs(actual, sentinel)
        self.assertIs(fileinput._state, mock_state)
791
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level function fileinput.fileno()"""

    def test_state_is_None(self):
        """Call fileinput.fileno() with no global state installed.

        The call must raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        afterwards.
        """
        expected_args = ("no active input()",)
        fileinput._state = None

        with self.assertRaises(RuntimeError) as raised:
            fileinput.fileno()

        self.assertEqual(expected_args, raised.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Call fileinput.fileno() with a mock global state installed.

        The call is expected to invoke the mock's fileno() exactly once,
        return the very object the mock returns, and leave
        fileinput._state bound to the same mock.
        """
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["fileno"] = sentinel
        # The sentinel is additionally stored as a plain attribute on the
        # mock.  NOTE(review): MockFileInput is defined outside this
        # section -- confirm whether anything actually reads this attribute.
        fake_state.fileno_retval = sentinel
        fileinput._state = fake_state

        result = fileinput.fileno()

        self.assertExactlyOneInvocation(fake_state, "fileno")
        # Identity, not equality: the exact object must be passed through.
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
819
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level function fileinput.isfirstline()"""

    def test_state_is_None(self):
        """Call fileinput.isfirstline() with no global state installed.

        The call must raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        afterwards.
        """
        expected_args = ("no active input()",)
        fileinput._state = None

        with self.assertRaises(RuntimeError) as raised:
            fileinput.isfirstline()

        self.assertEqual(expected_args, raised.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Call fileinput.isfirstline() with a mock global state installed.

        The call is expected to invoke the mock's isfirstline() exactly
        once, return the very object the mock returns, and leave
        fileinput._state bound to the same mock.
        """
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["isfirstline"] = sentinel
        fileinput._state = fake_state

        result = fileinput.isfirstline()

        self.assertExactlyOneInvocation(fake_state, "isfirstline")
        # Identity, not equality: the exact object must be passed through.
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
846
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level function fileinput.isstdin()"""

    def test_state_is_None(self):
        """Call fileinput.isstdin() with no global state installed.

        The call must raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        afterwards.
        """
        expected_args = ("no active input()",)
        fileinput._state = None

        with self.assertRaises(RuntimeError) as raised:
            fileinput.isstdin()

        self.assertEqual(expected_args, raised.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Call fileinput.isstdin() with a mock global state installed.

        The call is expected to invoke the mock's isstdin() exactly once,
        return the very object the mock returns, and leave
        fileinput._state bound to the same mock.
        """
        sentinel = object()
        fake_state = MockFileInput()
        fake_state.return_values["isstdin"] = sentinel
        fileinput._state = fake_state

        result = fileinput.isstdin()

        self.assertExactlyOneInvocation(fake_state, "isstdin")
        # Identity, not equality: the exact object must be passed through.
        self.assertIs(result, sentinel)
        self.assertIs(fileinput._state, fake_state)
873
class InvocationRecorder:
    """Callable test double that counts its calls and remembers the last one.

    Instances accept any positional and keyword arguments and always
    return None.
    """

    def __init__(self):
        # Number of times this instance has been called so far.
        self.invocation_count = 0
        # (args, kwargs) of the most recent call.  Initialised to None so
        # that inspecting a never-called recorder yields a clean assertion
        # failure rather than an AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record one call: bump the counter and store (args, kwargs)."""
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
880
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()

    Each test swaps one "open" callable (builtins.open, gzip.open or
    bz2.BZ2File) for an InvocationRecorder and checks that
    hook_compressed() calls it exactly once with the positional arguments
    (filename, mode) and no keyword arguments.  The integer modes are
    placeholders that are only compared for pass-through.
    """

    def setUp(self):
        # A fresh recorder per test so invocation counts never leak
        # between test methods.
        self.fake_open = InvocationRecorder()

    def test_empty_string(self):
        self.do_test_use_builtin_open("", 1)

    def test_no_ext(self):
        self.do_test_use_builtin_open("abcd", 2)

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        # patch.object restores gzip.open on exit, even if the call raises.
        with mock.patch.object(gzip, "open", self.fake_open):
            fileinput.hook_compressed("test.gz", 3)
        self._assert_opened_once("test.gz", 3)

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        # patch.object restores bz2.BZ2File on exit, even if the call raises.
        with mock.patch.object(bz2, "BZ2File", self.fake_open):
            fileinput.hook_compressed("test.bz2", 4)
        self._assert_opened_once("test.bz2", 4)

    def test_blah_ext(self):
        self.do_test_use_builtin_open("abcd.blah", 5)

    def test_gz_ext_builtin(self):
        # Mixed-case extension: the builtin open() is expected here.
        self.do_test_use_builtin_open("abcd.Gz", 6)

    def test_bz2_ext_builtin(self):
        # Mixed-case extension: the builtin open() is expected here.
        self.do_test_use_builtin_open("abcd.Bz2", 7)

    def do_test_use_builtin_open(self, filename, mode):
        """Check that hook_compressed(filename, mode) uses builtins.open.

        builtins.open is replaced by the recorder only for the duration
        of the call and is restored afterwards even if the call raises.
        """
        with mock.patch.object(builtins, "open", self.fake_open):
            fileinput.hook_compressed(filename, mode)
        self._assert_opened_once(filename, mode)

    def _assert_opened_once(self, filename, mode):
        """Assert the recorder saw exactly one call: (filename, mode), {}."""
        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation,
                         ((filename, mode), {}))

    @staticmethod
    def replace_builtin_open(new_open_func):
        """Install new_open_func as builtins.open; return the previous one.

        No longer used by the tests in this class; retained so any
        existing caller keeps working.  The caller is responsible for
        restoring the returned original.
        """
        original_open = builtins.open
        builtins.open = new_open_func
        return original_open
942
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        """The hook must forward its arguments unchanged to builtins.open.

        Opaque sentinel objects are used for every argument so that the
        identity checks prove pass-through: filename and mode positionally,
        encoding and errors as keywords, and nothing else.
        """
        encoding = object()
        errors = object()
        hook = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        filename = object()
        mode = object()
        # patch.object restores builtins.open on exit, even if the hook
        # raises.
        with mock.patch.object(builtins, "open", fake_open):
            hook(filename, mode)

        self.assertEqual(fake_open.invocation_count, 1)

        args, kwargs = fake_open.last_invocation
        self.assertIs(args[0], filename)
        self.assertIs(args[1], mode)
        self.assertIs(kwargs.pop('encoding'), encoding)
        self.assertIs(kwargs.pop('errors'), errors)
        # Both expected keywords were popped above; anything left over is
        # an unexpected extra keyword argument.
        self.assertFalse(kwargs)

    def test_errors(self):
        """Each `errors` policy must be honoured when decoding bad input."""
        with open(TESTFN, 'wb') as f:
            # 0x80 is not a valid UTF-8 start byte, so decoding this file
            # exercises the error handler.
            f.write(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            with FileInput(files=TESTFN, mode='r',
                           openhook=hook_encoded('utf-8', errors=errors)) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            # Decoding raises before the comparison inside check() runs.
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        """Check which FileInput modes are accepted together with the hook."""
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            with FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7')) as fi:
                lines = list(fi)
            self.assertEqual(lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertWarns(DeprecationWarning):
            check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        with self.assertRaises(ValueError):
            # The ValueError is expected before any lines are compared.
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
1006
1007
class MiscTest(unittest.TestCase):
    """Miscellaneous module-level checks for fileinput."""

    def test_all(self):
        """Run the support.check__all__ helper against the fileinput module."""
        module_under_test = fileinput
        support.check__all__(self, module_under_test)
1012
1013
# Run the whole test module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1016