• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Adapted from test_file.py by Daniel Stutzbach
2
3import sys
4import os
5import io
6import errno
7import unittest
8from array import array
9from weakref import proxy
10from functools import wraps
11
12from test.support import cpython_only, swap_attr, gc_collect
13from test.support.os_helper import (TESTFN, TESTFN_UNICODE, make_bad_fd)
14from test.support.warnings_helper import check_warnings
15from collections import UserList
16
17import _io  # C implementation of io
18import _pyio # Python implementation of io
19
20
21class AutoFileTests:
22    # file tests for which a test file is automatically set up
23
24    def setUp(self):
25        self.f = self.FileIO(TESTFN, 'w')
26
27    def tearDown(self):
28        if self.f:
29            self.f.close()
30        os.remove(TESTFN)
31
32    def testWeakRefs(self):
33        # verify weak references
34        p = proxy(self.f)
35        p.write(bytes(range(10)))
36        self.assertEqual(self.f.tell(), p.tell())
37        self.f.close()
38        self.f = None
39        gc_collect()  # For PyPy or other GCs.
40        self.assertRaises(ReferenceError, getattr, p, 'tell')
41
42    def testSeekTell(self):
43        self.f.write(bytes(range(20)))
44        self.assertEqual(self.f.tell(), 20)
45        self.f.seek(0)
46        self.assertEqual(self.f.tell(), 0)
47        self.f.seek(10)
48        self.assertEqual(self.f.tell(), 10)
49        self.f.seek(5, 1)
50        self.assertEqual(self.f.tell(), 15)
51        self.f.seek(-5, 1)
52        self.assertEqual(self.f.tell(), 10)
53        self.f.seek(-5, 2)
54        self.assertEqual(self.f.tell(), 15)
55
56    def testAttributes(self):
57        # verify expected attributes exist
58        f = self.f
59
60        self.assertEqual(f.mode, "wb")
61        self.assertEqual(f.closed, False)
62
63        # verify the attributes are readonly
64        for attr in 'mode', 'closed':
65            self.assertRaises((AttributeError, TypeError),
66                              setattr, f, attr, 'oops')
67
68    def testBlksize(self):
69        # test private _blksize attribute
70        blksize = io.DEFAULT_BUFFER_SIZE
71        # try to get preferred blksize from stat.st_blksize, if available
72        if hasattr(os, 'fstat'):
73            fst = os.fstat(self.f.fileno())
74            blksize = getattr(fst, 'st_blksize', blksize)
75        self.assertEqual(self.f._blksize, blksize)
76
77    # verify readinto
78    def testReadintoByteArray(self):
79        self.f.write(bytes([1, 2, 0, 255]))
80        self.f.close()
81
82        ba = bytearray(b'abcdefgh')
83        with self.FileIO(TESTFN, 'r') as f:
84            n = f.readinto(ba)
85        self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
86        self.assertEqual(n, 4)
87
88    def _testReadintoMemoryview(self):
89        self.f.write(bytes([1, 2, 0, 255]))
90        self.f.close()
91
92        m = memoryview(bytearray(b'abcdefgh'))
93        with self.FileIO(TESTFN, 'r') as f:
94            n = f.readinto(m)
95        self.assertEqual(m, b'\x01\x02\x00\xffefgh')
96        self.assertEqual(n, 4)
97
98        m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
99        with self.FileIO(TESTFN, 'r') as f:
100            n = f.readinto(m)
101        self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
102        self.assertEqual(n, 4)
103
104    def _testReadintoArray(self):
105        self.f.write(bytes([1, 2, 0, 255]))
106        self.f.close()
107
108        a = array('B', b'abcdefgh')
109        with self.FileIO(TESTFN, 'r') as f:
110            n = f.readinto(a)
111        self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
112        self.assertEqual(n, 4)
113
114        a = array('b', b'abcdefgh')
115        with self.FileIO(TESTFN, 'r') as f:
116            n = f.readinto(a)
117        self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
118        self.assertEqual(n, 4)
119
120        a = array('I', b'abcdefgh')
121        with self.FileIO(TESTFN, 'r') as f:
122            n = f.readinto(a)
123        self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
124        self.assertEqual(n, 4)
125
126    def testWritelinesList(self):
127        l = [b'123', b'456']
128        self.f.writelines(l)
129        self.f.close()
130        self.f = self.FileIO(TESTFN, 'rb')
131        buf = self.f.read()
132        self.assertEqual(buf, b'123456')
133
134    def testWritelinesUserList(self):
135        l = UserList([b'123', b'456'])
136        self.f.writelines(l)
137        self.f.close()
138        self.f = self.FileIO(TESTFN, 'rb')
139        buf = self.f.read()
140        self.assertEqual(buf, b'123456')
141
142    def testWritelinesError(self):
143        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
144        self.assertRaises(TypeError, self.f.writelines, None)
145        self.assertRaises(TypeError, self.f.writelines, "abc")
146
147    def test_none_args(self):
148        self.f.write(b"hi\nbye\nabc")
149        self.f.close()
150        self.f = self.FileIO(TESTFN, 'r')
151        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
152        self.f.seek(0)
153        self.assertEqual(self.f.readline(None), b"hi\n")
154        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
155
156    def test_reject(self):
157        self.assertRaises(TypeError, self.f.write, "Hello!")
158
159    def testRepr(self):
160        self.assertEqual(repr(self.f),
161                         "<%s.FileIO name=%r mode=%r closefd=True>" %
162                         (self.modulename, self.f.name, self.f.mode))
163        del self.f.name
164        self.assertEqual(repr(self.f),
165                         "<%s.FileIO fd=%r mode=%r closefd=True>" %
166                         (self.modulename, self.f.fileno(), self.f.mode))
167        self.f.close()
168        self.assertEqual(repr(self.f),
169                         "<%s.FileIO [closed]>" % (self.modulename,))
170
171    def testReprNoCloseFD(self):
172        fd = os.open(TESTFN, os.O_RDONLY)
173        try:
174            with self.FileIO(fd, 'r', closefd=False) as f:
175                self.assertEqual(repr(f),
176                                 "<%s.FileIO name=%r mode=%r closefd=False>" %
177                                 (self.modulename, f.name, f.mode))
178        finally:
179            os.close(fd)
180
181    def testRecursiveRepr(self):
182        # Issue #25455
183        with swap_attr(self.f, 'name', self.f):
184            with self.assertRaises(RuntimeError):
185                repr(self.f)  # Should not crash
186
187    def testErrors(self):
188        f = self.f
189        self.assertFalse(f.isatty())
190        self.assertFalse(f.closed)
191        #self.assertEqual(f.name, TESTFN)
192        self.assertRaises(ValueError, f.read, 10) # Open for reading
193        f.close()
194        self.assertTrue(f.closed)
195        f = self.FileIO(TESTFN, 'r')
196        self.assertRaises(TypeError, f.readinto, "")
197        self.assertFalse(f.closed)
198        f.close()
199        self.assertTrue(f.closed)
200
201    def testMethods(self):
202        methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
203                   'read', 'readall', 'readline', 'readlines',
204                   'tell', 'truncate', 'flush']
205
206        self.f.close()
207        self.assertTrue(self.f.closed)
208
209        for methodname in methods:
210            method = getattr(self.f, methodname)
211            # should raise on closed file
212            self.assertRaises(ValueError, method)
213
214        self.assertRaises(TypeError, self.f.readinto)
215        self.assertRaises(ValueError, self.f.readinto, bytearray(1))
216        self.assertRaises(TypeError, self.f.seek)
217        self.assertRaises(ValueError, self.f.seek, 0)
218        self.assertRaises(TypeError, self.f.write)
219        self.assertRaises(ValueError, self.f.write, b'')
220        self.assertRaises(TypeError, self.f.writelines)
221        self.assertRaises(ValueError, self.f.writelines, b'')
222
223    def testOpendir(self):
224        # Issue 3703: opening a directory should fill the errno
225        # Windows always returns "[Errno 13]: Permission denied
226        # Unix uses fstat and returns "[Errno 21]: Is a directory"
227        try:
228            self.FileIO('.', 'r')
229        except OSError as e:
230            self.assertNotEqual(e.errno, 0)
231            self.assertEqual(e.filename, ".")
232        else:
233            self.fail("Should have raised OSError")
234
235    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
236    def testOpenDirFD(self):
237        fd = os.open('.', os.O_RDONLY)
238        with self.assertRaises(OSError) as cm:
239            self.FileIO(fd, 'r')
240        os.close(fd)
241        self.assertEqual(cm.exception.errno, errno.EISDIR)
242
243    #A set of functions testing that we get expected behaviour if someone has
244    #manually closed the internal file descriptor.  First, a decorator:
245    def ClosedFD(func):
246        @wraps(func)
247        def wrapper(self):
248            #forcibly close the fd before invoking the problem function
249            f = self.f
250            os.close(f.fileno())
251            try:
252                func(self, f)
253            finally:
254                try:
255                    self.f.close()
256                except OSError:
257                    pass
258        return wrapper
259
260    def ClosedFDRaises(func):
261        @wraps(func)
262        def wrapper(self):
263            #forcibly close the fd before invoking the problem function
264            f = self.f
265            os.close(f.fileno())
266            try:
267                func(self, f)
268            except OSError as e:
269                self.assertEqual(e.errno, errno.EBADF)
270            else:
271                self.fail("Should have raised OSError")
272            finally:
273                try:
274                    self.f.close()
275                except OSError:
276                    pass
277        return wrapper
278
279    @ClosedFDRaises
280    def testErrnoOnClose(self, f):
281        f.close()
282
283    @ClosedFDRaises
284    def testErrnoOnClosedWrite(self, f):
285        f.write(b'a')
286
287    @ClosedFDRaises
288    def testErrnoOnClosedSeek(self, f):
289        f.seek(0)
290
291    @ClosedFDRaises
292    def testErrnoOnClosedTell(self, f):
293        f.tell()
294
295    @ClosedFDRaises
296    def testErrnoOnClosedTruncate(self, f):
297        f.truncate(0)
298
299    @ClosedFD
300    def testErrnoOnClosedSeekable(self, f):
301        f.seekable()
302
303    @ClosedFD
304    def testErrnoOnClosedReadable(self, f):
305        f.readable()
306
307    @ClosedFD
308    def testErrnoOnClosedWritable(self, f):
309        f.writable()
310
311    @ClosedFD
312    def testErrnoOnClosedFileno(self, f):
313        f.fileno()
314
315    @ClosedFD
316    def testErrnoOnClosedIsatty(self, f):
317        self.assertEqual(f.isatty(), False)
318
319    def ReopenForRead(self):
320        try:
321            self.f.close()
322        except OSError:
323            pass
324        self.f = self.FileIO(TESTFN, 'r')
325        os.close(self.f.fileno())
326        return self.f
327
328    @ClosedFDRaises
329    def testErrnoOnClosedRead(self, f):
330        f = self.ReopenForRead()
331        f.read(1)
332
333    @ClosedFDRaises
334    def testErrnoOnClosedReadall(self, f):
335        f = self.ReopenForRead()
336        f.readall()
337
338    @ClosedFDRaises
339    def testErrnoOnClosedReadinto(self, f):
340        f = self.ReopenForRead()
341        a = array('b', b'x'*10)
342        f.readinto(a)
343
344class CAutoFileTests(AutoFileTests, unittest.TestCase):
345    FileIO = _io.FileIO
346    modulename = '_io'
347
348class PyAutoFileTests(AutoFileTests, unittest.TestCase):
349    FileIO = _pyio.FileIO
350    modulename = '_pyio'
351
352
353class OtherFileTests:
354
355    def testAbles(self):
356        try:
357            f = self.FileIO(TESTFN, "w")
358            self.assertEqual(f.readable(), False)
359            self.assertEqual(f.writable(), True)
360            self.assertEqual(f.seekable(), True)
361            f.close()
362
363            f = self.FileIO(TESTFN, "r")
364            self.assertEqual(f.readable(), True)
365            self.assertEqual(f.writable(), False)
366            self.assertEqual(f.seekable(), True)
367            f.close()
368
369            f = self.FileIO(TESTFN, "a+")
370            self.assertEqual(f.readable(), True)
371            self.assertEqual(f.writable(), True)
372            self.assertEqual(f.seekable(), True)
373            self.assertEqual(f.isatty(), False)
374            f.close()
375
376            if sys.platform != "win32":
377                try:
378                    f = self.FileIO("/dev/tty", "a")
379                except OSError:
380                    # When run in a cron job there just aren't any
381                    # ttys, so skip the test.  This also handles other
382                    # OS'es that don't support /dev/tty.
383                    pass
384                else:
385                    self.assertEqual(f.readable(), False)
386                    self.assertEqual(f.writable(), True)
387                    if sys.platform != "darwin" and \
388                       'bsd' not in sys.platform and \
389                       not sys.platform.startswith(('sunos', 'aix')):
390                        # Somehow /dev/tty appears seekable on some BSDs
391                        self.assertEqual(f.seekable(), False)
392                    self.assertEqual(f.isatty(), True)
393                    f.close()
394        finally:
395            os.unlink(TESTFN)
396
397    def testInvalidModeStrings(self):
398        # check invalid mode strings
399        for mode in ("", "aU", "wU+", "rw", "rt"):
400            try:
401                f = self.FileIO(TESTFN, mode)
402            except ValueError:
403                pass
404            else:
405                f.close()
406                self.fail('%r is an invalid file mode' % mode)
407
408    def testModeStrings(self):
409        # test that the mode attribute is correct for various mode strings
410        # given as init args
411        try:
412            for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
413                          ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
414                          ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
415                          ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
416                # read modes are last so that TESTFN will exist first
417                with self.FileIO(TESTFN, modes[0]) as f:
418                    self.assertEqual(f.mode, modes[1])
419        finally:
420            if os.path.exists(TESTFN):
421                os.unlink(TESTFN)
422
423    def testUnicodeOpen(self):
424        # verify repr works for unicode too
425        f = self.FileIO(str(TESTFN), "w")
426        f.close()
427        os.unlink(TESTFN)
428
429    def testBytesOpen(self):
430        # Opening a bytes filename
431        try:
432            fn = TESTFN.encode("ascii")
433        except UnicodeEncodeError:
434            self.skipTest('could not encode %r to ascii' % TESTFN)
435        f = self.FileIO(fn, "w")
436        try:
437            f.write(b"abc")
438            f.close()
439            with open(TESTFN, "rb") as f:
440                self.assertEqual(f.read(), b"abc")
441        finally:
442            os.unlink(TESTFN)
443
444    @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
445                     "test only works for utf-8 filesystems")
446    def testUtf8BytesOpen(self):
447        # Opening a UTF-8 bytes filename
448        try:
449            fn = TESTFN_UNICODE.encode("utf-8")
450        except UnicodeEncodeError:
451            self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
452        f = self.FileIO(fn, "w")
453        try:
454            f.write(b"abc")
455            f.close()
456            with open(TESTFN_UNICODE, "rb") as f:
457                self.assertEqual(f.read(), b"abc")
458        finally:
459            os.unlink(TESTFN_UNICODE)
460
461    def testConstructorHandlesNULChars(self):
462        fn_with_NUL = 'foo\0bar'
463        self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
464        self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
465
466    def testInvalidFd(self):
467        self.assertRaises(ValueError, self.FileIO, -10)
468        self.assertRaises(OSError, self.FileIO, make_bad_fd())
469        if sys.platform == 'win32':
470            import msvcrt
471            self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
472
473    def testBadModeArgument(self):
474        # verify that we get a sensible error message for bad mode argument
475        bad_mode = "qwerty"
476        try:
477            f = self.FileIO(TESTFN, bad_mode)
478        except ValueError as msg:
479            if msg.args[0] != 0:
480                s = str(msg)
481                if TESTFN in s or bad_mode not in s:
482                    self.fail("bad error message for invalid mode: %s" % s)
483            # if msg.args[0] == 0, we're probably on Windows where there may be
484            # no obvious way to discover why open() failed.
485        else:
486            f.close()
487            self.fail("no error for invalid mode: %s" % bad_mode)
488
489    def testTruncate(self):
490        f = self.FileIO(TESTFN, 'w')
491        f.write(bytes(bytearray(range(10))))
492        self.assertEqual(f.tell(), 10)
493        f.truncate(5)
494        self.assertEqual(f.tell(), 10)
495        self.assertEqual(f.seek(0, io.SEEK_END), 5)
496        f.truncate(15)
497        self.assertEqual(f.tell(), 5)
498        self.assertEqual(f.seek(0, io.SEEK_END), 15)
499        f.close()
500
501    def testTruncateOnWindows(self):
502        def bug801631():
503            # SF bug <http://www.python.org/sf/801631>
504            # "file.truncate fault on windows"
505            f = self.FileIO(TESTFN, 'w')
506            f.write(bytes(range(11)))
507            f.close()
508
509            f = self.FileIO(TESTFN,'r+')
510            data = f.read(5)
511            if data != bytes(range(5)):
512                self.fail("Read on file opened for update failed %r" % data)
513            if f.tell() != 5:
514                self.fail("File pos after read wrong %d" % f.tell())
515
516            f.truncate()
517            if f.tell() != 5:
518                self.fail("File pos after ftruncate wrong %d" % f.tell())
519
520            f.close()
521            size = os.path.getsize(TESTFN)
522            if size != 5:
523                self.fail("File size after ftruncate wrong %d" % size)
524
525        try:
526            bug801631()
527        finally:
528            os.unlink(TESTFN)
529
530    def testAppend(self):
531        try:
532            f = open(TESTFN, 'wb')
533            f.write(b'spam')
534            f.close()
535            f = open(TESTFN, 'ab')
536            f.write(b'eggs')
537            f.close()
538            f = open(TESTFN, 'rb')
539            d = f.read()
540            f.close()
541            self.assertEqual(d, b'spameggs')
542        finally:
543            try:
544                os.unlink(TESTFN)
545            except:
546                pass
547
548    def testInvalidInit(self):
549        self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
550
551    def testWarnings(self):
552        with check_warnings(quiet=True) as w:
553            self.assertEqual(w.warnings, [])
554            self.assertRaises(TypeError, self.FileIO, [])
555            self.assertEqual(w.warnings, [])
556            self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
557            self.assertEqual(w.warnings, [])
558
559    def testUnclosedFDOnException(self):
560        class MyException(Exception): pass
561        class MyFileIO(self.FileIO):
562            def __setattr__(self, name, value):
563                if name == "name":
564                    raise MyException("blocked setting name")
565                return super(MyFileIO, self).__setattr__(name, value)
566        fd = os.open(__file__, os.O_RDONLY)
567        self.assertRaises(MyException, MyFileIO, fd)
568        os.close(fd)  # should not raise OSError(EBADF)
569
570
571class COtherFileTests(OtherFileTests, unittest.TestCase):
572    FileIO = _io.FileIO
573    modulename = '_io'
574
575    @cpython_only
576    def testInvalidFd_overflow(self):
577        # Issue 15989
578        import _testcapi
579        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
580        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
581
582    def test_open_code(self):
583        # Check that the default behaviour of open_code matches
584        # open("rb")
585        with self.FileIO(__file__, "rb") as f:
586            expected = f.read()
587        with _io.open_code(__file__) as f:
588            actual = f.read()
589        self.assertEqual(expected, actual)
590
591
592class PyOtherFileTests(OtherFileTests, unittest.TestCase):
593    FileIO = _pyio.FileIO
594    modulename = '_pyio'
595
596    def test_open_code(self):
597        # Check that the default behaviour of open_code matches
598        # open("rb")
599        with self.FileIO(__file__, "rb") as f:
600            expected = f.read()
601        with check_warnings(quiet=True) as w:
602            # Always test _open_code_with_warning
603            with _pyio._open_code_with_warning(__file__) as f:
604                actual = f.read()
605            self.assertEqual(expected, actual)
606            self.assertNotEqual(w.warnings, [])
607
608
609def tearDownModule():
610    # Historically, these tests have been sloppy about removing TESTFN.
611    # So get rid of it no matter what.
612    if os.path.exists(TESTFN):
613        os.unlink(TESTFN)
614
615
616if __name__ == '__main__':
617    unittest.main()
618