# Adapted from test_file.py by Daniel Stutzbach
2
3import sys
4import os
5import io
6import errno
7import unittest
8from array import array
9from weakref import proxy
10from functools import wraps
11
12from test.support import (TESTFN, TESTFN_UNICODE, check_warnings, run_unittest,
13                          make_bad_fd, cpython_only, swap_attr)
14from collections import UserList
15
16import _io  # C implementation of io
17import _pyio # Python implementation of io
18
19
class AutoFileTests:
    # FileIO tests for which a test file is automatically set up.
    #
    # This is a mixin: concrete subclasses also inherit from
    # unittest.TestCase and provide the ``FileIO`` class under test and the
    # ``modulename`` string expected at the start of its repr.

    def setUp(self):
        # Every test starts with TESTFN opened for writing as self.f.
        self.f = self.FileIO(TESTFN, 'w')

    def tearDown(self):
        # A test may set self.f to None (see testWeakRefs).  TESTFN is
        # removed even if close() raises, so a failing close cannot leave
        # the file behind.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references: the proxy forwards calls while the file
        # object is referenced and raises ReferenceError afterwards
        p = proxy(self.f)
        p.write(bytes(range(10)))
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testSeekTell(self):
        # After writing 20 bytes, exercise seek() with whence 0 (default),
        # 1 and 2 and check the position reported by tell().
        self.f.write(bytes(range(20)))
        self.assertEqual(self.f.tell(), 20)
        for seek_args, expected_pos in (((0,), 0),
                                        ((10,), 10),
                                        ((5, 1), 15),
                                        ((-5, 1), 10),
                                        ((-5, 2), 15)):
            self.f.seek(*seek_args)
            self.assertEqual(self.f.tell(), expected_pos)

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f

        self.assertEqual(f.mode, "wb")
        self.assertEqual(f.closed, False)

        # verify the attributes are readonly
        for attr in 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testBlksize(self):
        # test private _blksize attribute
        blksize = io.DEFAULT_BUFFER_SIZE
        # try to get preferred blksize from stat.st_blksize, if available
        if hasattr(os, 'fstat'):
            fst = os.fstat(self.f.fileno())
            blksize = getattr(fst, 'st_blksize', blksize)
        self.assertEqual(self.f._blksize, blksize)

    # verify readinto
    def testReadintoByteArray(self):
        # Four bytes are written; readinto() must overwrite only the first
        # four bytes of the eight-byte buffer and report 4.
        self.f.write(bytes([1, 2, 0, 255]))
        self.f.close()

        ba = bytearray(b'abcdefgh')
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(ba)
        self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
        self.assertEqual(n, 4)

    def _testReadintoMemoryview(self):
        # Not collected by unittest's default loader because the name does
        # not start with "test".
        self.f.write(bytes([1, 2, 0, 255]))
        self.f.close()

        m = memoryview(bytearray(b'abcdefgh'))
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(m)
        self.assertEqual(m, b'\x01\x02\x00\xffefgh')
        self.assertEqual(n, 4)

        # same check through a multi-dimensional, non-byte view
        m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(m)
        self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
        self.assertEqual(n, 4)

    def _testReadintoArray(self):
        # Not collected by unittest's default loader because the name does
        # not start with "test".
        self.f.write(bytes([1, 2, 0, 255]))
        self.f.close()

        a = array('B', b'abcdefgh')
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(a)
        self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
        self.assertEqual(n, 4)

        a = array('b', b'abcdefgh')
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(a)
        self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
        self.assertEqual(n, 4)

        a = array('I', b'abcdefgh')
        with self.FileIO(TESTFN, 'r') as f:
            n = f.readinto(a)
        self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
        self.assertEqual(n, 4)

    def testWritelinesList(self):
        # writelines() accepts a plain list of bytes objects.  The reopened
        # file is stored on self.f so that tearDown() closes it.
        l = [b'123', b'456']
        self.f.writelines(l)
        self.f.close()
        self.f = self.FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesUserList(self):
        # writelines() accepts a list-like object that is not a list.
        l = UserList([b'123', b'456'])
        self.f.writelines(l)
        self.f.close()
        self.f = self.FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesError(self):
        # non-bytes items, a non-iterable and a str are all rejected
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, None)
        self.assertRaises(TypeError, self.f.writelines, "abc")

    def test_none_args(self):
        # None is accepted as the size argument of read(), readline() and
        # readlines().
        self.f.write(b"hi\nbye\nabc")
        self.f.close()
        self.f = self.FileIO(TESTFN, 'r')
        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
        self.f.seek(0)
        self.assertEqual(self.f.readline(None), b"hi\n")
        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])

    def test_reject(self):
        # writing a str to a raw binary file is a TypeError
        self.assertRaises(TypeError, self.f.write, "Hello!")

    def testRepr(self):
        # repr shows the name while it is set, falls back to the fd once
        # the name attribute is deleted, and shows [closed] after close().
        self.assertEqual(repr(self.f),
                         "<%s.FileIO name=%r mode=%r closefd=True>" %
                         (self.modulename, self.f.name, self.f.mode))
        del self.f.name
        self.assertEqual(repr(self.f),
                         "<%s.FileIO fd=%r mode=%r closefd=True>" %
                         (self.modulename, self.f.fileno(), self.f.mode))
        self.f.close()
        self.assertEqual(repr(self.f),
                         "<%s.FileIO [closed]>" % (self.modulename,))

    def testReprNoCloseFD(self):
        # With closefd=False the descriptor stays ours to close, hence the
        # explicit os.close() in the finally clause.
        fd = os.open(TESTFN, os.O_RDONLY)
        try:
            with self.FileIO(fd, 'r', closefd=False) as f:
                self.assertEqual(repr(f),
                                 "<%s.FileIO name=%r mode=%r closefd=False>" %
                                 (self.modulename, f.name, f.mode))
        finally:
            os.close(fd)

    def testRecursiveRepr(self):
        # Issue #25455
        with swap_attr(self.f, 'name', self.f):
            with self.assertRaises(RuntimeError):
                repr(self.f)  # Should not crash

    def testErrors(self):
        f = self.f
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)
        #self.assertEqual(f.name, TESTFN)
        # the file was opened for writing only, so reading must fail
        self.assertRaises(ValueError, f.read, 10)
        f.close()
        self.assertTrue(f.closed)
        f = self.FileIO(TESTFN, 'r')
        self.assertRaises(TypeError, f.readinto, "")
        self.assertFalse(f.closed)
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every listed zero-argument method must raise ValueError on a
        # closed file.  Methods with a required argument raise TypeError
        # when called without it and ValueError when called with it.
        methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
                   'read', 'readall', 'readline', 'readlines',
                   'tell', 'truncate', 'flush']

        self.f.close()
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)

        self.assertRaises(TypeError, self.f.readinto)
        self.assertRaises(ValueError, self.f.readinto, bytearray(1))
        self.assertRaises(TypeError, self.f.seek)
        self.assertRaises(ValueError, self.f.seek, 0)
        self.assertRaises(TypeError, self.f.write)
        self.assertRaises(ValueError, self.f.write, b'')
        self.assertRaises(TypeError, self.f.writelines)
        self.assertRaises(ValueError, self.f.writelines, b'')

    def testOpendir(self):
        # Issue 3703: opening a directory should fill the errno
        # Windows always returns "[Errno 13]: Permission denied"
        # Unix uses fstat and returns "[Errno 21]: Is a directory"
        try:
            self.FileIO('.', 'r')
        except OSError as e:
            self.assertNotEqual(e.errno, 0)
            self.assertEqual(e.filename, ".")
        else:
            self.fail("Should have raised OSError")

    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    def testOpenDirFD(self):
        # Wrapping a descriptor that refers to a directory must fail with
        # EISDIR.
        fd = os.open('.', os.O_RDONLY)
        with self.assertRaises(OSError) as cm:
            self.FileIO(fd, 'r')
        os.close(fd)
        self.assertEqual(cm.exception.errno, errno.EISDIR)

    #A set of functions testing that we get expected behaviour if someone has
    #manually closed the internal file descriptor.  First, a decorator:
    def ClosedFD(func):
        # Decorator: run func(self, f) after the descriptor behind self.f
        # has been closed with os.close().  Any outcome of func is
        # propagated unchanged.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            finally:
                # Closing the file object on a dead descriptor may raise;
                # that error is not what the decorated test is about.
                try:
                    self.f.close()
                except OSError:
                    pass
        return wrapper

    def ClosedFDRaises(func):
        # Decorator: like ClosedFD, but func(self, f) is additionally
        # required to raise OSError with errno EBADF.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            except OSError as e:
                self.assertEqual(e.errno, errno.EBADF)
            else:
                self.fail("Should have raised OSError")
            finally:
                # same best-effort close as in ClosedFD
                try:
                    self.f.close()
                except OSError:
                    pass
        return wrapper

    @ClosedFDRaises
    def testErrnoOnClose(self, f):
        f.close()

    @ClosedFDRaises
    def testErrnoOnClosedWrite(self, f):
        f.write(b'a')

    @ClosedFDRaises
    def testErrnoOnClosedSeek(self, f):
        f.seek(0)

    @ClosedFDRaises
    def testErrnoOnClosedTell(self, f):
        f.tell()

    @ClosedFDRaises
    def testErrnoOnClosedTruncate(self, f):
        f.truncate(0)

    # The following queries only have to complete without raising.
    @ClosedFD
    def testErrnoOnClosedSeekable(self, f):
        f.seekable()

    @ClosedFD
    def testErrnoOnClosedReadable(self, f):
        f.readable()

    @ClosedFD
    def testErrnoOnClosedWritable(self, f):
        f.writable()

    @ClosedFD
    def testErrnoOnClosedFileno(self, f):
        f.fileno()

    @ClosedFD
    def testErrnoOnClosedIsatty(self, f):
        self.assertEqual(f.isatty(), False)

    def ReopenForRead(self):
        # Replace self.f with a read-mode FileIO on TESTFN whose descriptor
        # has already been closed behind its back, and return it.  The old
        # self.f may itself sit on a closed descriptor, so errors from
        # closing it are ignored.
        try:
            self.f.close()
        except OSError:
            pass
        self.f = self.FileIO(TESTFN, 'r')
        os.close(self.f.fileno())
        return self.f

    # The read tests need a file opened for reading, so they discard the
    # write-mode file handed in by the decorator.
    @ClosedFDRaises
    def testErrnoOnClosedRead(self, f):
        f = self.ReopenForRead()
        f.read(1)

    @ClosedFDRaises
    def testErrnoOnClosedReadall(self, f):
        f = self.ReopenForRead()
        f.readall()

    @ClosedFDRaises
    def testErrnoOnClosedReadinto(self, f):
        f = self.ReopenForRead()
        a = array('b', b'x'*10)
        f.readinto(a)
class CAutoFileTests(AutoFileTests, unittest.TestCase):
    # Run the AutoFileTests suite against the C implementation of FileIO.
    modulename = '_io'
    FileIO = _io.FileIO
345
class PyAutoFileTests(AutoFileTests, unittest.TestCase):
    # Run the AutoFileTests suite against the pure-Python FileIO.
    modulename = '_pyio'
    FileIO = _pyio.FileIO
349
350
class OtherFileTests:
    # FileIO tests that create and remove TESTFN themselves.
    #
    # This is a mixin: concrete subclasses also inherit from
    # unittest.TestCase and provide the ``FileIO`` class under test.

    def testAbles(self):
        # readable(), writable(), seekable() and isatty() must reflect the
        # mode and kind of file.  Files are opened in ``with`` blocks so a
        # failing assertion cannot leave one open when TESTFN is unlinked.
        try:
            with self.FileIO(TESTFN, "w") as f:
                self.assertEqual(f.readable(), False)
                self.assertEqual(f.writable(), True)
                self.assertEqual(f.seekable(), True)

            with self.FileIO(TESTFN, "r") as f:
                self.assertEqual(f.readable(), True)
                self.assertEqual(f.writable(), False)
                self.assertEqual(f.seekable(), True)

            with self.FileIO(TESTFN, "a+") as f:
                self.assertEqual(f.readable(), True)
                self.assertEqual(f.writable(), True)
                self.assertEqual(f.seekable(), True)
                self.assertEqual(f.isatty(), False)

            if sys.platform != "win32":
                try:
                    f = self.FileIO("/dev/tty", "a")
                except OSError:
                    # When run in a cron job there just aren't any
                    # ttys, so skip the test.  This also handles other
                    # OS'es that don't support /dev/tty.
                    pass
                else:
                    with f:
                        self.assertEqual(f.readable(), False)
                        self.assertEqual(f.writable(), True)
                        if sys.platform != "darwin" and \
                           'bsd' not in sys.platform and \
                           not sys.platform.startswith(('sunos', 'aix')):
                            # Somehow /dev/tty appears seekable on some BSDs
                            self.assertEqual(f.seekable(), False)
                        self.assertEqual(f.isatty(), True)
        finally:
            os.unlink(TESTFN)

    def testInvalidModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+", "rw", "rt"):
            try:
                f = self.FileIO(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testModeStrings(self):
        # test that the mode attribute is correct for various mode strings
        # given as init args
        try:
            for given, expected in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
                                    ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
                                    ('ab+', 'ab+'), ('a+b', 'ab+'),
                                    ('r', 'rb'), ('rb', 'rb'),
                                    ('rb+', 'rb+'), ('r+b', 'rb+')]:
                # read modes are last so that TESTFN will exist first
                with self.FileIO(TESTFN, given) as f:
                    self.assertEqual(f.mode, expected)
        finally:
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)

    def testUnicodeOpen(self):
        # verify that a str file name can be opened and closed
        f = self.FileIO(str(TESTFN), "w")
        f.close()
        os.unlink(TESTFN)

    def testBytesOpen(self):
        # Opening a bytes filename
        try:
            fn = TESTFN.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest('could not encode %r to ascii' % TESTFN)
        f = self.FileIO(fn, "w")
        try:
            # closed by the ``with`` even if write() fails, so the unlink
            # below never runs against an open file
            with f:
                f.write(b"abc")
            with open(TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            os.unlink(TESTFN)

    @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
                     "test only works for utf-8 filesystems")
    def testUtf8BytesOpen(self):
        # Opening a UTF-8 bytes filename
        try:
            fn = TESTFN_UNICODE.encode("utf-8")
        except UnicodeEncodeError:
            self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
        f = self.FileIO(fn, "w")
        try:
            with f:
                f.write(b"abc")
            with open(TESTFN_UNICODE, "rb") as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            os.unlink(TESTFN_UNICODE)

    def testConstructorHandlesNULChars(self):
        # an embedded NUL is rejected in both str and bytes file names
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
        self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')

    def testInvalidFd(self):
        # a negative fd is a ValueError, a non-open fd an OSError
        self.assertRaises(ValueError, self.FileIO, -10)
        self.assertRaises(OSError, self.FileIO, make_bad_fd())
        if sys.platform == 'win32':
            import msvcrt
            self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = self.FileIO(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testTruncate(self):
        # truncate() changes the file size but never the file position.
        # The file is closed and TESTFN removed even when an assertion
        # fails, so the test leaves nothing behind.
        f = self.FileIO(TESTFN, 'w')
        try:
            with f:
                f.write(bytes(bytearray(range(10))))
                self.assertEqual(f.tell(), 10)
                f.truncate(5)
                self.assertEqual(f.tell(), 10)
                self.assertEqual(f.seek(0, io.SEEK_END), 5)
                f.truncate(15)
                self.assertEqual(f.tell(), 5)
                self.assertEqual(f.seek(0, io.SEEK_END), 15)
        finally:
            os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            with self.FileIO(TESTFN, 'w') as f:
                f.write(bytes(range(11)))

            # ``with`` guarantees the file is closed before the unlink
            # below, also when one of the checks fails
            with self.FileIO(TESTFN, 'r+') as f:
                data = f.read(5)
                if data != bytes(range(5)):
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testAppend(self):
        # Data written in append mode ends up after the existing content.
        # NOTE(review): this goes through the builtin open() rather than
        # self.FileIO -- confirm whether that is intended.
        try:
            with open(TESTFN, 'wb') as f:
                f.write(b'spam')
            with open(TESTFN, 'ab') as f:
                f.write(b'eggs')
            with open(TESTFN, 'rb') as f:
                d = f.read()
            self.assertEqual(d, b'spameggs')
        finally:
            # best-effort cleanup; only file-system errors are ignored
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def testInvalidInit(self):
        # a non-str mode argument is a TypeError
        self.assertRaises(TypeError, self.FileIO, "1", 0, 0)

    def testWarnings(self):
        # failing constructor calls must not emit any warning
        with check_warnings(quiet=True) as w:
            self.assertEqual(w.warnings, [])
            self.assertRaises(TypeError, self.FileIO, [])
            self.assertEqual(w.warnings, [])
            self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
            self.assertEqual(w.warnings, [])

    def testUnclosedFDOnException(self):
        # When the constructor fails after being handed a descriptor, the
        # descriptor must still be open and owned by the caller.
        class MyException(Exception): pass
        class MyFileIO(self.FileIO):
            def __setattr__(self, name, value):
                if name == "name":
                    raise MyException("blocked setting name")
                return super(MyFileIO, self).__setattr__(name, value)
        fd = os.open(__file__, os.O_RDONLY)
        self.assertRaises(MyException, MyFileIO, fd)
        os.close(fd)  # should not raise OSError(EBADF)
568
class COtherFileTests(OtherFileTests, unittest.TestCase):
    # Run the OtherFileTests suite against the C implementation of FileIO.
    modulename = '_io'
    FileIO = _io.FileIO

    @cpython_only
    def testInvalidFd_overflow(self):
        # Issue 15989
        # A descriptor just outside the C int range must be rejected with
        # TypeError on either side of the range.
        import _testcapi
        too_large = _testcapi.INT_MAX + 1
        self.assertRaises(TypeError, self.FileIO, too_large)
        too_small = _testcapi.INT_MIN - 1
        self.assertRaises(TypeError, self.FileIO, too_small)

    def test_open_code(self):
        # Check that the default behaviour of open_code matches
        # open("rb")
        with self.FileIO(__file__, "rb") as raw:
            expected = raw.read()
        with _io.open_code(__file__) as stream:
            actual = stream.read()
        self.assertEqual(expected, actual)
588
589
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
    # Run the OtherFileTests suite against the pure-Python FileIO.
    modulename = '_pyio'
    FileIO = _pyio.FileIO

    def test_open_code(self):
        # Check that the default behaviour of open_code matches
        # open("rb")
        with self.FileIO(__file__, "rb") as raw:
            expected = raw.read()
        with check_warnings(quiet=True) as caught:
            # Always test _open_code_with_warning
            with _pyio._open_code_with_warning(__file__) as stream:
                actual = stream.read()
            self.assertEqual(expected, actual)
            # the fallback implementation is expected to have warned
            self.assertNotEqual(caught.warnings, [])
605
606
def test_main():
    # Run all four concrete test classes.
    #
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(
            CAutoFileTests,
            PyAutoFileTests,
            COtherFileTests,
            PyOtherFileTests,
        )
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


# Allow running this file directly as a script.
if __name__ == '__main__':
    test_main()
619