# [Code-viewer navigation residue, not part of the module: Home | Line# | Scopes# | Navigate# | Raw | Download]
1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import os
6import errno
7import unittest
8import warnings
9import sys
10import signal
11import subprocess
12import sysconfig
13import textwrap
14import time
15try:
16    import resource
17except ImportError:
18    resource = None
19
20from test import test_support
21from test.script_helper import assert_python_ok
22import mmap
23import uuid
24
# Ignore RuntimeWarnings whose message matches "tempnam"/"tmpnam" when they
# are attributed to this module; the temporary-name tests below call
# os.tempnam() and os.tmpnam() on purpose.
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)
27
28# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for os functions operating on the test_support.TESTFN file."""

    def setUp(self):
        # Remove a stale TESTFN; the same routine doubles as tearDown so
        # every test both starts and finishes without the file.
        target = test_support.TESTFN
        if os.path.exists(target):
            os.unlink(target)
    tearDown = setUp

    def test_access(self):
        # A file we have just created must be reported as writable.
        fd = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        writable = os.access(test_support.TESTFN, os.W_OK)
        self.assertTrue(writable)

    def test_closerange(self):
        low = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        high = os.dup(low)
        try:
            attempts = 0
            while high != low + 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # close a fd that is open, and one that isn't
        os.closerange(low, low + 2)
        # 'low' was inside the closed range, so writing to it must fail.
        with self.assertRaises(OSError):
            os.write(low, "a")

    @test_support.cpython_only
    def test_rename(self):
        # A rename that fails with TypeError must leave the reference
        # count of its path argument unchanged.  The callable form of
        # assertRaises is kept on purpose: this test is refcount-sensitive.
        path = unicode(test_support.TESTFN)
        before = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        after = sys.getrefcount(path)
        self.assertEqual(before, after)
68
69
class TemporaryFileTests(unittest.TestCase):
    """Tests for os.tempnam(), os.tmpfile() and os.tmpnam()."""

    def setUp(self):
        # Names registered by check_tempfile(); tearDown() unlinks them.
        self.files = []
        os.mkdir(test_support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(test_support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                    "file already exists for temporary file")
        # make sure we can create the file; close it immediately so the
        # handle is not leaked (a still-open handle can keep tearDown()
        # from unlinking the file on some platforms).
        open(name, "w").close()
        self.files.append(name)

    @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
    def test_tempnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
            self.check_tempfile(os.tempnam())

            name = os.tempnam(test_support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(test_support.TESTFN, "pfx")
            # assertEqual reports the offending prefix when this fails.
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
    def test_tmpfile(self):
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)

            if sys.platform == 'win32':
                name = '\\python_test_os_test_tmpfile.txt'
                if os.path.exists(name):
                    os.remove(name)
                try:
                    fp = open(name, 'w')
                except IOError as first:
                    # open() failed, assert tmpfile() fails in the same way.
                    # Although open() raises an IOError and os.tmpfile() raises an
                    # OSError(), 'args' will be (13, 'Permission denied') in both
                    # cases.
                    try:
                        fp = os.tmpfile()
                    except OSError as second:
                        self.assertEqual(first.args, second.args)
                    else:
                        # Don't leak the unexpectedly created file object.
                        fp.close()
                        self.fail("expected os.tmpfile() to raise OSError")
                    return
                else:
                    # open() worked, therefore, tmpfile() should work.  Close our
                    # dummy file and proceed with the test as normal.
                    fp.close()
                    os.remove(name)

            fp = os.tmpfile()
            try:
                fp.write("foobar")
                fp.seek(0, 0)
                s = fp.read()
            finally:
                # Close the temporary file even if the round trip fails.
                fp.close()
            self.assertEqual(s, "foobar")

    @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
    def test_tmpnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)

            name = os.tmpnam()
            if sys.platform == "win32":
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                            "file already exists for temporary file")
            else:
                self.check_tempfile(name)
181
182# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects of the os.*stat* family and os.utime()."""

    def setUp(self):
        os.mkdir(test_support.TESTFN)
        self.fname = os.path.join(test_support.TESTFN, "f1")
        # Exactly three bytes: test_stat_attributes expects st_size == 3.
        with open(self.fname, 'wb') as f:
            f.write("ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(test_support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def test_stat_attributes(self):
        import stat
        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # The attribute value is passed through int() before
                    # being compared with the indexed item.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end of the result must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises((AttributeError, TypeError)):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # accepted here: a TypeError is tolerated, and no failure is
        # reported if the constructor takes the tuple.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass


    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('glibc always returns ENOSYS on AtheOS')
            # Any other error is a genuine failure: re-raise it instead of
            # falling through to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(TypeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  As for stat_result,
        # a TypeError is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(test_support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(test_support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Helper for the skip decorators below; it is called while the class
        # body executes, hence no 'self'.  Returns the buffer filled by
        # GetVolumeInformationA for the drive of 'path' on win32, and None
        # on other platforms or when the call reports failure.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_string_buffer("", 100)
            if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        # A fractional timestamp must survive the utime()/stat() round trip.
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except WindowsError as e:
            if e.errno == 2: # file does not exist; cannot run test
                self.skipTest(r'c:\pagefile.sys does not exist')
            self.fail("Could not stat pagefile.sys")
345
346from test import mapping_tests
347
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    # NOTE(review): _reference() and _empty_mapping() are presumably hooks
    # consumed by BasicTestMappingProtocol, which is defined outside this
    # file -- confirm against test.mapping_tests.
    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    def setUp(self):
        # Snapshot the real environment so tearDown() can restore it.
        self.__save = dict(os.environ)
        os.environ.clear()

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)

    # Bug 1110478
    # Report a skip (instead of silently passing) when there is no /bin/sh
    # to run the child command with.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child process.
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
                     "due to known OS bug: see issue #13415")
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
384
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TEST1, SUB1, SUB11, SUB2
        #     flipped:  TEST1, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TEST1
        #     flipped:  SUB2, SUB11, SUB1, TEST1
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlinked directory is removed as a link, not with rmdir.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(test_support.TESTFN)
492
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs() below test_support.TESTFN."""

    def setUp(self):
        os.mkdir(test_support.TESTFN)

    def test_makedir(self):
        root = test_support.TESTFN
        nested = os.path.join(root, 'dir1', 'dir2', 'dir3')
        os.makedirs(nested)             # Should work
        # One level below a tree that already exists.
        nested = os.path.join(nested, 'dir4')
        os.makedirs(nested)

        # Try paths with a '.' in them
        with self.assertRaises(OSError):
            os.makedirs(os.curdir)
        # '.' as the final component ...
        dot_at_end = os.path.join(nested, 'dir5', os.curdir)
        os.makedirs(dot_at_end)
        # ... and '.' in the middle of the path.
        dot_inside = os.path.join(root, 'dir1', os.curdir, 'dir2', 'dir3',
                                  'dir4', 'dir5', 'dir6')
        os.makedirs(dot_inside)

    def tearDown(self):
        top = test_support.TESTFN
        candidate = os.path.join(top, 'dir1', 'dir2', 'dir3',
                                 'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while candidate != top and not os.path.exists(candidate):
            candidate = os.path.dirname(candidate)

        os.removedirs(candidate)
525
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        # Writing to os.devnull must succeed ...
        with open(os.devnull, 'w') as sink:
            sink.write('hello')
        # ... and reading from it must return nothing.  The 'with' blocks
        # close both handles even when an assertion fails.
        with open(os.devnull, 'r') as source:
            self.assertEqual(source.read(), '')
534
class URandomTests(unittest.TestCase):
    """Tests for os.urandom(), in this process and in child processes."""

    def test_urandom_length(self):
        # The result must have exactly the requested length.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        # Two consecutive calls must not return the same data.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a fresh interpreter and return the data.
        # We need to use repr() and eval() to avoid line ending conversions
        # under Windows.
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.write(repr(data))',
            'sys.stdout.flush()',
            # Diagnostics for the failure messages below; written with
            # sys.stderr.write() so the child script does not depend on the
            # print statement syntax.
            'sys.stderr.write("%r\\n" % ((len(data), data),))'))
        cmd_line = [sys.executable, '-c', code]
        p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
        # communicate() has already waited for the child, so the exit
        # status is available without calling wait() again.
        self.assertEqual(p.returncode, 0, (p.returncode, err))
        # NOTE: eval() is applied only to the repr() printed by the child
        # started above, never to external input.
        out = eval(out)
        self.assertEqual(len(out), count, err)
        return out

    def test_urandom_subprocess(self):
        # Two separate processes must not obtain the same data.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
571
572
# True when the build configuration reports HAVE_GETENTROPY == 1; used only
# to skip URandomFDTests below.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)

@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    # Tests for os.urandom() behaviour tied to file descriptors; skipped as
    # a whole when HAVE_GETENTROPY is set (see the skip reason above).
    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        #
        # The child lowers its soft RLIMIT_NOFILE to 1 and then requires
        # os.urandom(16) to raise OSError with errno EMFILE.  The leading
        # "if 1:" lets the indented script text run as-is.
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)
600
601
class ExecvpeTests(unittest.TestCase):
    """Tests for os.execvpe() argument validation."""

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must be rejected with ValueError.
        no_args = []
        with self.assertRaises(ValueError):
            os.execvpe('notepad', no_args, None)
606
607
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test calls an os function on test_support.TESTFN and requires
    # it to raise WindowsError.

    def test_rename(self):
        source = test_support.TESTFN
        with self.assertRaises(WindowsError):
            os.rename(source, test_support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(test_support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(test_support.TESTFN)

    def test_mkdir(self):
        # Here TESTFN exists as a regular file while mkdir() is attempted.
        handle = open(test_support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(test_support.TESTFN)
        finally:
            handle.close()
            os.unlink(test_support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(test_support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(test_support.TESTFN, 0)
632
class TestInvalidFD(unittest.TestCase):
    """Check that os functions report EBADF for an invalid descriptor."""

    # Names of os functions called with the fd as their only argument; a
    # test_<name> method is generated for each of them below.
    singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Factory: binds the function name 'f' now, so each generated test
        # looks up its own os function.
        def helper(self):
            # Functions this platform's os module lacks are passed over.
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(<bad fd>, *args); it must raise OSError with errno EBADF.
        try:
            f(test_support.make_bad_fd(), *args)
        except OSError as err:
            self.assertEqual(err.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        bad_fd = test_support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = test_support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                # fd+i is invalid, keep scanning
                continue
            # fd+i is a live descriptor: stop in front of it
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        closed = os.closerange(fd, fd + i-1)
        self.assertEqual(closed, None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, " ")
710
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    # Every setter is checked the same way: a process whose uid is not 0
    # must get os.error when asking for id 0, and the id 1<<32 must raise
    # OverflowError.

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.setreuid(0, 0)
        # The oversized id is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        child_code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', child_code])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        unprivileged = os.getuid() != 0
        if unprivileged:
            with self.assertRaises(os.error):
                os.setregid(0, 0)
        # The oversized id is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        child_code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', child_code])
766
767
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows: plain signal numbers / exit codes
    # (via _kill) and console control events (via _kill_with_event).

    def _kill(self, sig):
        """Check that os.kill(pid, sig) ends a child with exit code *sig*.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The child announces itself on stdout and then blocks in input().
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_attempts * 0.1 seconds.  The else clause runs
        # only when the loop ends without a break, i.e. the child exited or
        # the attempts ran out before anything showed up on stdout.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value)
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and check that it exits.

        *name* is only used in the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        # One-byte tagged mapping; the tag name is passed to the child on its
        # command line.  Presumably the helper script flips the byte to '1'
        # once it is ready -- confirm against win_console_handler.py.
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, pass or fail.
        self.addCleanup(m.close)
        m[0] = '0'
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 20
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == '1':
                break
            time.sleep(0.5)
            attempts += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a plain
        # truth test would also treat an exit status of 0 as "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
880
881
def test_main():
    # Test case classes handed to the regression-test runner, in run order.
    test_classes = (
        FileTests,
        TemporaryFileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        URandomFDTests,
        ExecvpeTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Win32KillTests,
    )
    test_support.run_unittest(*test_classes)
899
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
902