1# Adapted from test_file.py by Daniel Stutzbach 2 3import sys 4import os 5import io 6import errno 7import unittest 8from array import array 9from weakref import proxy 10from functools import wraps 11 12from test.support import (TESTFN, TESTFN_UNICODE, check_warnings, run_unittest, 13 make_bad_fd, cpython_only, swap_attr) 14from collections import UserList 15 16import _io # C implementation of io 17import _pyio # Python implementation of io 18 19 20class AutoFileTests: 21 # file tests for which a test file is automatically set up 22 23 def setUp(self): 24 self.f = self.FileIO(TESTFN, 'w') 25 26 def tearDown(self): 27 if self.f: 28 self.f.close() 29 os.remove(TESTFN) 30 31 def testWeakRefs(self): 32 # verify weak references 33 p = proxy(self.f) 34 p.write(bytes(range(10))) 35 self.assertEqual(self.f.tell(), p.tell()) 36 self.f.close() 37 self.f = None 38 self.assertRaises(ReferenceError, getattr, p, 'tell') 39 40 def testSeekTell(self): 41 self.f.write(bytes(range(20))) 42 self.assertEqual(self.f.tell(), 20) 43 self.f.seek(0) 44 self.assertEqual(self.f.tell(), 0) 45 self.f.seek(10) 46 self.assertEqual(self.f.tell(), 10) 47 self.f.seek(5, 1) 48 self.assertEqual(self.f.tell(), 15) 49 self.f.seek(-5, 1) 50 self.assertEqual(self.f.tell(), 10) 51 self.f.seek(-5, 2) 52 self.assertEqual(self.f.tell(), 15) 53 54 def testAttributes(self): 55 # verify expected attributes exist 56 f = self.f 57 58 self.assertEqual(f.mode, "wb") 59 self.assertEqual(f.closed, False) 60 61 # verify the attributes are readonly 62 for attr in 'mode', 'closed': 63 self.assertRaises((AttributeError, TypeError), 64 setattr, f, attr, 'oops') 65 66 def testBlksize(self): 67 # test private _blksize attribute 68 blksize = io.DEFAULT_BUFFER_SIZE 69 # try to get preferred blksize from stat.st_blksize, if available 70 if hasattr(os, 'fstat'): 71 fst = os.fstat(self.f.fileno()) 72 blksize = getattr(fst, 'st_blksize', blksize) 73 self.assertEqual(self.f._blksize, blksize) 74 75 # verify readinto 76 def testReadintoByteArray(self): 77 self.f.write(bytes([1, 2, 0, 255])) 78 self.f.close() 79 80 ba = bytearray(b'abcdefgh') 81 with self.FileIO(TESTFN, 'r') as f: 82 n = f.readinto(ba) 83 self.assertEqual(ba, b'\x01\x02\x00\xffefgh') 84 self.assertEqual(n, 4) 85 86 def _testReadintoMemoryview(self): 87 self.f.write(bytes([1, 2, 0, 255])) 88 self.f.close() 89 90 m = memoryview(bytearray(b'abcdefgh')) 91 with self.FileIO(TESTFN, 'r') as f: 92 n = f.readinto(m) 93 self.assertEqual(m, b'\x01\x02\x00\xffefgh') 94 self.assertEqual(n, 4) 95 96 m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) 97 with self.FileIO(TESTFN, 'r') as f: 98 n = f.readinto(m) 99 self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') 100 self.assertEqual(n, 4) 101 102 def _testReadintoArray(self): 103 self.f.write(bytes([1, 2, 0, 255])) 104 self.f.close() 105 106 a = array('B', b'abcdefgh') 107 with self.FileIO(TESTFN, 'r') as f: 108 n = f.readinto(a) 109 self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) 110 self.assertEqual(n, 4) 111 112 a = array('b', b'abcdefgh') 113 with self.FileIO(TESTFN, 'r') as f: 114 n = f.readinto(a) 115 self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) 116 self.assertEqual(n, 4) 117 118 a = array('I', b'abcdefgh') 119 with self.FileIO(TESTFN, 'r') as f: 120 n = f.readinto(a) 121 self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) 122 self.assertEqual(n, 4) 123 124 def testWritelinesList(self): 125 l = [b'123', b'456'] 126 self.f.writelines(l) 127 self.f.close() 128 self.f = self.FileIO(TESTFN, 'rb') 129 buf = self.f.read() 130 self.assertEqual(buf, b'123456') 131 132 def testWritelinesUserList(self): 133 l = UserList([b'123', b'456']) 134 self.f.writelines(l) 135 self.f.close() 136 self.f = self.FileIO(TESTFN, 'rb') 137 buf = self.f.read() 138 self.assertEqual(buf, b'123456') 139 140 def testWritelinesError(self): 141 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 142 self.assertRaises(TypeError, self.f.writelines, None) 143 self.assertRaises(TypeError, self.f.writelines, "abc") 144 145 def test_none_args(self): 146 self.f.write(b"hi\nbye\nabc") 147 self.f.close() 148 self.f = self.FileIO(TESTFN, 'r') 149 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 150 self.f.seek(0) 151 self.assertEqual(self.f.readline(None), b"hi\n") 152 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 153 154 def test_reject(self): 155 self.assertRaises(TypeError, self.f.write, "Hello!") 156 157 def testRepr(self): 158 self.assertEqual(repr(self.f), 159 "<%s.FileIO name=%r mode=%r closefd=True>" % 160 (self.modulename, self.f.name, self.f.mode)) 161 del self.f.name 162 self.assertEqual(repr(self.f), 163 "<%s.FileIO fd=%r mode=%r closefd=True>" % 164 (self.modulename, self.f.fileno(), self.f.mode)) 165 self.f.close() 166 self.assertEqual(repr(self.f), 167 "<%s.FileIO [closed]>" % (self.modulename,)) 168 169 def testReprNoCloseFD(self): 170 fd = os.open(TESTFN, os.O_RDONLY) 171 try: 172 with self.FileIO(fd, 'r', closefd=False) as f: 173 self.assertEqual(repr(f), 174 "<%s.FileIO name=%r mode=%r closefd=False>" % 175 (self.modulename, f.name, f.mode)) 176 finally: 177 os.close(fd) 178 179 def testRecursiveRepr(self): 180 # Issue #25455 181 with swap_attr(self.f, 'name', self.f): 182 with self.assertRaises(RuntimeError): 183 repr(self.f) # Should not crash 184 185 def testErrors(self): 186 f = self.f 187 self.assertFalse(f.isatty()) 188 self.assertFalse(f.closed) 189 #self.assertEqual(f.name, TESTFN) 190 self.assertRaises(ValueError, f.read, 10) # Open for reading 191 f.close() 192 self.assertTrue(f.closed) 193 f = self.FileIO(TESTFN, 'r') 194 self.assertRaises(TypeError, f.readinto, "") 195 self.assertFalse(f.closed) 196 f.close() 197 self.assertTrue(f.closed) 198 199 def testMethods(self): 200 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 201 'read', 'readall', 'readline', 'readlines', 202 'tell', 'truncate', 'flush'] 203 204 self.f.close() 205 self.assertTrue(self.f.closed) 206 207 for methodname in methods: 208 method = getattr(self.f, methodname) 209 # should raise on closed file 210 self.assertRaises(ValueError, method) 211 212 self.assertRaises(TypeError, self.f.readinto) 213 self.assertRaises(ValueError, self.f.readinto, bytearray(1)) 214 self.assertRaises(TypeError, self.f.seek) 215 self.assertRaises(ValueError, self.f.seek, 0) 216 self.assertRaises(TypeError, self.f.write) 217 self.assertRaises(ValueError, self.f.write, b'') 218 self.assertRaises(TypeError, self.f.writelines) 219 self.assertRaises(ValueError, self.f.writelines, b'') 220 221 def testOpendir(self): 222 # Issue 3703: opening a directory should fill the errno 223 # Windows always returns "[Errno 13]: Permission denied 224 # Unix uses fstat and returns "[Errno 21]: Is a directory" 225 try: 226 self.FileIO('.', 'r') 227 except OSError as e: 228 self.assertNotEqual(e.errno, 0) 229 self.assertEqual(e.filename, ".") 230 else: 231 self.fail("Should have raised OSError") 232 233 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 234 def testOpenDirFD(self): 235 fd = os.open('.', os.O_RDONLY) 236 with self.assertRaises(OSError) as cm: 237 self.FileIO(fd, 'r') 238 os.close(fd) 239 self.assertEqual(cm.exception.errno, errno.EISDIR) 240 241 #A set of functions testing that we get expected behaviour if someone has 242 #manually closed the internal file descriptor. First, a decorator: 243 def ClosedFD(func): 244 @wraps(func) 245 def wrapper(self): 246 #forcibly close the fd before invoking the problem function 247 f = self.f 248 os.close(f.fileno()) 249 try: 250 func(self, f) 251 finally: 252 try: 253 self.f.close() 254 except OSError: 255 pass 256 return wrapper 257 258 def ClosedFDRaises(func): 259 @wraps(func) 260 def wrapper(self): 261 #forcibly close the fd before invoking the problem function 262 f = self.f 263 os.close(f.fileno()) 264 try: 265 func(self, f) 266 except OSError as e: 267 self.assertEqual(e.errno, errno.EBADF) 268 else: 269 self.fail("Should have raised OSError") 270 finally: 271 try: 272 self.f.close() 273 except OSError: 274 pass 275 return wrapper 276 277 @ClosedFDRaises 278 def testErrnoOnClose(self, f): 279 f.close() 280 281 @ClosedFDRaises 282 def testErrnoOnClosedWrite(self, f): 283 f.write(b'a') 284 285 @ClosedFDRaises 286 def testErrnoOnClosedSeek(self, f): 287 f.seek(0) 288 289 @ClosedFDRaises 290 def testErrnoOnClosedTell(self, f): 291 f.tell() 292 293 @ClosedFDRaises 294 def testErrnoOnClosedTruncate(self, f): 295 f.truncate(0) 296 297 @ClosedFD 298 def testErrnoOnClosedSeekable(self, f): 299 f.seekable() 300 301 @ClosedFD 302 def testErrnoOnClosedReadable(self, f): 303 f.readable() 304 305 @ClosedFD 306 def testErrnoOnClosedWritable(self, f): 307 f.writable() 308 309 @ClosedFD 310 def testErrnoOnClosedFileno(self, f): 311 f.fileno() 312 313 @ClosedFD 314 def testErrnoOnClosedIsatty(self, f): 315 self.assertEqual(f.isatty(), False) 316 317 def ReopenForRead(self): 318 try: 319 self.f.close() 320 except OSError: 321 pass 322 self.f = self.FileIO(TESTFN, 'r') 323 os.close(self.f.fileno()) 324 return self.f 325 326 @ClosedFDRaises 327 def testErrnoOnClosedRead(self, f): 328 f = self.ReopenForRead() 329 f.read(1) 330 331 @ClosedFDRaises 332 def testErrnoOnClosedReadall(self, f): 333 f = self.ReopenForRead() 334 f.readall() 335 336 @ClosedFDRaises 337 def testErrnoOnClosedReadinto(self, f): 338 f = self.ReopenForRead() 339 a = array('b', b'x'*10) 340 f.readinto(a) 341 342class CAutoFileTests(AutoFileTests, unittest.TestCase): 343 FileIO = _io.FileIO 344 modulename = '_io' 345 346class PyAutoFileTests(AutoFileTests, unittest.TestCase): 347 FileIO = _pyio.FileIO 348 modulename = '_pyio' 349 350 351class OtherFileTests: 352 353 def testAbles(self): 354 try: 355 f = self.FileIO(TESTFN, "w") 356 self.assertEqual(f.readable(), False) 357 self.assertEqual(f.writable(), True) 358 self.assertEqual(f.seekable(), True) 359 f.close() 360 361 f = self.FileIO(TESTFN, "r") 362 self.assertEqual(f.readable(), True) 363 self.assertEqual(f.writable(), False) 364 self.assertEqual(f.seekable(), True) 365 f.close() 366 367 f = self.FileIO(TESTFN, "a+") 368 self.assertEqual(f.readable(), True) 369 self.assertEqual(f.writable(), True) 370 self.assertEqual(f.seekable(), True) 371 self.assertEqual(f.isatty(), False) 372 f.close() 373 374 if sys.platform != "win32": 375 try: 376 f = self.FileIO("/dev/tty", "a") 377 except OSError: 378 # When run in a cron job there just aren't any 379 # ttys, so skip the test. This also handles other 380 # OS'es that don't support /dev/tty. 381 pass 382 else: 383 self.assertEqual(f.readable(), False) 384 self.assertEqual(f.writable(), True) 385 if sys.platform != "darwin" and \ 386 'bsd' not in sys.platform and \ 387 not sys.platform.startswith(('sunos', 'aix')): 388 # Somehow /dev/tty appears seekable on some BSDs 389 self.assertEqual(f.seekable(), False) 390 self.assertEqual(f.isatty(), True) 391 f.close() 392 finally: 393 os.unlink(TESTFN) 394 395 def testInvalidModeStrings(self): 396 # check invalid mode strings 397 for mode in ("", "aU", "wU+", "rw", "rt"): 398 try: 399 f = self.FileIO(TESTFN, mode) 400 except ValueError: 401 pass 402 else: 403 f.close() 404 self.fail('%r is an invalid file mode' % mode) 405 406 def testModeStrings(self): 407 # test that the mode attribute is correct for various mode strings 408 # given as init args 409 try: 410 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), 411 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), 412 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), 413 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: 414 # read modes are last so that TESTFN will exist first 415 with self.FileIO(TESTFN, modes[0]) as f: 416 self.assertEqual(f.mode, modes[1]) 417 finally: 418 if os.path.exists(TESTFN): 419 os.unlink(TESTFN) 420 421 def testUnicodeOpen(self): 422 # verify repr works for unicode too 423 f = self.FileIO(str(TESTFN), "w") 424 f.close() 425 os.unlink(TESTFN) 426 427 def testBytesOpen(self): 428 # Opening a bytes filename 429 try: 430 fn = TESTFN.encode("ascii") 431 except UnicodeEncodeError: 432 self.skipTest('could not encode %r to ascii' % TESTFN) 433 f = self.FileIO(fn, "w") 434 try: 435 f.write(b"abc") 436 f.close() 437 with open(TESTFN, "rb") as f: 438 self.assertEqual(f.read(), b"abc") 439 finally: 440 os.unlink(TESTFN) 441 442 @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8', 443 "test only works for utf-8 filesystems") 444 def testUtf8BytesOpen(self): 445 # Opening a UTF-8 bytes filename 446 try: 447 fn = TESTFN_UNICODE.encode("utf-8") 448 except UnicodeEncodeError: 449 self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE) 450 f = self.FileIO(fn, "w") 451 try: 452 f.write(b"abc") 453 f.close() 454 with open(TESTFN_UNICODE, "rb") as f: 455 self.assertEqual(f.read(), b"abc") 456 finally: 457 os.unlink(TESTFN_UNICODE) 458 459 def testConstructorHandlesNULChars(self): 460 fn_with_NUL = 'foo\0bar' 461 self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') 462 self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') 463 464 def testInvalidFd(self): 465 self.assertRaises(ValueError, self.FileIO, -10) 466 self.assertRaises(OSError, self.FileIO, make_bad_fd()) 467 if sys.platform == 'win32': 468 import msvcrt 469 self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) 470 471 def testBadModeArgument(self): 472 # verify that we get a sensible error message for bad mode argument 473 bad_mode = "qwerty" 474 try: 475 f = self.FileIO(TESTFN, bad_mode) 476 except ValueError as msg: 477 if msg.args[0] != 0: 478 s = str(msg) 479 if TESTFN in s or bad_mode not in s: 480 self.fail("bad error message for invalid mode: %s" % s) 481 # if msg.args[0] == 0, we're probably on Windows where there may be 482 # no obvious way to discover why open() failed. 483 else: 484 f.close() 485 self.fail("no error for invalid mode: %s" % bad_mode) 486 487 def testTruncate(self): 488 f = self.FileIO(TESTFN, 'w') 489 f.write(bytes(bytearray(range(10)))) 490 self.assertEqual(f.tell(), 10) 491 f.truncate(5) 492 self.assertEqual(f.tell(), 10) 493 self.assertEqual(f.seek(0, io.SEEK_END), 5) 494 f.truncate(15) 495 self.assertEqual(f.tell(), 5) 496 self.assertEqual(f.seek(0, io.SEEK_END), 15) 497 f.close() 498 499 def testTruncateOnWindows(self): 500 def bug801631(): 501 # SF bug <http://www.python.org/sf/801631> 502 # "file.truncate fault on windows" 503 f = self.FileIO(TESTFN, 'w') 504 f.write(bytes(range(11))) 505 f.close() 506 507 f = self.FileIO(TESTFN,'r+') 508 data = f.read(5) 509 if data != bytes(range(5)): 510 self.fail("Read on file opened for update failed %r" % data) 511 if f.tell() != 5: 512 self.fail("File pos after read wrong %d" % f.tell()) 513 514 f.truncate() 515 if f.tell() != 5: 516 self.fail("File pos after ftruncate wrong %d" % f.tell()) 517 518 f.close() 519 size = os.path.getsize(TESTFN) 520 if size != 5: 521 self.fail("File size after ftruncate wrong %d" % size) 522 523 try: 524 bug801631() 525 finally: 526 os.unlink(TESTFN) 527 528 def testAppend(self): 529 try: 530 f = open(TESTFN, 'wb') 531 f.write(b'spam') 532 f.close() 533 f = open(TESTFN, 'ab') 534 f.write(b'eggs') 535 f.close() 536 f = open(TESTFN, 'rb') 537 d = f.read() 538 f.close() 539 self.assertEqual(d, b'spameggs') 540 finally: 541 try: 542 os.unlink(TESTFN) 543 except: 544 pass 545 546 def testInvalidInit(self): 547 self.assertRaises(TypeError, self.FileIO, "1", 0, 0) 548 549 def testWarnings(self): 550 with check_warnings(quiet=True) as w: 551 self.assertEqual(w.warnings, []) 552 self.assertRaises(TypeError, self.FileIO, []) 553 self.assertEqual(w.warnings, []) 554 self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") 555 self.assertEqual(w.warnings, []) 556 557 def testUnclosedFDOnException(self): 558 class MyException(Exception): pass 559 class MyFileIO(self.FileIO): 560 def __setattr__(self, name, value): 561 if name == "name": 562 raise MyException("blocked setting name") 563 return super(MyFileIO, self).__setattr__(name, value) 564 fd = os.open(__file__, os.O_RDONLY) 565 self.assertRaises(MyException, MyFileIO, fd) 566 os.close(fd) # should not raise OSError(EBADF) 567 568 569class COtherFileTests(OtherFileTests, unittest.TestCase): 570 FileIO = _io.FileIO 571 modulename = '_io' 572 573 @cpython_only 574 def testInvalidFd_overflow(self): 575 # Issue 15989 576 import _testcapi 577 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) 578 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) 579 580 def test_open_code(self): 581 # Check that the default behaviour of open_code matches 582 # open("rb") 583 with self.FileIO(__file__, "rb") as f: 584 expected = f.read() 585 with _io.open_code(__file__) as f: 586 actual = f.read() 587 self.assertEqual(expected, actual) 588 589 590class PyOtherFileTests(OtherFileTests, unittest.TestCase): 591 FileIO = _pyio.FileIO 592 modulename = '_pyio' 593 594 def test_open_code(self): 595 # Check that the default behaviour of open_code matches 596 # open("rb") 597 with self.FileIO(__file__, "rb") as f: 598 expected = f.read() 599 with check_warnings(quiet=True) as w: 600 # Always test _open_code_with_warning 601 with _pyio._open_code_with_warning(__file__) as f: 602 actual = f.read() 603 self.assertEqual(expected, actual) 604 self.assertNotEqual(w.warnings, []) 605 606 607def test_main(): 608 # Historically, these tests have been sloppy about removing TESTFN. 609 # So get rid of it no matter what. 610 try: 611 run_unittest(CAutoFileTests, PyAutoFileTests, 612 COtherFileTests, PyOtherFileTests) 613 finally: 614 if os.path.exists(TESTFN): 615 os.unlink(TESTFN) 616 617if __name__ == '__main__': 618 test_main() 619