# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.

import os
import errno
import unittest
import warnings
import sys
import signal
import subprocess
import sysconfig
import textwrap
import time
try:
    import resource
except ImportError:
    resource = None

from test import test_support
from test.script_helper import assert_python_ok
import mmap
import uuid

warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)

# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Low-level file descriptor tests that create and remove TESTFN."""

    def setUp(self):
        # Start (and, via the alias below, finish) without a stale TESTFN.
        if os.path.exists(test_support.TESTFN):
            os.unlink(test_support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(test_support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, "a")

    @test_support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = unicode(test_support.TESTFN)
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)


class TemporaryFileTests(unittest.TestCase):
    """Tests for os.tempnam(), os.tmpfile() and os.tmpnam()."""

    def setUp(self):
        # Files registered here are removed again in tearDown().
        self.files = []
        os.mkdir(test_support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(test_support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                         "file already exists for temporary file")
        # make sure we can create the file; close it right away so that
        # tearDown() can unlink it without relying on refcounting to
        # release the handle.
        open(name, "w").close()
        self.files.append(name)

    @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
    def test_tempnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
            self.check_tempfile(os.tempnam())

            name = os.tempnam(test_support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(test_support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
    def test_tmpfile(self):
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)

            if sys.platform == 'win32':
                name = '\\python_test_os_test_tmpfile.txt'
                if os.path.exists(name):
                    os.remove(name)
                try:
                    fp = open(name, 'w')
                except IOError as first:
                    # open() failed, assert tmpfile() fails in the same way.
                    # Although open() raises an IOError and os.tmpfile() raises an
                    # OSError(), 'args' will be (13, 'Permission denied') in both
                    # cases.
                    try:
                        fp = os.tmpfile()
                    except OSError as second:
                        self.assertEqual(first.args, second.args)
                    else:
                        # Don't leak the unexpected file before failing.
                        fp.close()
                        self.fail("expected os.tmpfile() to raise OSError")
                    return
                else:
                    # open() worked, therefore, tmpfile() should work.  Close our
                    # dummy file and proceed with the test as normal.
                    fp.close()
                    os.remove(name)

            fp = os.tmpfile()
            try:
                fp.write("foobar")
                fp.seek(0,0)
                s = fp.read()
            finally:
                # Always release the temporary file, even if I/O fails.
                fp.close()
            self.assertEqual(s, "foobar")

    @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
    def test_tmpnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)

            name = os.tmpnam()
            if sys.platform in ("win32",):
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                                 "file already exists for temporary file")
            else:
                self.check_tempfile(name)

# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects of os.stat() and os.statvfs()."""

    def setUp(self):
        # Create TESTFN/f1 containing exactly three bytes.
        os.mkdir(test_support.TESTFN)
        self.fname = os.path.join(test_support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write("ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(test_support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def test_stat_attributes(self):
        import stat
        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # Time attributes are compared after truncation to int
                # before being matched against the indexed value.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        try:
            result[200]
            self.fail("No exception raised")
        except IndexError:
            pass

        # Make sure that assignment fails
        try:
            result.st_mode = 1
            self.fail("No exception raised")
        except (AttributeError, TypeError):
            pass

        try:
            result.st_rdev = 1
            self.fail("No exception raised")
        except (AttributeError, TypeError):
            pass

        try:
            result.parrot = 1
            self.fail("No exception raised")
        except AttributeError:
            pass

        # Use the stat_result constructor with a too-short tuple.
        try:
            result2 = os.stat_result((10,))
            self.fail("No exception raised")
        except TypeError:
            pass

        # Use the constructor with a too-long tuple.  Either outcome
        # (TypeError or success) is accepted here.
        try:
            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass


    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('glibc always returns ENOSYS on AtheOS')
            # Any other error is a genuine failure; re-raise it instead of
            # falling through to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        try:
            result.f_bfree = 1
            self.fail("No exception raised")
        except TypeError:
            pass

        try:
            result.parrot = 1
            self.fail("No exception raised")
        except AttributeError:
            pass

        # Use the constructor with a too-short tuple.
        try:
            result2 = os.statvfs_result((10,))
            self.fail("No exception raised")
        except TypeError:
            pass

        # Use the constructor with a too-long tuple.  Either outcome
        # (TypeError or success) is accepted here.
        try:
            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(test_support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(test_support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Helper called at class-definition time by the decorators below
        # (hence no 'self').  On win32 it returns the file system name of
        # the volume holding *path* when GetVolumeInformationA succeeds;
        # in every other case it implicitly returns None.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_string_buffer("", 100)
            if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except WindowsError as e:
            if e.errno == 2: # file does not exist; cannot run test
                self.skipTest(r'c:\pagefile.sys does not exist')
            self.fail("Could not stat pagefile.sys")

from test import mapping_tests

class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None
    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    def _empty_mapping(self):
        os.environ.clear()
        return os.environ
    def setUp(self):
        # Snapshot the real environment so tearDown() can restore it.
        self.__save = dict(os.environ)
        os.environ.clear()
    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)

    # Bug 1110478
    def test_update2(self):
        if os.path.exists("/bin/sh"):
            os.environ.update(HELLO="World")
            with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
                value = popen.read().strip()
                self.assertEqual(value, "World")

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
                     "due to known OS bug: see issue #13415")
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(test_support.TESTFN)

class MakedirTests (unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(test_support.TESTFN)

    def test_makedir(self):
        base = test_support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)




    def tearDown(self):
        path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != test_support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)

class DevNullTests (unittest.TestCase):
    def test_devnull(self):
        # Writes to os.devnull are accepted; reads return an empty string.
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')

class URandomTests (unittest.TestCase):

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it produced.
        # We need to use repr() and eval() to avoid line ending conversions
        # under Windows.
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.write(repr(data))',
            'sys.stdout.flush()',
            'print >> sys.stderr, (len(data), data)'))
        cmd_line = [sys.executable, '-c', code]
        p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
        self.assertEqual(p.wait(), 0, (p.wait(), err))
        # The child is our own script, so eval() of its repr() is trusted.
        out = eval(out)
        self.assertEqual(len(out), count, err)
        return out

    def test_urandom_subprocess(self):
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)


HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)

@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)


class ExecTests(unittest.TestCase):

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    def test_execve_invalid_env(self):
        args = [sys.executable, '-c', 'pass']

        # null character in the environment variable name
        newenv = os.environ.copy()
        newenv["FRUIT\0VEGETABLE"] = "cabbage"
        with self.assertRaises(TypeError):
            os.execve(args[0], args, newenv)

        # null character in the environment variable value
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
        with self.assertRaises(TypeError):
            os.execve(args[0], args, newenv)

        # equal character in the environment variable name
        newenv = os.environ.copy()
        newenv["FRUIT=ORANGE"] = "lemon"
        with self.assertRaises(ValueError):
            os.execve(args[0], args, newenv)


@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each os call on TESTFN below is expected to raise WindowsError."""

    def test_rename(self):
        self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(WindowsError, os.remove, test_support.TESTFN)

    def test_chdir(self):
        self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)

    def test_mkdir(self):
        f = open(test_support.TESTFN, "w")
        try:
            self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
        finally:
            f.close()
            os.unlink(test_support.TESTFN)

    def test_utime(self):
        self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)

class TestInvalidFD(unittest.TestCase):
    """os functions taking a file descriptor must reject a bad one."""

    singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for the single-argument os function named *f*;
        # the generated test does nothing when os lacks that function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Install one generated "test_<name>" method per entry in singles.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(test_support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        self.assertEqual(os.isatty(test_support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = test_support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, " ")

@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Tests for the os.set*uid() / os.set*gid() family."""

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.setuid, 0)
        self.assertRaises(OverflowError, os.setuid, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.setgid, 0)
        self.assertRaises(OverflowError, os.setgid, 1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.seteuid, 0)
        self.assertRaises(OverflowError, os.seteuid, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.setegid, 0)
        self.assertRaises(OverflowError, os.setegid, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.setreuid, 0, 0)
        self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0:
            self.assertRaises(os.error, os.setregid, 0, 0)
        self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
        self.assertRaises(OverflowError, os.setregid, 0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
                sys.executable, '-c',
                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])


@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value)
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached only when the loop ends without 'break': the child
            # exited or we ran out of attempts before seeing its message.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # flags readiness through a shared mmap byte, then send *event*.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = '0'
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 20
        while count < max and proc.poll() is None:
            if m[0] == '1':
                break
            time.sleep(0.5)
            count += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): 'not proc.poll()' is true both while the child is
        # still running (None) and when it exited with code 0 -- confirm
        # that treating a zero exit status as a failure is intended.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")


@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with SUB0, SUB1, FILE0 and FILE1 and remember the
        # sorted entry names for comparison against os.listdir().
        self.created_paths = []
        for i in range(2):
            dir_name = 'SUB%d' % i
            dir_path = os.path.join(test_support.TESTFN, dir_name)
            file_name = 'FILE%d' % i
            file_path = os.path.join(test_support.TESTFN, file_name)
            os.makedirs(dir_path)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        # Imported locally: shutil is not among this module's imports.
        import shutil
        shutil.rmtree(test_support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        fs_encoding = sys.getfilesystemencoding()
        self.assertEqual(
                sorted(os.listdir(test_support.TESTFN.decode(fs_encoding))),
                [name.decode(fs_encoding) for name in self.created_paths])

        # bytes (TESTFN is passed as-is; os.fsencode() does not exist in
        # Python 2)
        self.assertEqual(
                sorted(os.listdir(test_support.TESTFN)),
                self.created_paths)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        fs_encoding = sys.getfilesystemencoding()
        path = u'\\\\?\\' + os.path.abspath(test_support.TESTFN.decode(fs_encoding))
        # The comprehension variable is deliberately not called 'path':
        # Python 2 list comprehensions leak their loop variable.
        self.assertEqual(
                sorted(os.listdir(path)),
                [name.decode(fs_encoding) for name in self.created_paths])

        # bytes
        path = b'\\\\?\\' + os.path.abspath(test_support.TESTFN)
        self.assertEqual(
                sorted(os.listdir(path)),
                self.created_paths)


class SpawnTests(unittest.TestCase):
    """Tests for os.spawnve() / os.spawnvpe() with invalid environments."""

    def _test_invalid_env(self, spawn):
        # *spawn* is called as spawn(os.P_WAIT, path, args, env).  For each
        # invalid environment it must either raise the expected exception or
        # return exit code 127.
        args = [sys.executable, '-c', 'pass']

        # null character in the environment variable name
        newenv = os.environ.copy()
        newenv["FRUIT\0VEGETABLE"] = "cabbage"
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        except TypeError:
            pass
        else:
            self.assertEqual(exitcode, 127)

        # null character in the environment variable value
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        except TypeError:
            pass
        else:
            self.assertEqual(exitcode, 127)

        # equal character in the environment variable name
        newenv = os.environ.copy()
        newenv["FRUIT=ORANGE"] = "lemon"
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        except ValueError:
            pass
        else:
            self.assertEqual(exitcode, 127)

        # equal character in the environment variable value
        filename = test_support.TESTFN
        self.addCleanup(test_support.unlink, filename)
        with open(filename, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        args = [sys.executable, filename]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        self.assertEqual(exitcode, 0)

    @unittest.skipUnless(hasattr(os, 'spawnve'), 'test needs os.spawnve()')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @unittest.skipUnless(hasattr(os, 'spawnvpe'), 'test needs os.spawnvpe()')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)


def test_main():
    # Run every test case class defined in this module.
    test_support.run_unittest(
        FileTests,
        TemporaryFileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        URandomFDTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Win32KillTests,
        Win32ListdirTests,
        SpawnTests,
    )

if __name__ == "__main__":
    test_main()