1import sys 2import os 3import unittest 4import itertools 5import select 6import signal 7import stat 8import subprocess 9import time 10from array import array 11from weakref import proxy 12try: 13 import threading 14except ImportError: 15 threading = None 16 17from test import test_support 18from test.test_support import TESTFN, run_unittest, requires 19from UserList import UserList 20 21class AutoFileTests(unittest.TestCase): 22 # file tests for which a test file is automatically set up 23 24 def setUp(self): 25 self.f = open(TESTFN, 'wb') 26 27 def tearDown(self): 28 if self.f: 29 self.f.close() 30 os.remove(TESTFN) 31 32 def testWeakRefs(self): 33 # verify weak references 34 p = proxy(self.f) 35 p.write('teststring') 36 self.assertEqual(self.f.tell(), p.tell()) 37 self.f.close() 38 self.f = None 39 self.assertRaises(ReferenceError, getattr, p, 'tell') 40 41 def testAttributes(self): 42 # verify expected attributes exist 43 f = self.f 44 with test_support.check_py3k_warnings(): 45 softspace = f.softspace 46 f.name # merely shouldn't blow up 47 f.mode # ditto 48 f.closed # ditto 49 50 with test_support.check_py3k_warnings(): 51 # verify softspace is writable 52 f.softspace = softspace # merely shouldn't blow up 53 54 # verify the others aren't 55 for attr in 'name', 'mode', 'closed': 56 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') 57 58 def testReadinto(self): 59 # verify readinto 60 self.f.write('12') 61 self.f.close() 62 a = array('c', 'x'*10) 63 self.f = open(TESTFN, 'rb') 64 n = self.f.readinto(a) 65 self.assertEqual('12', a.tostring()[:n]) 66 67 def testWritelinesUserList(self): 68 # verify writelines with instance sequence 69 l = UserList(['1', '2']) 70 self.f.writelines(l) 71 self.f.close() 72 self.f = open(TESTFN, 'rb') 73 buf = self.f.read() 74 self.assertEqual(buf, '12') 75 76 def testWritelinesIntegers(self): 77 # verify writelines with integers 78 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 79 80 def testWritelinesIntegersUserList(self): 81 # verify writelines with integers in UserList 82 l = UserList([1,2,3]) 83 self.assertRaises(TypeError, self.f.writelines, l) 84 85 def testWritelinesNonString(self): 86 # verify writelines with non-string object 87 class NonString: 88 pass 89 90 self.assertRaises(TypeError, self.f.writelines, 91 [NonString(), NonString()]) 92 93 def testWritelinesBuffer(self): 94 self.f.writelines([array('c', 'abc')]) 95 self.f.close() 96 self.f = open(TESTFN, 'rb') 97 buf = self.f.read() 98 self.assertEqual(buf, 'abc') 99 100 def testRepr(self): 101 # verify repr works 102 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN)) 103 # see issue #14161 104 # Windows doesn't like \r\n\t" in the file name, but ' is ok 105 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx" 106 with open(fname, 'w') as f: 107 self.addCleanup(os.remove, fname) 108 self.assertTrue(repr(f).startswith( 109 "<open file %r, mode 'w' at" % fname)) 110 111 def testErrors(self): 112 self.f.close() 113 self.f = open(TESTFN, 'rb') 114 f = self.f 115 self.assertEqual(f.name, TESTFN) 116 self.assertTrue(not f.isatty()) 117 self.assertTrue(not f.closed) 118 119 self.assertRaises(TypeError, f.readinto, "") 120 f.close() 121 self.assertTrue(f.closed) 122 123 def testMethods(self): 124 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 125 'readline', 'readlines', 'seek', 'tell', 'truncate', 126 'write', '__iter__'] 127 deprecated_methods = ['xreadlines'] 128 if sys.platform.startswith('atheos'): 129 methods.remove('truncate') 130 131 # __exit__ should close the file 132 self.f.__exit__(None, None, None) 133 self.assertTrue(self.f.closed) 134 135 for methodname in methods: 136 method = getattr(self.f, methodname) 137 # should raise on closed file 138 self.assertRaises(ValueError, method) 139 with test_support.check_py3k_warnings(): 140 for methodname in deprecated_methods: 141 method = getattr(self.f, methodname) 142 self.assertRaises(ValueError, method) 143 self.assertRaises(ValueError, self.f.writelines, []) 144 145 # file is closed, __exit__ shouldn't do anything 146 self.assertEqual(self.f.__exit__(None, None, None), None) 147 # it must also return None if an exception was given 148 try: 149 1 // 0 150 except: 151 self.assertEqual(self.f.__exit__(*sys.exc_info()), None) 152 153 def testReadWhenWriting(self): 154 self.assertRaises(IOError, self.f.read) 155 156 def testNastyWritelinesGenerator(self): 157 def nasty(): 158 for i in range(5): 159 if i == 3: 160 self.f.close() 161 yield str(i) 162 self.assertRaises(ValueError, self.f.writelines, nasty()) 163 164 def testIssue5677(self): 165 # Remark: Do not perform more than one test per open file, 166 # since that does NOT catch the readline error on Windows. 167 data = 'xxx' 168 for mode in ['w', 'wb', 'a', 'ab']: 169 for attr in ['read', 'readline', 'readlines']: 170 self.f = open(TESTFN, mode) 171 self.f.write(data) 172 self.assertRaises(IOError, getattr(self.f, attr)) 173 self.f.close() 174 175 self.f = open(TESTFN, mode) 176 self.f.write(data) 177 self.assertRaises(IOError, lambda: [line for line in self.f]) 178 self.f.close() 179 180 self.f = open(TESTFN, mode) 181 self.f.write(data) 182 self.assertRaises(IOError, self.f.readinto, bytearray(len(data))) 183 self.f.close() 184 185 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']: 186 self.f = open(TESTFN, mode) 187 self.assertRaises(IOError, self.f.write, data) 188 self.f.close() 189 190 self.f = open(TESTFN, mode) 191 self.assertRaises(IOError, self.f.writelines, [data, data]) 192 self.f.close() 193 194 self.f = open(TESTFN, mode) 195 self.assertRaises(IOError, self.f.truncate) 196 self.f.close() 197 198class OtherFileTests(unittest.TestCase): 199 200 def testOpenDir(self): 201 this_dir = os.path.dirname(__file__) or os.curdir 202 for mode in (None, "w"): 203 try: 204 if mode: 205 f = open(this_dir, mode) 206 else: 207 f = open(this_dir) 208 except IOError as e: 209 self.assertEqual(e.filename, this_dir) 210 else: 211 self.fail("opening a directory didn't raise an IOError") 212 213 def testModeStrings(self): 214 # check invalid mode strings 215 for mode in ("", "aU", "wU+"): 216 try: 217 f = open(TESTFN, mode) 218 except ValueError: 219 pass 220 else: 221 f.close() 222 self.fail('%r is an invalid file mode' % mode) 223 224 # Some invalid modes fail on Windows, but pass on Unix 225 # Issue3965: avoid a crash on Windows when filename is unicode 226 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')): 227 try: 228 f = open(name, "rr") 229 except (IOError, ValueError): 230 pass 231 else: 232 f.close() 233 234 def testStdinSeek(self): 235 if sys.platform == 'osf1V5': 236 # This causes the interpreter to exit on OSF1 v5.1. 237 self.skipTest('Skipping sys.stdin.seek(-1), it may crash ' 238 'the interpreter. Test manually.') 239 240 if not sys.stdin.isatty(): 241 # Issue #23168: if stdin is redirected to a file, stdin becomes 242 # seekable 243 self.skipTest('stdin must be a TTY in this test') 244 245 self.assertRaises(IOError, sys.stdin.seek, -1) 246 247 def testStdinTruncate(self): 248 self.assertRaises(IOError, sys.stdin.truncate) 249 250 def testUnicodeOpen(self): 251 # verify repr works for unicode too 252 f = open(unicode(TESTFN), "w") 253 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN)) 254 f.close() 255 os.unlink(TESTFN) 256 257 def testBadModeArgument(self): 258 # verify that we get a sensible error message for bad mode argument 259 bad_mode = "qwerty" 260 try: 261 f = open(TESTFN, bad_mode) 262 except ValueError, msg: 263 if msg.args[0] != 0: 264 s = str(msg) 265 if TESTFN in s or bad_mode not in s: 266 self.fail("bad error message for invalid mode: %s" % s) 267 # if msg.args[0] == 0, we're probably on Windows where there may 268 # be no obvious way to discover why open() failed. 269 else: 270 f.close() 271 self.fail("no error for invalid mode: %s" % bad_mode) 272 273 def testSetBufferSize(self): 274 # make sure that explicitly setting the buffer size doesn't cause 275 # misbehaviour especially with repeated close() calls 276 for s in (-1, 0, 1, 512): 277 try: 278 f = open(TESTFN, 'w', s) 279 f.write(str(s)) 280 f.close() 281 f.close() 282 f = open(TESTFN, 'r', s) 283 d = int(f.read()) 284 f.close() 285 f.close() 286 except IOError, msg: 287 self.fail('error setting buffer size %d: %s' % (s, str(msg))) 288 self.assertEqual(d, s) 289 290 def testTruncateOnWindows(self): 291 os.unlink(TESTFN) 292 293 def bug801631(): 294 # SF bug <http://www.python.org/sf/801631> 295 # "file.truncate fault on windows" 296 f = open(TESTFN, 'wb') 297 f.write('12345678901') # 11 bytes 298 f.close() 299 300 f = open(TESTFN,'rb+') 301 data = f.read(5) 302 if data != '12345': 303 self.fail("Read on file opened for update failed %r" % data) 304 if f.tell() != 5: 305 self.fail("File pos after read wrong %d" % f.tell()) 306 307 f.truncate() 308 if f.tell() != 5: 309 self.fail("File pos after ftruncate wrong %d" % f.tell()) 310 311 f.close() 312 size = os.path.getsize(TESTFN) 313 if size != 5: 314 self.fail("File size after ftruncate wrong %d" % size) 315 316 try: 317 bug801631() 318 finally: 319 os.unlink(TESTFN) 320 321 def testIteration(self): 322 # Test the complex interaction when mixing file-iteration and the 323 # various read* methods. Ostensibly, the mixture could just be tested 324 # to work when it should work according to the Python language, 325 # instead of fail when it should fail according to the current CPython 326 # implementation. People don't always program Python the way they 327 # should, though, and the implemenation might change in subtle ways, 328 # so we explicitly test for errors, too; the test will just have to 329 # be updated when the implementation changes. 330 dataoffset = 16384 331 filler = "ham\n" 332 assert not dataoffset % len(filler), \ 333 "dataoffset must be multiple of len(filler)" 334 nchunks = dataoffset // len(filler) 335 testlines = [ 336 "spam, spam and eggs\n", 337 "eggs, spam, ham and spam\n", 338 "saussages, spam, spam and eggs\n", 339 "spam, ham, spam and eggs\n", 340 "spam, spam, spam, spam, spam, ham, spam\n", 341 "wonderful spaaaaaam.\n" 342 ] 343 methods = [("readline", ()), ("read", ()), ("readlines", ()), 344 ("readinto", (array("c", " "*100),))] 345 346 try: 347 # Prepare the testfile 348 bag = open(TESTFN, "w") 349 bag.write(filler * nchunks) 350 bag.writelines(testlines) 351 bag.close() 352 # Test for appropriate errors mixing read* and iteration 353 for methodname, args in methods: 354 f = open(TESTFN) 355 if f.next() != filler: 356 self.fail, "Broken testfile" 357 meth = getattr(f, methodname) 358 try: 359 meth(*args) 360 except ValueError: 361 pass 362 else: 363 self.fail("%s%r after next() didn't raise ValueError" % 364 (methodname, args)) 365 f.close() 366 367 # Test to see if harmless (by accident) mixing of read* and 368 # iteration still works. This depends on the size of the internal 369 # iteration buffer (currently 8192,) but we can test it in a 370 # flexible manner. Each line in the bag o' ham is 4 bytes 371 # ("h", "a", "m", "\n"), so 4096 lines of that should get us 372 # exactly on the buffer boundary for any power-of-2 buffersize 373 # between 4 and 16384 (inclusive). 374 f = open(TESTFN) 375 for i in range(nchunks): 376 f.next() 377 testline = testlines.pop(0) 378 try: 379 line = f.readline() 380 except ValueError: 381 self.fail("readline() after next() with supposedly empty " 382 "iteration-buffer failed anyway") 383 if line != testline: 384 self.fail("readline() after next() with empty buffer " 385 "failed. Got %r, expected %r" % (line, testline)) 386 testline = testlines.pop(0) 387 buf = array("c", "\x00" * len(testline)) 388 try: 389 f.readinto(buf) 390 except ValueError: 391 self.fail("readinto() after next() with supposedly empty " 392 "iteration-buffer failed anyway") 393 line = buf.tostring() 394 if line != testline: 395 self.fail("readinto() after next() with empty buffer " 396 "failed. Got %r, expected %r" % (line, testline)) 397 398 testline = testlines.pop(0) 399 try: 400 line = f.read(len(testline)) 401 except ValueError: 402 self.fail("read() after next() with supposedly empty " 403 "iteration-buffer failed anyway") 404 if line != testline: 405 self.fail("read() after next() with empty buffer " 406 "failed. Got %r, expected %r" % (line, testline)) 407 try: 408 lines = f.readlines() 409 except ValueError: 410 self.fail("readlines() after next() with supposedly empty " 411 "iteration-buffer failed anyway") 412 if lines != testlines: 413 self.fail("readlines() after next() with empty buffer " 414 "failed. Got %r, expected %r" % (line, testline)) 415 # Reading after iteration hit EOF shouldn't hurt either 416 f = open(TESTFN) 417 try: 418 for line in f: 419 pass 420 try: 421 f.readline() 422 f.readinto(buf) 423 f.read() 424 f.readlines() 425 except ValueError: 426 self.fail("read* failed after next() consumed file") 427 finally: 428 f.close() 429 finally: 430 os.unlink(TESTFN) 431 432 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.') 433 def test_write_full(self): 434 devfull = '/dev/full' 435 if not (os.path.exists(devfull) and 436 stat.S_ISCHR(os.stat(devfull).st_mode)): 437 # Issue #21934: OpenBSD does not have a /dev/full character device 438 self.skipTest('requires %r' % devfull) 439 with open(devfull, 'wb', 1) as f: 440 with self.assertRaises(IOError): 441 f.write('hello\n') 442 with open(devfull, 'wb', 1) as f: 443 with self.assertRaises(IOError): 444 # Issue #17976 445 f.write('hello') 446 f.write('\n') 447 with open(devfull, 'wb', 0) as f: 448 with self.assertRaises(IOError): 449 f.write('h') 450 451 @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system") 452 @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False) 453 def test_very_long_line(self, size): 454 # Issue #22526 455 requires('largefile') 456 with open(TESTFN, "wb") as fp: 457 fp.seek(size - 1) 458 fp.write("\0") 459 with open(TESTFN, "rb") as fp: 460 for l in fp: 461 pass 462 self.assertEqual(len(l), size) 463 self.assertEqual(l.count("\0"), size) 464 l = None 465 466class FileSubclassTests(unittest.TestCase): 467 468 def testExit(self): 469 # test that exiting with context calls subclass' close 470 class C(file): 471 def __init__(self, *args): 472 self.subclass_closed = False 473 file.__init__(self, *args) 474 def close(self): 475 self.subclass_closed = True 476 file.close(self) 477 478 with C(TESTFN, 'w') as f: 479 pass 480 self.assertTrue(f.subclass_closed) 481 482 483@unittest.skipUnless(threading, 'Threading required for this test.') 484class FileThreadingTests(unittest.TestCase): 485 # These tests check the ability to call various methods of file objects 486 # (including close()) concurrently without crashing the Python interpreter. 487 # See #815646, #595601 488 489 def setUp(self): 490 self._threads = test_support.threading_setup() 491 self.f = None 492 self.filename = TESTFN 493 with open(self.filename, "w") as f: 494 f.write("\n".join("0123456789")) 495 self._count_lock = threading.Lock() 496 self.close_count = 0 497 self.close_success_count = 0 498 self.use_buffering = False 499 500 def tearDown(self): 501 if self.f: 502 try: 503 self.f.close() 504 except (EnvironmentError, ValueError): 505 pass 506 try: 507 os.remove(self.filename) 508 except EnvironmentError: 509 pass 510 test_support.threading_cleanup(*self._threads) 511 512 def _create_file(self): 513 if self.use_buffering: 514 self.f = open(self.filename, "w+", buffering=1024*16) 515 else: 516 self.f = open(self.filename, "w+") 517 518 def _close_file(self): 519 with self._count_lock: 520 self.close_count += 1 521 self.f.close() 522 with self._count_lock: 523 self.close_success_count += 1 524 525 def _close_and_reopen_file(self): 526 self._close_file() 527 # if close raises an exception thats fine, self.f remains valid so 528 # we don't need to reopen. 529 self._create_file() 530 531 def _run_workers(self, func, nb_workers, duration=0.2): 532 with self._count_lock: 533 self.close_count = 0 534 self.close_success_count = 0 535 self.do_continue = True 536 threads = [] 537 try: 538 for i in range(nb_workers): 539 t = threading.Thread(target=func) 540 t.start() 541 threads.append(t) 542 for _ in xrange(100): 543 time.sleep(duration/100) 544 with self._count_lock: 545 if self.close_count-self.close_success_count > nb_workers+1: 546 if test_support.verbose: 547 print 'Q', 548 break 549 time.sleep(duration) 550 finally: 551 self.do_continue = False 552 for t in threads: 553 t.join() 554 555 def _test_close_open_io(self, io_func, nb_workers=5): 556 def worker(): 557 self._create_file() 558 funcs = itertools.cycle(( 559 lambda: io_func(), 560 lambda: self._close_and_reopen_file(), 561 )) 562 for f in funcs: 563 if not self.do_continue: 564 break 565 try: 566 f() 567 except (IOError, ValueError): 568 pass 569 self._run_workers(worker, nb_workers) 570 if test_support.verbose: 571 # Useful verbose statistics when tuning this test to take 572 # less time to run but still ensuring that its still useful. 573 # 574 # the percent of close calls that raised an error 575 percent = 100. - 100.*self.close_success_count/self.close_count 576 print self.close_count, ('%.4f ' % percent), 577 578 def test_close_open(self): 579 def io_func(): 580 pass 581 self._test_close_open_io(io_func) 582 583 def test_close_open_flush(self): 584 def io_func(): 585 self.f.flush() 586 self._test_close_open_io(io_func) 587 588 def test_close_open_iter(self): 589 def io_func(): 590 list(iter(self.f)) 591 self._test_close_open_io(io_func) 592 593 def test_close_open_isatty(self): 594 def io_func(): 595 self.f.isatty() 596 self._test_close_open_io(io_func) 597 598 def test_close_open_print(self): 599 def io_func(): 600 print >> self.f, '' 601 self._test_close_open_io(io_func) 602 603 def test_close_open_print_buffered(self): 604 self.use_buffering = True 605 def io_func(): 606 print >> self.f, '' 607 self._test_close_open_io(io_func) 608 609 def test_close_open_read(self): 610 def io_func(): 611 self.f.read(0) 612 self._test_close_open_io(io_func) 613 614 def test_close_open_readinto(self): 615 def io_func(): 616 a = array('c', 'xxxxx') 617 self.f.readinto(a) 618 self._test_close_open_io(io_func) 619 620 def test_close_open_readline(self): 621 def io_func(): 622 self.f.readline() 623 self._test_close_open_io(io_func) 624 625 def test_close_open_readlines(self): 626 def io_func(): 627 self.f.readlines() 628 self._test_close_open_io(io_func) 629 630 def test_close_open_seek(self): 631 def io_func(): 632 self.f.seek(0, 0) 633 self._test_close_open_io(io_func) 634 635 def test_close_open_tell(self): 636 def io_func(): 637 self.f.tell() 638 self._test_close_open_io(io_func) 639 640 def test_close_open_truncate(self): 641 def io_func(): 642 self.f.truncate() 643 self._test_close_open_io(io_func) 644 645 def test_close_open_write(self): 646 def io_func(): 647 self.f.write('') 648 self._test_close_open_io(io_func) 649 650 def test_close_open_writelines(self): 651 def io_func(): 652 self.f.writelines('') 653 self._test_close_open_io(io_func) 654 655 656@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.') 657class TestFileSignalEINTR(unittest.TestCase): 658 def _test_reading(self, data_to_write, read_and_verify_code, method_name, 659 universal_newlines=False): 660 """Generic buffered read method test harness to verify EINTR behavior. 661 662 Also validates that Python signal handlers are run during the read. 663 664 Args: 665 data_to_write: String to write to the child process for reading 666 before sending it a signal, confirming the signal was handled, 667 writing a final newline char and closing the infile pipe. 668 read_and_verify_code: Single "line" of code to read from a file 669 object named 'infile' and validate the result. This will be 670 executed as part of a python subprocess fed data_to_write. 671 method_name: The name of the read method being tested, for use in 672 an error message on failure. 673 universal_newlines: If True, infile will be opened in universal 674 newline mode in the child process. 675 """ 676 if universal_newlines: 677 # Test the \r\n -> \n conversion while we're at it. 678 data_to_write = data_to_write.replace('\n', '\r\n') 679 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")' 680 else: 681 infile_setup_code = 'infile = sys.stdin' 682 # Total pipe IO in this function is smaller than the minimum posix OS 683 # pipe buffer size of 512 bytes. No writer should block. 684 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.' 685 686 child_code = ( 687 'import os, signal, sys ;' 688 'signal.signal(' 689 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;' 690 + infile_setup_code + ' ;' + 691 'assert isinstance(infile, file) ;' 692 'sys.stderr.write("Go.\\n") ;' 693 + read_and_verify_code) 694 reader_process = subprocess.Popen( 695 [sys.executable, '-c', child_code], 696 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 697 stderr=subprocess.PIPE) 698 # Wait for the signal handler to be installed. 699 go = reader_process.stderr.read(4) 700 if go != 'Go.\n': 701 reader_process.kill() 702 self.fail('Error from %s process while awaiting "Go":\n%s' % ( 703 method_name, go+reader_process.stderr.read())) 704 reader_process.stdin.write(data_to_write) 705 signals_sent = 0 706 rlist = [] 707 # We don't know when the read_and_verify_code in our child is actually 708 # executing within the read system call we want to interrupt. This 709 # loop waits for a bit before sending the first signal to increase 710 # the likelihood of that. Implementations without correct EINTR 711 # and signal handling usually fail this test. 712 while not rlist: 713 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05) 714 reader_process.send_signal(signal.SIGINT) 715 # Give the subprocess time to handle it before we loop around and 716 # send another one. On OSX the second signal happening close to 717 # immediately after the first was causing the subprocess to crash 718 # via the OS's default SIGINT handler. 719 time.sleep(0.1) 720 signals_sent += 1 721 if signals_sent > 200: 722 reader_process.kill() 723 self.fail("failed to handle signal during %s." % method_name) 724 # This assumes anything unexpected that writes to stderr will also 725 # write a newline. That is true of the traceback printing code. 726 signal_line = reader_process.stderr.readline() 727 if signal_line != '$\n': 728 reader_process.kill() 729 self.fail('Error from %s process while awaiting signal:\n%s' % ( 730 method_name, signal_line+reader_process.stderr.read())) 731 # We append a newline to our input so that a readline call can 732 # end on its own before the EOF is seen. 733 stdout, stderr = reader_process.communicate(input='\n') 734 if reader_process.returncode != 0: 735 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % ( 736 method_name, reader_process.returncode, stdout, stderr)) 737 738 def test_readline(self, universal_newlines=False): 739 """file.readline must handle signals and not lose data.""" 740 self._test_reading( 741 data_to_write='hello, world!', 742 read_and_verify_code=( 743 'line = infile.readline() ;' 744 'expected_line = "hello, world!\\n" ;' 745 'assert line == expected_line, (' 746 '"read %r expected %r" % (line, expected_line))' 747 ), 748 method_name='readline', 749 universal_newlines=universal_newlines) 750 751 def test_readline_with_universal_newlines(self): 752 self.test_readline(universal_newlines=True) 753 754 def test_readlines(self, universal_newlines=False): 755 """file.readlines must handle signals and not lose data.""" 756 self._test_reading( 757 data_to_write='hello\nworld!', 758 read_and_verify_code=( 759 'lines = infile.readlines() ;' 760 'expected_lines = ["hello\\n", "world!\\n"] ;' 761 'assert lines == expected_lines, (' 762 '"readlines returned wrong data.\\n" ' 763 '"got lines %r\\nexpected %r" ' 764 '% (lines, expected_lines))' 765 ), 766 method_name='readlines', 767 universal_newlines=universal_newlines) 768 769 def test_readlines_with_universal_newlines(self): 770 self.test_readlines(universal_newlines=True) 771 772 def test_readall(self): 773 """Unbounded file.read() must handle signals and not lose data.""" 774 self._test_reading( 775 data_to_write='hello, world!abcdefghijklm', 776 read_and_verify_code=( 777 'data = infile.read() ;' 778 'expected_data = "hello, world!abcdefghijklm\\n";' 779 'assert data == expected_data, (' 780 '"read %r expected %r" % (data, expected_data))' 781 ), 782 method_name='unbounded read') 783 784 def test_readinto(self): 785 """file.readinto must handle signals and not lose data.""" 786 self._test_reading( 787 data_to_write='hello, world!', 788 read_and_verify_code=( 789 'data = bytearray(50) ;' 790 'num_read = infile.readinto(data) ;' 791 'expected_data = "hello, world!\\n";' 792 'assert data[:num_read] == expected_data, (' 793 '"read %r expected %r" % (data, expected_data))' 794 ), 795 method_name='readinto') 796 797 798class StdoutTests(unittest.TestCase): 799 800 def test_move_stdout_on_write(self): 801 # Issue 3242: sys.stdout can be replaced (and freed) during a 802 # print statement; prevent a segfault in this case 803 save_stdout = sys.stdout 804 805 class File: 806 def write(self, data): 807 if '\n' in data: 808 sys.stdout = save_stdout 809 810 try: 811 sys.stdout = File() 812 print "some text" 813 finally: 814 sys.stdout = save_stdout 815 816 def test_del_stdout_before_print(self): 817 # Issue 4597: 'print' with no argument wasn't reporting when 818 # sys.stdout was deleted. 819 save_stdout = sys.stdout 820 del sys.stdout 821 try: 822 print 823 except RuntimeError as e: 824 self.assertEqual(str(e), "lost sys.stdout") 825 else: 826 self.fail("Expected RuntimeError") 827 finally: 828 sys.stdout = save_stdout 829 830 def test_unicode(self): 831 import subprocess 832 833 def get_message(encoding, *code): 834 code = '\n'.join(code) 835 env = os.environ.copy() 836 env['PYTHONIOENCODING'] = encoding 837 process = subprocess.Popen([sys.executable, "-c", code], 838 stdout=subprocess.PIPE, env=env) 839 stdout, stderr = process.communicate() 840 self.assertEqual(process.returncode, 0) 841 return stdout 842 843 def check_message(text, encoding, expected): 844 stdout = get_message(encoding, 845 "import sys", 846 "sys.stdout.write(%r)" % text, 847 "sys.stdout.flush()") 848 self.assertEqual(stdout, expected) 849 850 # test the encoding 851 check_message(u'15\u20ac', "iso-8859-15", "15\xa4") 852 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac') 853 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20') 854 855 # test the error handler 856 check_message(u'15\u20ac', "iso-8859-1:ignore", "15") 857 check_message(u'15\u20ac', "iso-8859-1:replace", "15?") 858 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac") 859 860 # test the buffer API 861 for objtype in ('buffer', 'bytearray'): 862 stdout = get_message('ascii', 863 'import sys', 864 r'sys.stdout.write(%s("\xe9"))' % objtype, 865 'sys.stdout.flush()') 866 self.assertEqual(stdout, "\xe9") 867 868 869def test_main(): 870 # Historically, these tests have been sloppy about removing TESTFN. 871 # So get rid of it no matter what. 872 try: 873 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, 874 FileThreadingTests, TestFileSignalEINTR, StdoutTests) 875 finally: 876 if os.path.exists(TESTFN): 877 os.unlink(TESTFN) 878 879if __name__ == '__main__': 880 test_main() 881