• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import unittest
4from array import array
5from weakref import proxy
6
7import io
8import _pyio as pyio
9
10from test.support import gc_collect
11from test.support.os_helper import TESTFN
12from test.support import os_helper
13from test.support import warnings_helper
14from collections import UserList
15
16class AutoFileTests:
17    # file tests for which a test file is automatically set up
18
19    def setUp(self):
20        self.f = self.open(TESTFN, 'wb')
21
22    def tearDown(self):
23        if self.f:
24            self.f.close()
25        os_helper.unlink(TESTFN)
26
27    def testWeakRefs(self):
28        # verify weak references
29        p = proxy(self.f)
30        p.write(b'teststring')
31        self.assertEqual(self.f.tell(), p.tell())
32        self.f.close()
33        self.f = None
34        gc_collect()  # For PyPy or other GCs.
35        self.assertRaises(ReferenceError, getattr, p, 'tell')
36
37    def testAttributes(self):
38        # verify expected attributes exist
39        f = self.f
40        f.name     # merely shouldn't blow up
41        f.mode     # ditto
42        f.closed   # ditto
43
44    def testReadinto(self):
45        # verify readinto
46        self.f.write(b'12')
47        self.f.close()
48        a = array('b', b'x'*10)
49        self.f = self.open(TESTFN, 'rb')
50        n = self.f.readinto(a)
51        self.assertEqual(b'12', a.tobytes()[:n])
52
53    def testReadinto_text(self):
54        # verify readinto refuses text files
55        a = array('b', b'x'*10)
56        self.f.close()
57        self.f = self.open(TESTFN, encoding="utf-8")
58        if hasattr(self.f, "readinto"):
59            self.assertRaises(TypeError, self.f.readinto, a)
60
61    def testWritelinesUserList(self):
62        # verify writelines with instance sequence
63        l = UserList([b'1', b'2'])
64        self.f.writelines(l)
65        self.f.close()
66        self.f = self.open(TESTFN, 'rb')
67        buf = self.f.read()
68        self.assertEqual(buf, b'12')
69
70    def testWritelinesIntegers(self):
71        # verify writelines with integers
72        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
73
74    def testWritelinesIntegersUserList(self):
75        # verify writelines with integers in UserList
76        l = UserList([1,2,3])
77        self.assertRaises(TypeError, self.f.writelines, l)
78
79    def testWritelinesNonString(self):
80        # verify writelines with non-string object
81        class NonString:
82            pass
83
84        self.assertRaises(TypeError, self.f.writelines,
85                          [NonString(), NonString()])
86
87    def testErrors(self):
88        f = self.f
89        self.assertEqual(f.name, TESTFN)
90        self.assertFalse(f.isatty())
91        self.assertFalse(f.closed)
92
93        if hasattr(f, "readinto"):
94            self.assertRaises((OSError, TypeError), f.readinto, "")
95        f.close()
96        self.assertTrue(f.closed)
97
98    def testMethods(self):
99        methods = [('fileno', ()),
100                   ('flush', ()),
101                   ('isatty', ()),
102                   ('__next__', ()),
103                   ('read', ()),
104                   ('write', (b"",)),
105                   ('readline', ()),
106                   ('readlines', ()),
107                   ('seek', (0,)),
108                   ('tell', ()),
109                   ('write', (b"",)),
110                   ('writelines', ([],)),
111                   ('__iter__', ()),
112                   ]
113        methods.append(('truncate', ()))
114
115        # __exit__ should close the file
116        self.f.__exit__(None, None, None)
117        self.assertTrue(self.f.closed)
118
119        for methodname, args in methods:
120            method = getattr(self.f, methodname)
121            # should raise on closed file
122            self.assertRaises(ValueError, method, *args)
123
124        # file is closed, __exit__ shouldn't do anything
125        self.assertEqual(self.f.__exit__(None, None, None), None)
126        # it must also return None if an exception was given
127        try:
128            1/0
129        except:
130            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
131
132    def testReadWhenWriting(self):
133        self.assertRaises(OSError, self.f.read)
134
class CAutoFileTests(AutoFileTests, unittest.TestCase):
    # Run the AutoFileTests mixin against io.open.
    # Wrapped in staticmethod(), as the pyio-based test class does, so that
    # self.open(...) never passes self as the first argument, however the
    # implementation happens to define open().
    open = staticmethod(io.open)
137
class PyAutoFileTests(AutoFileTests, unittest.TestCase):
    # Run the AutoFileTests mixin against the open() of the _pyio module
    # (imported in this file as pyio).
    # staticmethod() keeps the function from being bound as a method, so
    # self.open(...) calls pyio.open(...) without passing self.
    open = staticmethod(pyio.open)
140
141
142class OtherFileTests:
143
144    def tearDown(self):
145        os_helper.unlink(TESTFN)
146
147    def testModeStrings(self):
148        # check invalid mode strings
149        self.open(TESTFN, 'wb').close()
150        for mode in ("", "aU", "wU+", "U+", "+U", "rU+"):
151            try:
152                f = self.open(TESTFN, mode)
153            except ValueError:
154                pass
155            else:
156                f.close()
157                self.fail('%r is an invalid file mode' % mode)
158
159    def testStdin(self):
160        if sys.platform == 'osf1V5':
161            # This causes the interpreter to exit on OSF1 v5.1.
162            self.skipTest(
163                ' sys.stdin.seek(-1) may crash the interpreter on OSF1.'
164                ' Test manually.')
165
166        if not sys.stdin.isatty():
167            # Issue 14853: stdin becomes seekable when redirected to a file
168            self.skipTest('stdin must be a TTY in this test')
169
170        with self.assertRaises((IOError, ValueError)):
171            sys.stdin.seek(-1)
172        with self.assertRaises((IOError, ValueError)):
173            sys.stdin.truncate()
174
175    def testBadModeArgument(self):
176        # verify that we get a sensible error message for bad mode argument
177        bad_mode = "qwerty"
178        try:
179            f = self.open(TESTFN, bad_mode)
180        except ValueError as msg:
181            if msg.args[0] != 0:
182                s = str(msg)
183                if TESTFN in s or bad_mode not in s:
184                    self.fail("bad error message for invalid mode: %s" % s)
185            # if msg.args[0] == 0, we're probably on Windows where there may be
186            # no obvious way to discover why open() failed.
187        else:
188            f.close()
189            self.fail("no error for invalid mode: %s" % bad_mode)
190
191    def _checkBufferSize(self, s):
192        try:
193            f = self.open(TESTFN, 'wb', s)
194            f.write(str(s).encode("ascii"))
195            f.close()
196            f.close()
197            f = self.open(TESTFN, 'rb', s)
198            d = int(f.read().decode("ascii"))
199            f.close()
200            f.close()
201        except OSError as msg:
202            self.fail('error setting buffer size %d: %s' % (s, str(msg)))
203        self.assertEqual(d, s)
204
205    def testSetBufferSize(self):
206        # make sure that explicitly setting the buffer size doesn't cause
207        # misbehaviour especially with repeated close() calls
208        for s in (-1, 0, 512):
209            with warnings_helper.check_no_warnings(self,
210                                           message='line buffering',
211                                           category=RuntimeWarning):
212                self._checkBufferSize(s)
213
214        # test that attempts to use line buffering in binary mode cause
215        # a warning
216        with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
217            self._checkBufferSize(1)
218
219    def testTruncateOnWindows(self):
220        # SF bug <http://www.python.org/sf/801631>
221        # "file.truncate fault on windows"
222
223        f = self.open(TESTFN, 'wb')
224
225        try:
226            f.write(b'12345678901')   # 11 bytes
227            f.close()
228
229            f = self.open(TESTFN,'rb+')
230            data = f.read(5)
231            if data != b'12345':
232                self.fail("Read on file opened for update failed %r" % data)
233            if f.tell() != 5:
234                self.fail("File pos after read wrong %d" % f.tell())
235
236            f.truncate()
237            if f.tell() != 5:
238                self.fail("File pos after ftruncate wrong %d" % f.tell())
239
240            f.close()
241            size = os.path.getsize(TESTFN)
242            if size != 5:
243                self.fail("File size after ftruncate wrong %d" % size)
244        finally:
245            f.close()
246
247    def testIteration(self):
248        # Test the complex interaction when mixing file-iteration and the
249        # various read* methods.
250        dataoffset = 16384
251        filler = b"ham\n"
252        assert not dataoffset % len(filler), \
253            "dataoffset must be multiple of len(filler)"
254        nchunks = dataoffset // len(filler)
255        testlines = [
256            b"spam, spam and eggs\n",
257            b"eggs, spam, ham and spam\n",
258            b"saussages, spam, spam and eggs\n",
259            b"spam, ham, spam and eggs\n",
260            b"spam, spam, spam, spam, spam, ham, spam\n",
261            b"wonderful spaaaaaam.\n"
262        ]
263        methods = [("readline", ()), ("read", ()), ("readlines", ()),
264                   ("readinto", (array("b", b" "*100),))]
265
266        # Prepare the testfile
267        bag = self.open(TESTFN, "wb")
268        bag.write(filler * nchunks)
269        bag.writelines(testlines)
270        bag.close()
271        # Test for appropriate errors mixing read* and iteration
272        for methodname, args in methods:
273            f = self.open(TESTFN, 'rb')
274            self.assertEqual(next(f), filler)
275            meth = getattr(f, methodname)
276            meth(*args)  # This simply shouldn't fail
277            f.close()
278
279        # Test to see if harmless (by accident) mixing of read* and
280        # iteration still works. This depends on the size of the internal
281        # iteration buffer (currently 8192,) but we can test it in a
282        # flexible manner.  Each line in the bag o' ham is 4 bytes
283        # ("h", "a", "m", "\n"), so 4096 lines of that should get us
284        # exactly on the buffer boundary for any power-of-2 buffersize
285        # between 4 and 16384 (inclusive).
286        f = self.open(TESTFN, 'rb')
287        for i in range(nchunks):
288            next(f)
289        testline = testlines.pop(0)
290        try:
291            line = f.readline()
292        except ValueError:
293            self.fail("readline() after next() with supposedly empty "
294                        "iteration-buffer failed anyway")
295        if line != testline:
296            self.fail("readline() after next() with empty buffer "
297                        "failed. Got %r, expected %r" % (line, testline))
298        testline = testlines.pop(0)
299        buf = array("b", b"\x00" * len(testline))
300        try:
301            f.readinto(buf)
302        except ValueError:
303            self.fail("readinto() after next() with supposedly empty "
304                        "iteration-buffer failed anyway")
305        line = buf.tobytes()
306        if line != testline:
307            self.fail("readinto() after next() with empty buffer "
308                        "failed. Got %r, expected %r" % (line, testline))
309
310        testline = testlines.pop(0)
311        try:
312            line = f.read(len(testline))
313        except ValueError:
314            self.fail("read() after next() with supposedly empty "
315                        "iteration-buffer failed anyway")
316        if line != testline:
317            self.fail("read() after next() with empty buffer "
318                        "failed. Got %r, expected %r" % (line, testline))
319        try:
320            lines = f.readlines()
321        except ValueError:
322            self.fail("readlines() after next() with supposedly empty "
323                        "iteration-buffer failed anyway")
324        if lines != testlines:
325            self.fail("readlines() after next() with empty buffer "
326                        "failed. Got %r, expected %r" % (line, testline))
327        f.close()
328
329        # Reading after iteration hit EOF shouldn't hurt either
330        f = self.open(TESTFN, 'rb')
331        try:
332            for line in f:
333                pass
334            try:
335                f.readline()
336                f.readinto(buf)
337                f.read()
338                f.readlines()
339            except ValueError:
340                self.fail("read* failed after next() consumed file")
341        finally:
342            f.close()
343
class COtherFileTests(OtherFileTests, unittest.TestCase):
    # Run the OtherFileTests mixin against io.open.
    # Wrapped in staticmethod(), as the pyio-based test class does, so that
    # self.open(...) never passes self as the first argument, however the
    # implementation happens to define open().
    open = staticmethod(io.open)
346
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
    # Run the OtherFileTests mixin against the open() of the _pyio module
    # (imported in this file as pyio).
    # staticmethod() keeps the function from being bound as a method, so
    # self.open(...) calls pyio.open(...) without passing self.
    open = staticmethod(pyio.open)
349
350
# Run the test cases defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
353