1# Adapted from test_file.py by Daniel Stutzbach 2 3import sys 4import os 5import io 6import errno 7import unittest 8from array import array 9from weakref import proxy 10from functools import wraps 11 12from test.support import cpython_only, swap_attr, gc_collect 13from test.support.os_helper import (TESTFN, TESTFN_UNICODE, make_bad_fd) 14from test.support.warnings_helper import check_warnings 15from collections import UserList 16 17import _io # C implementation of io 18import _pyio # Python implementation of io 19 20 21class AutoFileTests: 22 # file tests for which a test file is automatically set up 23 24 def setUp(self): 25 self.f = self.FileIO(TESTFN, 'w') 26 27 def tearDown(self): 28 if self.f: 29 self.f.close() 30 os.remove(TESTFN) 31 32 def testWeakRefs(self): 33 # verify weak references 34 p = proxy(self.f) 35 p.write(bytes(range(10))) 36 self.assertEqual(self.f.tell(), p.tell()) 37 self.f.close() 38 self.f = None 39 gc_collect() # For PyPy or other GCs. 40 self.assertRaises(ReferenceError, getattr, p, 'tell') 41 42 def testSeekTell(self): 43 self.f.write(bytes(range(20))) 44 self.assertEqual(self.f.tell(), 20) 45 self.f.seek(0) 46 self.assertEqual(self.f.tell(), 0) 47 self.f.seek(10) 48 self.assertEqual(self.f.tell(), 10) 49 self.f.seek(5, 1) 50 self.assertEqual(self.f.tell(), 15) 51 self.f.seek(-5, 1) 52 self.assertEqual(self.f.tell(), 10) 53 self.f.seek(-5, 2) 54 self.assertEqual(self.f.tell(), 15) 55 56 def testAttributes(self): 57 # verify expected attributes exist 58 f = self.f 59 60 self.assertEqual(f.mode, "wb") 61 self.assertEqual(f.closed, False) 62 63 # verify the attributes are readonly 64 for attr in 'mode', 'closed': 65 self.assertRaises((AttributeError, TypeError), 66 setattr, f, attr, 'oops') 67 68 def testBlksize(self): 69 # test private _blksize attribute 70 blksize = io.DEFAULT_BUFFER_SIZE 71 # try to get preferred blksize from stat.st_blksize, if available 72 if hasattr(os, 'fstat'): 73 fst = os.fstat(self.f.fileno()) 74 blksize = getattr(fst, 'st_blksize', blksize) 75 self.assertEqual(self.f._blksize, blksize) 76 77 # verify readinto 78 def testReadintoByteArray(self): 79 self.f.write(bytes([1, 2, 0, 255])) 80 self.f.close() 81 82 ba = bytearray(b'abcdefgh') 83 with self.FileIO(TESTFN, 'r') as f: 84 n = f.readinto(ba) 85 self.assertEqual(ba, b'\x01\x02\x00\xffefgh') 86 self.assertEqual(n, 4) 87 88 def _testReadintoMemoryview(self): 89 self.f.write(bytes([1, 2, 0, 255])) 90 self.f.close() 91 92 m = memoryview(bytearray(b'abcdefgh')) 93 with self.FileIO(TESTFN, 'r') as f: 94 n = f.readinto(m) 95 self.assertEqual(m, b'\x01\x02\x00\xffefgh') 96 self.assertEqual(n, 4) 97 98 m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) 99 with self.FileIO(TESTFN, 'r') as f: 100 n = f.readinto(m) 101 self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') 102 self.assertEqual(n, 4) 103 104 def _testReadintoArray(self): 105 self.f.write(bytes([1, 2, 0, 255])) 106 self.f.close() 107 108 a = array('B', b'abcdefgh') 109 with self.FileIO(TESTFN, 'r') as f: 110 n = f.readinto(a) 111 self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) 112 self.assertEqual(n, 4) 113 114 a = array('b', b'abcdefgh') 115 with self.FileIO(TESTFN, 'r') as f: 116 n = f.readinto(a) 117 self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) 118 self.assertEqual(n, 4) 119 120 a = array('I', b'abcdefgh') 121 with self.FileIO(TESTFN, 'r') as f: 122 n = f.readinto(a) 123 self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) 124 self.assertEqual(n, 4) 125 126 def testWritelinesList(self): 127 l = [b'123', b'456'] 128 self.f.writelines(l) 129 self.f.close() 130 self.f = self.FileIO(TESTFN, 'rb') 131 buf = self.f.read() 132 self.assertEqual(buf, b'123456') 133 134 def testWritelinesUserList(self): 135 l = UserList([b'123', b'456']) 136 self.f.writelines(l) 137 self.f.close() 138 self.f = self.FileIO(TESTFN, 'rb') 139 buf = self.f.read() 140 self.assertEqual(buf, b'123456') 141 142 def testWritelinesError(self): 143 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 144 self.assertRaises(TypeError, self.f.writelines, None) 145 self.assertRaises(TypeError, self.f.writelines, "abc") 146 147 def test_none_args(self): 148 self.f.write(b"hi\nbye\nabc") 149 self.f.close() 150 self.f = self.FileIO(TESTFN, 'r') 151 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 152 self.f.seek(0) 153 self.assertEqual(self.f.readline(None), b"hi\n") 154 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 155 156 def test_reject(self): 157 self.assertRaises(TypeError, self.f.write, "Hello!") 158 159 def testRepr(self): 160 self.assertEqual(repr(self.f), 161 "<%s.FileIO name=%r mode=%r closefd=True>" % 162 (self.modulename, self.f.name, self.f.mode)) 163 del self.f.name 164 self.assertEqual(repr(self.f), 165 "<%s.FileIO fd=%r mode=%r closefd=True>" % 166 (self.modulename, self.f.fileno(), self.f.mode)) 167 self.f.close() 168 self.assertEqual(repr(self.f), 169 "<%s.FileIO [closed]>" % (self.modulename,)) 170 171 def testReprNoCloseFD(self): 172 fd = os.open(TESTFN, os.O_RDONLY) 173 try: 174 with self.FileIO(fd, 'r', closefd=False) as f: 175 self.assertEqual(repr(f), 176 "<%s.FileIO name=%r mode=%r closefd=False>" % 177 (self.modulename, f.name, f.mode)) 178 finally: 179 os.close(fd) 180 181 def testRecursiveRepr(self): 182 # Issue #25455 183 with swap_attr(self.f, 'name', self.f): 184 with self.assertRaises(RuntimeError): 185 repr(self.f) # Should not crash 186 187 def testErrors(self): 188 f = self.f 189 self.assertFalse(f.isatty()) 190 self.assertFalse(f.closed) 191 #self.assertEqual(f.name, TESTFN) 192 self.assertRaises(ValueError, f.read, 10) # Open for reading 193 f.close() 194 self.assertTrue(f.closed) 195 f = self.FileIO(TESTFN, 'r') 196 self.assertRaises(TypeError, f.readinto, "") 197 self.assertFalse(f.closed) 198 f.close() 199 self.assertTrue(f.closed) 200 201 def testMethods(self): 202 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 203 'read', 'readall', 'readline', 'readlines', 204 'tell', 'truncate', 'flush'] 205 206 self.f.close() 207 self.assertTrue(self.f.closed) 208 209 for methodname in methods: 210 method = getattr(self.f, methodname) 211 # should raise on closed file 212 self.assertRaises(ValueError, method) 213 214 self.assertRaises(TypeError, self.f.readinto) 215 self.assertRaises(ValueError, self.f.readinto, bytearray(1)) 216 self.assertRaises(TypeError, self.f.seek) 217 self.assertRaises(ValueError, self.f.seek, 0) 218 self.assertRaises(TypeError, self.f.write) 219 self.assertRaises(ValueError, self.f.write, b'') 220 self.assertRaises(TypeError, self.f.writelines) 221 self.assertRaises(ValueError, self.f.writelines, b'') 222 223 def testOpendir(self): 224 # Issue 3703: opening a directory should fill the errno 225 # Windows always returns "[Errno 13]: Permission denied 226 # Unix uses fstat and returns "[Errno 21]: Is a directory" 227 try: 228 self.FileIO('.', 'r') 229 except OSError as e: 230 self.assertNotEqual(e.errno, 0) 231 self.assertEqual(e.filename, ".") 232 else: 233 self.fail("Should have raised OSError") 234 235 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 236 def testOpenDirFD(self): 237 fd = os.open('.', os.O_RDONLY) 238 with self.assertRaises(OSError) as cm: 239 self.FileIO(fd, 'r') 240 os.close(fd) 241 self.assertEqual(cm.exception.errno, errno.EISDIR) 242 243 #A set of functions testing that we get expected behaviour if someone has 244 #manually closed the internal file descriptor. First, a decorator: 245 def ClosedFD(func): 246 @wraps(func) 247 def wrapper(self): 248 #forcibly close the fd before invoking the problem function 249 f = self.f 250 os.close(f.fileno()) 251 try: 252 func(self, f) 253 finally: 254 try: 255 self.f.close() 256 except OSError: 257 pass 258 return wrapper 259 260 def ClosedFDRaises(func): 261 @wraps(func) 262 def wrapper(self): 263 #forcibly close the fd before invoking the problem function 264 f = self.f 265 os.close(f.fileno()) 266 try: 267 func(self, f) 268 except OSError as e: 269 self.assertEqual(e.errno, errno.EBADF) 270 else: 271 self.fail("Should have raised OSError") 272 finally: 273 try: 274 self.f.close() 275 except OSError: 276 pass 277 return wrapper 278 279 @ClosedFDRaises 280 def testErrnoOnClose(self, f): 281 f.close() 282 283 @ClosedFDRaises 284 def testErrnoOnClosedWrite(self, f): 285 f.write(b'a') 286 287 @ClosedFDRaises 288 def testErrnoOnClosedSeek(self, f): 289 f.seek(0) 290 291 @ClosedFDRaises 292 def testErrnoOnClosedTell(self, f): 293 f.tell() 294 295 @ClosedFDRaises 296 def testErrnoOnClosedTruncate(self, f): 297 f.truncate(0) 298 299 @ClosedFD 300 def testErrnoOnClosedSeekable(self, f): 301 f.seekable() 302 303 @ClosedFD 304 def testErrnoOnClosedReadable(self, f): 305 f.readable() 306 307 @ClosedFD 308 def testErrnoOnClosedWritable(self, f): 309 f.writable() 310 311 @ClosedFD 312 def testErrnoOnClosedFileno(self, f): 313 f.fileno() 314 315 @ClosedFD 316 def testErrnoOnClosedIsatty(self, f): 317 self.assertEqual(f.isatty(), False) 318 319 def ReopenForRead(self): 320 try: 321 self.f.close() 322 except OSError: 323 pass 324 self.f = self.FileIO(TESTFN, 'r') 325 os.close(self.f.fileno()) 326 return self.f 327 328 @ClosedFDRaises 329 def testErrnoOnClosedRead(self, f): 330 f = self.ReopenForRead() 331 f.read(1) 332 333 @ClosedFDRaises 334 def testErrnoOnClosedReadall(self, f): 335 f = self.ReopenForRead() 336 f.readall() 337 338 @ClosedFDRaises 339 def testErrnoOnClosedReadinto(self, f): 340 f = self.ReopenForRead() 341 a = array('b', b'x'*10) 342 f.readinto(a) 343 344class CAutoFileTests(AutoFileTests, unittest.TestCase): 345 FileIO = _io.FileIO 346 modulename = '_io' 347 348class PyAutoFileTests(AutoFileTests, unittest.TestCase): 349 FileIO = _pyio.FileIO 350 modulename = '_pyio' 351 352 353class OtherFileTests: 354 355 def testAbles(self): 356 try: 357 f = self.FileIO(TESTFN, "w") 358 self.assertEqual(f.readable(), False) 359 self.assertEqual(f.writable(), True) 360 self.assertEqual(f.seekable(), True) 361 f.close() 362 363 f = self.FileIO(TESTFN, "r") 364 self.assertEqual(f.readable(), True) 365 self.assertEqual(f.writable(), False) 366 self.assertEqual(f.seekable(), True) 367 f.close() 368 369 f = self.FileIO(TESTFN, "a+") 370 self.assertEqual(f.readable(), True) 371 self.assertEqual(f.writable(), True) 372 self.assertEqual(f.seekable(), True) 373 self.assertEqual(f.isatty(), False) 374 f.close() 375 376 if sys.platform != "win32": 377 try: 378 f = self.FileIO("/dev/tty", "a") 379 except OSError: 380 # When run in a cron job there just aren't any 381 # ttys, so skip the test. This also handles other 382 # OS'es that don't support /dev/tty. 383 pass 384 else: 385 self.assertEqual(f.readable(), False) 386 self.assertEqual(f.writable(), True) 387 if sys.platform != "darwin" and \ 388 'bsd' not in sys.platform and \ 389 not sys.platform.startswith(('sunos', 'aix')): 390 # Somehow /dev/tty appears seekable on some BSDs 391 self.assertEqual(f.seekable(), False) 392 self.assertEqual(f.isatty(), True) 393 f.close() 394 finally: 395 os.unlink(TESTFN) 396 397 def testInvalidModeStrings(self): 398 # check invalid mode strings 399 for mode in ("", "aU", "wU+", "rw", "rt"): 400 try: 401 f = self.FileIO(TESTFN, mode) 402 except ValueError: 403 pass 404 else: 405 f.close() 406 self.fail('%r is an invalid file mode' % mode) 407 408 def testModeStrings(self): 409 # test that the mode attribute is correct for various mode strings 410 # given as init args 411 try: 412 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), 413 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), 414 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), 415 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: 416 # read modes are last so that TESTFN will exist first 417 with self.FileIO(TESTFN, modes[0]) as f: 418 self.assertEqual(f.mode, modes[1]) 419 finally: 420 if os.path.exists(TESTFN): 421 os.unlink(TESTFN) 422 423 def testUnicodeOpen(self): 424 # verify repr works for unicode too 425 f = self.FileIO(str(TESTFN), "w") 426 f.close() 427 os.unlink(TESTFN) 428 429 def testBytesOpen(self): 430 # Opening a bytes filename 431 try: 432 fn = TESTFN.encode("ascii") 433 except UnicodeEncodeError: 434 self.skipTest('could not encode %r to ascii' % TESTFN) 435 f = self.FileIO(fn, "w") 436 try: 437 f.write(b"abc") 438 f.close() 439 with open(TESTFN, "rb") as f: 440 self.assertEqual(f.read(), b"abc") 441 finally: 442 os.unlink(TESTFN) 443 444 @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8', 445 "test only works for utf-8 filesystems") 446 def testUtf8BytesOpen(self): 447 # Opening a UTF-8 bytes filename 448 try: 449 fn = TESTFN_UNICODE.encode("utf-8") 450 except UnicodeEncodeError: 451 self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE) 452 f = self.FileIO(fn, "w") 453 try: 454 f.write(b"abc") 455 f.close() 456 with open(TESTFN_UNICODE, "rb") as f: 457 self.assertEqual(f.read(), b"abc") 458 finally: 459 os.unlink(TESTFN_UNICODE) 460 461 def testConstructorHandlesNULChars(self): 462 fn_with_NUL = 'foo\0bar' 463 self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') 464 self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') 465 466 def testInvalidFd(self): 467 self.assertRaises(ValueError, self.FileIO, -10) 468 self.assertRaises(OSError, self.FileIO, make_bad_fd()) 469 if sys.platform == 'win32': 470 import msvcrt 471 self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) 472 473 def testBadModeArgument(self): 474 # verify that we get a sensible error message for bad mode argument 475 bad_mode = "qwerty" 476 try: 477 f = self.FileIO(TESTFN, bad_mode) 478 except ValueError as msg: 479 if msg.args[0] != 0: 480 s = str(msg) 481 if TESTFN in s or bad_mode not in s: 482 self.fail("bad error message for invalid mode: %s" % s) 483 # if msg.args[0] == 0, we're probably on Windows where there may be 484 # no obvious way to discover why open() failed. 485 else: 486 f.close() 487 self.fail("no error for invalid mode: %s" % bad_mode) 488 489 def testTruncate(self): 490 f = self.FileIO(TESTFN, 'w') 491 f.write(bytes(bytearray(range(10)))) 492 self.assertEqual(f.tell(), 10) 493 f.truncate(5) 494 self.assertEqual(f.tell(), 10) 495 self.assertEqual(f.seek(0, io.SEEK_END), 5) 496 f.truncate(15) 497 self.assertEqual(f.tell(), 5) 498 self.assertEqual(f.seek(0, io.SEEK_END), 15) 499 f.close() 500 501 def testTruncateOnWindows(self): 502 def bug801631(): 503 # SF bug <http://www.python.org/sf/801631> 504 # "file.truncate fault on windows" 505 f = self.FileIO(TESTFN, 'w') 506 f.write(bytes(range(11))) 507 f.close() 508 509 f = self.FileIO(TESTFN,'r+') 510 data = f.read(5) 511 if data != bytes(range(5)): 512 self.fail("Read on file opened for update failed %r" % data) 513 if f.tell() != 5: 514 self.fail("File pos after read wrong %d" % f.tell()) 515 516 f.truncate() 517 if f.tell() != 5: 518 self.fail("File pos after ftruncate wrong %d" % f.tell()) 519 520 f.close() 521 size = os.path.getsize(TESTFN) 522 if size != 5: 523 self.fail("File size after ftruncate wrong %d" % size) 524 525 try: 526 bug801631() 527 finally: 528 os.unlink(TESTFN) 529 530 def testAppend(self): 531 try: 532 f = open(TESTFN, 'wb') 533 f.write(b'spam') 534 f.close() 535 f = open(TESTFN, 'ab') 536 f.write(b'eggs') 537 f.close() 538 f = open(TESTFN, 'rb') 539 d = f.read() 540 f.close() 541 self.assertEqual(d, b'spameggs') 542 finally: 543 try: 544 os.unlink(TESTFN) 545 except: 546 pass 547 548 def testInvalidInit(self): 549 self.assertRaises(TypeError, self.FileIO, "1", 0, 0) 550 551 def testWarnings(self): 552 with check_warnings(quiet=True) as w: 553 self.assertEqual(w.warnings, []) 554 self.assertRaises(TypeError, self.FileIO, []) 555 self.assertEqual(w.warnings, []) 556 self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") 557 self.assertEqual(w.warnings, []) 558 559 def testUnclosedFDOnException(self): 560 class MyException(Exception): pass 561 class MyFileIO(self.FileIO): 562 def __setattr__(self, name, value): 563 if name == "name": 564 raise MyException("blocked setting name") 565 return super(MyFileIO, self).__setattr__(name, value) 566 fd = os.open(__file__, os.O_RDONLY) 567 self.assertRaises(MyException, MyFileIO, fd) 568 os.close(fd) # should not raise OSError(EBADF) 569 570 571class COtherFileTests(OtherFileTests, unittest.TestCase): 572 FileIO = _io.FileIO 573 modulename = '_io' 574 575 @cpython_only 576 def testInvalidFd_overflow(self): 577 # Issue 15989 578 import _testcapi 579 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) 580 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) 581 582 def test_open_code(self): 583 # Check that the default behaviour of open_code matches 584 # open("rb") 585 with self.FileIO(__file__, "rb") as f: 586 expected = f.read() 587 with _io.open_code(__file__) as f: 588 actual = f.read() 589 self.assertEqual(expected, actual) 590 591 592class PyOtherFileTests(OtherFileTests, unittest.TestCase): 593 FileIO = _pyio.FileIO 594 modulename = '_pyio' 595 596 def test_open_code(self): 597 # Check that the default behaviour of open_code matches 598 # open("rb") 599 with self.FileIO(__file__, "rb") as f: 600 expected = f.read() 601 with check_warnings(quiet=True) as w: 602 # Always test _open_code_with_warning 603 with _pyio._open_code_with_warning(__file__) as f: 604 actual = f.read() 605 self.assertEqual(expected, actual) 606 self.assertNotEqual(w.warnings, []) 607 608 609def tearDownModule(): 610 # Historically, these tests have been sloppy about removing TESTFN. 611 # So get rid of it no matter what. 612 if os.path.exists(TESTFN): 613 os.unlink(TESTFN) 614 615 616if __name__ == '__main__': 617 unittest.main() 618