• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"Test posix functions"
2
3from test import support
4from test.support.script_helper import assert_python_ok
5
6# Skip these tests if there is no posix module.
7posix = support.import_module('posix')
8
9import errno
10import sys
11import signal
12import time
13import os
14import platform
15import pwd
16import stat
17import tempfile
18import unittest
19import warnings
20import textwrap
21
22_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
23                              support.TESTFN + '-dummy-symlink')
24
25requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
26        'test is only meaningful on 32-bit builds')
27
28def _supports_sched():
29    if not hasattr(posix, 'sched_getscheduler'):
30        return False
31    try:
32        posix.sched_getscheduler(0)
33    except OSError as e:
34        if e.errno == errno.ENOSYS:
35            return False
36    return True
37
38requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
39
40class PosixTester(unittest.TestCase):
41
42    def setUp(self):
43        # create empty file
44        fp = open(support.TESTFN, 'w+')
45        fp.close()
46        self.teardown_files = [ support.TESTFN ]
47        self._warnings_manager = support.check_warnings()
48        self._warnings_manager.__enter__()
49        warnings.filterwarnings('ignore', '.* potential security risk .*',
50                                RuntimeWarning)
51
52    def tearDown(self):
53        for teardown_file in self.teardown_files:
54            support.unlink(teardown_file)
55        self._warnings_manager.__exit__(None, None, None)
56
57    def testNoArgFunctions(self):
58        # test posix functions which take no arguments and have
59        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
60        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
61                             "times", "getloadavg",
62                             "getegid", "geteuid", "getgid", "getgroups",
63                             "getpid", "getpgrp", "getppid", "getuid", "sync",
64                           ]
65
66        for name in NO_ARG_FUNCTIONS:
67            posix_func = getattr(posix, name, None)
68            if posix_func is not None:
69                posix_func()
70                self.assertRaises(TypeError, posix_func, 1)
71
72    @unittest.skipUnless(hasattr(posix, 'getresuid'),
73                         'test needs posix.getresuid()')
74    def test_getresuid(self):
75        user_ids = posix.getresuid()
76        self.assertEqual(len(user_ids), 3)
77        for val in user_ids:
78            self.assertGreaterEqual(val, 0)
79
80    @unittest.skipUnless(hasattr(posix, 'getresgid'),
81                         'test needs posix.getresgid()')
82    def test_getresgid(self):
83        group_ids = posix.getresgid()
84        self.assertEqual(len(group_ids), 3)
85        for val in group_ids:
86            self.assertGreaterEqual(val, 0)
87
88    @unittest.skipUnless(hasattr(posix, 'setresuid'),
89                         'test needs posix.setresuid()')
90    def test_setresuid(self):
91        current_user_ids = posix.getresuid()
92        self.assertIsNone(posix.setresuid(*current_user_ids))
93        # -1 means don't change that value.
94        self.assertIsNone(posix.setresuid(-1, -1, -1))
95
96    @unittest.skipUnless(hasattr(posix, 'setresuid'),
97                         'test needs posix.setresuid()')
98    def test_setresuid_exception(self):
99        # Don't do this test if someone is silly enough to run us as root.
100        current_user_ids = posix.getresuid()
101        if 0 not in current_user_ids:
102            new_user_ids = (current_user_ids[0]+1, -1, -1)
103            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
104
105    @unittest.skipUnless(hasattr(posix, 'setresgid'),
106                         'test needs posix.setresgid()')
107    def test_setresgid(self):
108        current_group_ids = posix.getresgid()
109        self.assertIsNone(posix.setresgid(*current_group_ids))
110        # -1 means don't change that value.
111        self.assertIsNone(posix.setresgid(-1, -1, -1))
112
113    @unittest.skipUnless(hasattr(posix, 'setresgid'),
114                         'test needs posix.setresgid()')
115    def test_setresgid_exception(self):
116        # Don't do this test if someone is silly enough to run us as root.
117        current_group_ids = posix.getresgid()
118        if 0 not in current_group_ids:
119            new_group_ids = (current_group_ids[0]+1, -1, -1)
120            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
121
122    @unittest.skipUnless(hasattr(posix, 'initgroups'),
123                         "test needs os.initgroups()")
124    def test_initgroups(self):
125        # It takes a string and an integer; check that it raises a TypeError
126        # for other argument lists.
127        self.assertRaises(TypeError, posix.initgroups)
128        self.assertRaises(TypeError, posix.initgroups, None)
129        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
130        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
131
132        # If a non-privileged user invokes it, it should fail with OSError
133        # EPERM.
134        if os.getuid() != 0:
135            try:
136                name = pwd.getpwuid(posix.getuid()).pw_name
137            except KeyError:
138                # the current UID may not have a pwd entry
139                raise unittest.SkipTest("need a pwd entry")
140            try:
141                posix.initgroups(name, 13)
142            except OSError as e:
143                self.assertEqual(e.errno, errno.EPERM)
144            else:
145                self.fail("Expected OSError to be raised by initgroups")
146
147    @unittest.skipUnless(hasattr(posix, 'statvfs'),
148                         'test needs posix.statvfs()')
149    def test_statvfs(self):
150        self.assertTrue(posix.statvfs(os.curdir))
151
152    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
153                         'test needs posix.fstatvfs()')
154    def test_fstatvfs(self):
155        fp = open(support.TESTFN)
156        try:
157            self.assertTrue(posix.fstatvfs(fp.fileno()))
158            self.assertTrue(posix.statvfs(fp.fileno()))
159        finally:
160            fp.close()
161
162    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
163                         'test needs posix.ftruncate()')
164    def test_ftruncate(self):
165        fp = open(support.TESTFN, 'w+')
166        try:
167            # we need to have some data to truncate
168            fp.write('test')
169            fp.flush()
170            posix.ftruncate(fp.fileno(), 0)
171        finally:
172            fp.close()
173
174    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
175    def test_truncate(self):
176        with open(support.TESTFN, 'w') as fp:
177            fp.write('test')
178            fp.flush()
179        posix.truncate(support.TESTFN, 0)
180
181    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
182    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
183    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
184    def test_fexecve(self):
185        fp = os.open(sys.executable, os.O_RDONLY)
186        try:
187            pid = os.fork()
188            if pid == 0:
189                os.chdir(os.path.split(sys.executable)[0])
190                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
191            else:
192                self.assertEqual(os.waitpid(pid, 0), (pid, 0))
193        finally:
194            os.close(fp)
195
196
197    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
198    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
199    def test_waitid(self):
200        pid = os.fork()
201        if pid == 0:
202            os.chdir(os.path.split(sys.executable)[0])
203            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
204        else:
205            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
206            self.assertEqual(pid, res.si_pid)
207
208    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
209    def test_register_at_fork(self):
210        with self.assertRaises(TypeError, msg="Positional args not allowed"):
211            os.register_at_fork(lambda: None)
212        with self.assertRaises(TypeError, msg="Args must be callable"):
213            os.register_at_fork(before=2)
214        with self.assertRaises(TypeError, msg="Args must be callable"):
215            os.register_at_fork(after_in_child="three")
216        with self.assertRaises(TypeError, msg="Args must be callable"):
217            os.register_at_fork(after_in_parent=b"Five")
218        with self.assertRaises(TypeError, msg="Args must not be None"):
219            os.register_at_fork(before=None)
220        with self.assertRaises(TypeError, msg="Args must not be None"):
221            os.register_at_fork(after_in_child=None)
222        with self.assertRaises(TypeError, msg="Args must not be None"):
223            os.register_at_fork(after_in_parent=None)
224        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
225            # Ensure a combination of valid and invalid is an error.
226            os.register_at_fork(before=None, after_in_parent=lambda: 3)
227        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
228            # Ensure a combination of valid and invalid is an error.
229            os.register_at_fork(before=lambda: None, after_in_child='')
230        # We test actual registrations in their own process so as not to
231        # pollute this one.  There is no way to unregister for cleanup.
232        code = """if 1:
233            import os
234
235            r, w = os.pipe()
236            fin_r, fin_w = os.pipe()
237
238            os.register_at_fork(before=lambda: os.write(w, b'A'))
239            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
240            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
241            os.register_at_fork(before=lambda: os.write(w, b'B'),
242                                after_in_parent=lambda: os.write(w, b'D'),
243                                after_in_child=lambda: os.write(w, b'F'))
244
245            pid = os.fork()
246            if pid == 0:
247                # At this point, after-forkers have already been executed
248                os.close(w)
249                # Wait for parent to tell us to exit
250                os.read(fin_r, 1)
251                os._exit(0)
252            else:
253                try:
254                    os.close(w)
255                    with open(r, "rb") as f:
256                        data = f.read()
257                        assert len(data) == 6, data
258                        # Check before-fork callbacks
259                        assert data[:2] == b'BA', data
260                        # Check after-fork callbacks
261                        assert sorted(data[2:]) == list(b'CDEF'), data
262                        assert data.index(b'C') < data.index(b'D'), data
263                        assert data.index(b'E') < data.index(b'F'), data
264                finally:
265                    os.write(fin_w, b'!')
266            """
267        assert_python_ok('-c', code)
268
269    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
270    def test_lockf(self):
271        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
272        try:
273            os.write(fd, b'test')
274            os.lseek(fd, 0, os.SEEK_SET)
275            posix.lockf(fd, posix.F_LOCK, 4)
276            # section is locked
277            posix.lockf(fd, posix.F_ULOCK, 4)
278        finally:
279            os.close(fd)
280
281    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
282    def test_pread(self):
283        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
284        try:
285            os.write(fd, b'test')
286            os.lseek(fd, 0, os.SEEK_SET)
287            self.assertEqual(b'es', posix.pread(fd, 2, 1))
288            # the first pread() shouldn't disturb the file offset
289            self.assertEqual(b'te', posix.read(fd, 2))
290        finally:
291            os.close(fd)
292
293    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
294    def test_preadv(self):
295        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
296        try:
297            os.write(fd, b'test1tt2t3t5t6t6t8')
298            buf = [bytearray(i) for i in [5, 3, 2]]
299            self.assertEqual(posix.preadv(fd, buf, 3), 10)
300            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
301        finally:
302            os.close(fd)
303
304    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
305    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
306    def test_preadv_flags(self):
307        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
308        try:
309            os.write(fd, b'test1tt2t3t5t6t6t8')
310            buf = [bytearray(i) for i in [5, 3, 2]]
311            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
312            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
313        except NotImplementedError:
314            self.skipTest("preadv2 not available")
315        except OSError as inst:
316            # Is possible that the macro RWF_HIPRI was defined at compilation time
317            # but the option is not supported by the kernel or the runtime libc shared
318            # library.
319            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
320                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
321            else:
322                raise
323        finally:
324            os.close(fd)
325
326    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
327    @requires_32b
328    def test_preadv_overflow_32bits(self):
329        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
330        try:
331            buf = [bytearray(2**16)] * 2**15
332            with self.assertRaises(OSError) as cm:
333                os.preadv(fd, buf, 0)
334            self.assertEqual(cm.exception.errno, errno.EINVAL)
335            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
336        finally:
337            os.close(fd)
338
339    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
340    def test_pwrite(self):
341        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
342        try:
343            os.write(fd, b'test')
344            os.lseek(fd, 0, os.SEEK_SET)
345            posix.pwrite(fd, b'xx', 1)
346            self.assertEqual(b'txxt', posix.read(fd, 4))
347        finally:
348            os.close(fd)
349
350    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
351    def test_pwritev(self):
352        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
353        try:
354            os.write(fd, b"xx")
355            os.lseek(fd, 0, os.SEEK_SET)
356            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
357            self.assertEqual(n, 10)
358
359            os.lseek(fd, 0, os.SEEK_SET)
360            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
361        finally:
362            os.close(fd)
363
364    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
365    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
366    def test_pwritev_flags(self):
367        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
368        try:
369            os.write(fd,b"xx")
370            os.lseek(fd, 0, os.SEEK_SET)
371            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
372            self.assertEqual(n, 10)
373
374            os.lseek(fd, 0, os.SEEK_SET)
375            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
376        finally:
377            os.close(fd)
378
379    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
380    @requires_32b
381    def test_pwritev_overflow_32bits(self):
382        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
383        try:
384            with self.assertRaises(OSError) as cm:
385                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
386            self.assertEqual(cm.exception.errno, errno.EINVAL)
387        finally:
388            os.close(fd)
389
390    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
391        "test needs posix.posix_fallocate()")
392    def test_posix_fallocate(self):
393        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
394        try:
395            posix.posix_fallocate(fd, 0, 10)
396        except OSError as inst:
397            # issue10812, ZFS doesn't appear to support posix_fallocate,
398            # so skip Solaris-based since they are likely to have ZFS.
399            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
400            # often used there.
401            if inst.errno == errno.EINVAL and sys.platform.startswith(
402                ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')):
403                raise unittest.SkipTest("test may fail on ZFS filesystems")
404            else:
405                raise
406        finally:
407            os.close(fd)
408
409    # issue31106 - posix_fallocate() does not set error in errno.
410    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
411        "test needs posix.posix_fallocate()")
412    def test_posix_fallocate_errno(self):
413        try:
414            posix.posix_fallocate(-42, 0, 10)
415        except OSError as inst:
416            if inst.errno != errno.EBADF:
417                raise
418
419    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
420        "test needs posix.posix_fadvise()")
421    def test_posix_fadvise(self):
422        fd = os.open(support.TESTFN, os.O_RDONLY)
423        try:
424            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
425        finally:
426            os.close(fd)
427
428    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
429        "test needs posix.posix_fadvise()")
430    def test_posix_fadvise_errno(self):
431        try:
432            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
433        except OSError as inst:
434            if inst.errno != errno.EBADF:
435                raise
436
437    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
438    def test_utime_with_fd(self):
439        now = time.time()
440        fd = os.open(support.TESTFN, os.O_RDONLY)
441        try:
442            posix.utime(fd)
443            posix.utime(fd, None)
444            self.assertRaises(TypeError, posix.utime, fd, (None, None))
445            self.assertRaises(TypeError, posix.utime, fd, (now, None))
446            self.assertRaises(TypeError, posix.utime, fd, (None, now))
447            posix.utime(fd, (int(now), int(now)))
448            posix.utime(fd, (now, now))
449            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
450            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
451            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
452            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
453            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
454
455        finally:
456            os.close(fd)
457
458    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
459    def test_utime_nofollow_symlinks(self):
460        now = time.time()
461        posix.utime(support.TESTFN, None, follow_symlinks=False)
462        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False)
463        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False)
464        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False)
465        posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False)
466        posix.utime(support.TESTFN, (now, now), follow_symlinks=False)
467        posix.utime(support.TESTFN, follow_symlinks=False)
468
469    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
470    def test_writev(self):
471        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
472        try:
473            n = os.writev(fd, (b'test1', b'tt2', b't3'))
474            self.assertEqual(n, 10)
475
476            os.lseek(fd, 0, os.SEEK_SET)
477            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
478
479            # Issue #20113: empty list of buffers should not crash
480            try:
481                size = posix.writev(fd, [])
482            except OSError:
483                # writev(fd, []) raises OSError(22, "Invalid argument")
484                # on OpenIndiana
485                pass
486            else:
487                self.assertEqual(size, 0)
488        finally:
489            os.close(fd)
490
491    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
492    @requires_32b
493    def test_writev_overflow_32bits(self):
494        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
495        try:
496            with self.assertRaises(OSError) as cm:
497                os.writev(fd, [b"x" * 2**16] * 2**15)
498            self.assertEqual(cm.exception.errno, errno.EINVAL)
499        finally:
500            os.close(fd)
501
502    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
503    def test_readv(self):
504        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
505        try:
506            os.write(fd, b'test1tt2t3')
507            os.lseek(fd, 0, os.SEEK_SET)
508            buf = [bytearray(i) for i in [5, 3, 2]]
509            self.assertEqual(posix.readv(fd, buf), 10)
510            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
511
512            # Issue #20113: empty list of buffers should not crash
513            try:
514                size = posix.readv(fd, [])
515            except OSError:
516                # readv(fd, []) raises OSError(22, "Invalid argument")
517                # on OpenIndiana
518                pass
519            else:
520                self.assertEqual(size, 0)
521        finally:
522            os.close(fd)
523
524    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
525    @requires_32b
526    def test_readv_overflow_32bits(self):
527        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
528        try:
529            buf = [bytearray(2**16)] * 2**15
530            with self.assertRaises(OSError) as cm:
531                os.readv(fd, buf)
532            self.assertEqual(cm.exception.errno, errno.EINVAL)
533            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
534        finally:
535            os.close(fd)
536
537    @unittest.skipUnless(hasattr(posix, 'dup'),
538                         'test needs posix.dup()')
539    def test_dup(self):
540        fp = open(support.TESTFN)
541        try:
542            fd = posix.dup(fp.fileno())
543            self.assertIsInstance(fd, int)
544            os.close(fd)
545        finally:
546            fp.close()
547
548    @unittest.skipUnless(hasattr(posix, 'confstr'),
549                         'test needs posix.confstr()')
550    def test_confstr(self):
551        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
552        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
553
554    @unittest.skipUnless(hasattr(posix, 'dup2'),
555                         'test needs posix.dup2()')
556    def test_dup2(self):
557        fp1 = open(support.TESTFN)
558        fp2 = open(support.TESTFN)
559        try:
560            posix.dup2(fp1.fileno(), fp2.fileno())
561        finally:
562            fp1.close()
563            fp2.close()
564
565    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
566    @support.requires_linux_version(2, 6, 23)
567    def test_oscloexec(self):
568        fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
569        self.addCleanup(os.close, fd)
570        self.assertFalse(os.get_inheritable(fd))
571
572    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
573                         'test needs posix.O_EXLOCK')
574    def test_osexlock(self):
575        fd = os.open(support.TESTFN,
576                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
577        self.assertRaises(OSError, os.open, support.TESTFN,
578                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
579        os.close(fd)
580
581        if hasattr(posix, "O_SHLOCK"):
582            fd = os.open(support.TESTFN,
583                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
584            self.assertRaises(OSError, os.open, support.TESTFN,
585                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
586            os.close(fd)
587
588    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
589                         'test needs posix.O_SHLOCK')
590    def test_osshlock(self):
591        fd1 = os.open(support.TESTFN,
592                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
593        fd2 = os.open(support.TESTFN,
594                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
595        os.close(fd2)
596        os.close(fd1)
597
598        if hasattr(posix, "O_EXLOCK"):
599            fd = os.open(support.TESTFN,
600                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
601            self.assertRaises(OSError, os.open, support.TESTFN,
602                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
603            os.close(fd)
604
605    @unittest.skipUnless(hasattr(posix, 'fstat'),
606                         'test needs posix.fstat()')
607    def test_fstat(self):
608        fp = open(support.TESTFN)
609        try:
610            self.assertTrue(posix.fstat(fp.fileno()))
611            self.assertTrue(posix.stat(fp.fileno()))
612
613            self.assertRaisesRegex(TypeError,
614                    'should be string, bytes, os.PathLike or integer, not',
615                    posix.stat, float(fp.fileno()))
616        finally:
617            fp.close()
618
619    def test_stat(self):
620        self.assertTrue(posix.stat(support.TESTFN))
621        self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))
622
623        self.assertWarnsRegex(DeprecationWarning,
624                'should be string, bytes, os.PathLike or integer, not',
625                posix.stat, bytearray(os.fsencode(support.TESTFN)))
626        self.assertRaisesRegex(TypeError,
627                'should be string, bytes, os.PathLike or integer, not',
628                posix.stat, None)
629        self.assertRaisesRegex(TypeError,
630                'should be string, bytes, os.PathLike or integer, not',
631                posix.stat, list(support.TESTFN))
632        self.assertRaisesRegex(TypeError,
633                'should be string, bytes, os.PathLike or integer, not',
634                posix.stat, list(os.fsencode(support.TESTFN)))
635
636    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
637    def test_mkfifo(self):
638        support.unlink(support.TESTFN)
639        try:
640            posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
641        except PermissionError as e:
642            self.skipTest('posix.mkfifo(): %s' % e)
643        self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
644
645    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
646                         "don't have mknod()/S_IFIFO")
647    def test_mknod(self):
648        # Test using mknod() to create a FIFO (the only use specified
649        # by POSIX).
650        support.unlink(support.TESTFN)
651        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
652        try:
653            posix.mknod(support.TESTFN, mode, 0)
654        except OSError as e:
655            # Some old systems don't allow unprivileged users to use
656            # mknod(), or only support creating device nodes.
657            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
658        else:
659            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
660
661        # Keyword arguments are also supported
662        support.unlink(support.TESTFN)
663        try:
664            posix.mknod(path=support.TESTFN, mode=mode, device=0,
665                dir_fd=None)
666        except OSError as e:
667            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
668
669    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
670    def test_makedev(self):
671        st = posix.stat(support.TESTFN)
672        dev = st.st_dev
673        self.assertIsInstance(dev, int)
674        self.assertGreaterEqual(dev, 0)
675
676        major = posix.major(dev)
677        self.assertIsInstance(major, int)
678        self.assertGreaterEqual(major, 0)
679        self.assertEqual(posix.major(dev), major)
680        self.assertRaises(TypeError, posix.major, float(dev))
681        self.assertRaises(TypeError, posix.major)
682        self.assertRaises((ValueError, OverflowError), posix.major, -1)
683
684        minor = posix.minor(dev)
685        self.assertIsInstance(minor, int)
686        self.assertGreaterEqual(minor, 0)
687        self.assertEqual(posix.minor(dev), minor)
688        self.assertRaises(TypeError, posix.minor, float(dev))
689        self.assertRaises(TypeError, posix.minor)
690        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
691
692        self.assertEqual(posix.makedev(major, minor), dev)
693        self.assertRaises(TypeError, posix.makedev, float(major), minor)
694        self.assertRaises(TypeError, posix.makedev, major, float(minor))
695        self.assertRaises(TypeError, posix.makedev, major)
696        self.assertRaises(TypeError, posix.makedev)
697
698    def _test_all_chown_common(self, chown_func, first_param, stat_func):
699        """Common code for chown, fchown and lchown tests."""
700        def check_stat(uid, gid):
701            if stat_func is not None:
702                stat = stat_func(first_param)
703                self.assertEqual(stat.st_uid, uid)
704                self.assertEqual(stat.st_gid, gid)
705        uid = os.getuid()
706        gid = os.getgid()
707        # test a successful chown call
708        chown_func(first_param, uid, gid)
709        check_stat(uid, gid)
710        chown_func(first_param, -1, gid)
711        check_stat(uid, gid)
712        chown_func(first_param, uid, -1)
713        check_stat(uid, gid)
714
715        if uid == 0:
716            # Try an amusingly large uid/gid to make sure we handle
717            # large unsigned values.  (chown lets you use any
718            # uid/gid you like, even if they aren't defined.)
719            #
720            # This problem keeps coming up:
721            #   http://bugs.python.org/issue1747858
722            #   http://bugs.python.org/issue4591
723            #   http://bugs.python.org/issue15301
724            # Hopefully the fix in 4591 fixes it for good!
725            #
726            # This part of the test only runs when run as root.
727            # Only scary people run their tests as root.
728
729            big_value = 2**31
730            chown_func(first_param, big_value, big_value)
731            check_stat(big_value, big_value)
732            chown_func(first_param, -1, -1)
733            check_stat(big_value, big_value)
734            chown_func(first_param, uid, gid)
735            check_stat(uid, gid)
736        elif platform.system() in ('HP-UX', 'SunOS'):
737            # HP-UX and Solaris can allow a non-root user to chown() to root
738            # (issue #5113)
739            raise unittest.SkipTest("Skipping because of non-standard chown() "
740                                    "behavior")
741        else:
742            # non-root cannot chown to root, raises OSError
743            self.assertRaises(OSError, chown_func, first_param, 0, 0)
744            check_stat(uid, gid)
745            self.assertRaises(OSError, chown_func, first_param, 0, -1)
746            check_stat(uid, gid)
747            if 0 not in os.getgroups():
748                self.assertRaises(OSError, chown_func, first_param, -1, 0)
749                check_stat(uid, gid)
750        # test illegal types
751        for t in str, float:
752            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
753            check_stat(uid, gid)
754            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
755            check_stat(uid, gid)
756
757    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
758    def test_chown(self):
759        # raise an OSError if the file does not exist
760        os.unlink(support.TESTFN)
761        self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)
762
763        # re-create the file
764        support.create_empty_file(support.TESTFN)
765        self._test_all_chown_common(posix.chown, support.TESTFN, posix.stat)
766
767    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
768    def test_fchown(self):
769        os.unlink(support.TESTFN)
770
771        # re-create the file
772        test_file = open(support.TESTFN, 'w')
773        try:
774            fd = test_file.fileno()
775            self._test_all_chown_common(posix.fchown, fd,
776                                        getattr(posix, 'fstat', None))
777        finally:
778            test_file.close()
779
780    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
781    def test_lchown(self):
782        os.unlink(support.TESTFN)
783        # create a symlink
784        os.symlink(_DUMMY_SYMLINK, support.TESTFN)
785        self._test_all_chown_common(posix.lchown, support.TESTFN,
786                                    getattr(posix, 'lstat', None))
787
788    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
789    def test_chdir(self):
790        posix.chdir(os.curdir)
791        self.assertRaises(OSError, posix.chdir, support.TESTFN)
792
793    def test_listdir(self):
794        self.assertIn(support.TESTFN, posix.listdir(os.curdir))
795
796    def test_listdir_default(self):
797        # When listdir is called without argument,
798        # it's the same as listdir(os.curdir).
799        self.assertIn(support.TESTFN, posix.listdir())
800
801    def test_listdir_bytes(self):
802        # When listdir is called with a bytes object,
803        # the returned strings are of type bytes.
804        self.assertIn(os.fsencode(support.TESTFN), posix.listdir(b'.'))
805
806    def test_listdir_bytes_like(self):
807        for cls in bytearray, memoryview:
808            with self.assertWarns(DeprecationWarning):
809                names = posix.listdir(cls(b'.'))
810            self.assertIn(os.fsencode(support.TESTFN), names)
811            for name in names:
812                self.assertIs(type(name), bytes)
813
814    @unittest.skipUnless(posix.listdir in os.supports_fd,
815                         "test needs fd support for posix.listdir()")
816    def test_listdir_fd(self):
817        f = posix.open(posix.getcwd(), posix.O_RDONLY)
818        self.addCleanup(posix.close, f)
819        self.assertEqual(
820            sorted(posix.listdir('.')),
821            sorted(posix.listdir(f))
822            )
823        # Check that the fd offset was reset (issue #13739)
824        self.assertEqual(
825            sorted(posix.listdir('.')),
826            sorted(posix.listdir(f))
827            )
828
829    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
830    def test_access(self):
831        self.assertTrue(posix.access(support.TESTFN, os.R_OK))
832
833    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
834    def test_umask(self):
835        old_mask = posix.umask(0)
836        self.assertIsInstance(old_mask, int)
837        posix.umask(old_mask)
838
839    @unittest.skipUnless(hasattr(posix, 'strerror'),
840                         'test needs posix.strerror()')
841    def test_strerror(self):
842        self.assertTrue(posix.strerror(0))
843
844    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
845    def test_pipe(self):
846        reader, writer = posix.pipe()
847        os.close(reader)
848        os.close(writer)
849
850    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
851    @support.requires_linux_version(2, 6, 27)
852    def test_pipe2(self):
853        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
854        self.assertRaises(TypeError, os.pipe2, 0, 0)
855
856        # try calling with flags = 0, like os.pipe()
857        r, w = os.pipe2(0)
858        os.close(r)
859        os.close(w)
860
861        # test flags
862        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
863        self.addCleanup(os.close, r)
864        self.addCleanup(os.close, w)
865        self.assertFalse(os.get_inheritable(r))
866        self.assertFalse(os.get_inheritable(w))
867        self.assertFalse(os.get_blocking(r))
868        self.assertFalse(os.get_blocking(w))
869        # try reading from an empty pipe: this should fail, not block
870        self.assertRaises(OSError, os.read, r, 1)
871        # try a write big enough to fill-up the pipe: this should either
872        # fail or perform a partial write, not block
873        try:
874            os.write(w, b'x' * support.PIPE_MAX_SIZE)
875        except OSError:
876            pass
877
878    @support.cpython_only
879    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
880    @support.requires_linux_version(2, 6, 27)
881    def test_pipe2_c_limits(self):
882        # Issue 15989
883        import _testcapi
884        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
885        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
886
887    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
888    def test_utime(self):
889        now = time.time()
890        posix.utime(support.TESTFN, None)
891        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
892        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
893        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
894        posix.utime(support.TESTFN, (int(now), int(now)))
895        posix.utime(support.TESTFN, (now, now))
896
897    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
898        st = os.stat(target_file)
899        self.assertTrue(hasattr(st, 'st_flags'))
900
901        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
902        flags = st.st_flags | stat.UF_IMMUTABLE
903        try:
904            chflags_func(target_file, flags, **kwargs)
905        except OSError as err:
906            if err.errno != errno.EOPNOTSUPP:
907                raise
908            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
909            self.skipTest(msg)
910
911        try:
912            new_st = os.stat(target_file)
913            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
914            try:
915                fd = open(target_file, 'w+')
916            except OSError as e:
917                self.assertEqual(e.errno, errno.EPERM)
918        finally:
919            posix.chflags(target_file, st.st_flags)
920
921    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
922    def test_chflags(self):
923        self._test_chflags_regular_file(posix.chflags, support.TESTFN)
924
925    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
926    def test_lchflags_regular_file(self):
927        self._test_chflags_regular_file(posix.lchflags, support.TESTFN)
928        self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False)
929
930    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
931    def test_lchflags_symlink(self):
932        testfn_st = os.stat(support.TESTFN)
933
934        self.assertTrue(hasattr(testfn_st, 'st_flags'))
935
936        os.symlink(support.TESTFN, _DUMMY_SYMLINK)
937        self.teardown_files.append(_DUMMY_SYMLINK)
938        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
939
940        def chflags_nofollow(path, flags):
941            return posix.chflags(path, flags, follow_symlinks=False)
942
943        for fn in (posix.lchflags, chflags_nofollow):
944            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
945            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
946            try:
947                fn(_DUMMY_SYMLINK, flags)
948            except OSError as err:
949                if err.errno != errno.EOPNOTSUPP:
950                    raise
951                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
952                self.skipTest(msg)
953            try:
954                new_testfn_st = os.stat(support.TESTFN)
955                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
956
957                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
958                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
959                                 new_dummy_symlink_st.st_flags)
960            finally:
961                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
962
963    def test_environ(self):
964        if os.name == "nt":
965            item_type = str
966        else:
967            item_type = bytes
968        for k, v in posix.environ.items():
969            self.assertEqual(type(k), item_type)
970            self.assertEqual(type(v), item_type)
971
972    @unittest.skipUnless(hasattr(os, "putenv"), "requires os.putenv()")
973    def test_putenv(self):
974        with self.assertRaises(ValueError):
975            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
976        with self.assertRaises(ValueError):
977            os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
978        with self.assertRaises(ValueError):
979            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
980        with self.assertRaises(ValueError):
981            os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
982        with self.assertRaises(ValueError):
983            os.putenv('FRUIT=ORANGE', 'lemon')
984        with self.assertRaises(ValueError):
985            os.putenv(b'FRUIT=ORANGE', b'lemon')
986
987    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
988    def test_getcwd_long_pathnames(self):
989        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
990        curdir = os.getcwd()
991        base_path = os.path.abspath(support.TESTFN) + '.getcwd'
992
993        try:
994            os.mkdir(base_path)
995            os.chdir(base_path)
996        except:
997            #  Just returning nothing instead of the SkipTest exception, because
998            #  the test results in Error in that case.  Is that ok?
999            #  raise unittest.SkipTest("cannot create directory for testing")
1000            return
1001
1002            def _create_and_do_getcwd(dirname, current_path_length = 0):
1003                try:
1004                    os.mkdir(dirname)
1005                except:
1006                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1007
1008                os.chdir(dirname)
1009                try:
1010                    os.getcwd()
1011                    if current_path_length < 1027:
1012                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1013                finally:
1014                    os.chdir('..')
1015                    os.rmdir(dirname)
1016
1017            _create_and_do_getcwd(dirname)
1018
1019        finally:
1020            os.chdir(curdir)
1021            support.rmtree(base_path)
1022
1023    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1024    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1025    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1026    def test_getgrouplist(self):
1027        user = pwd.getpwuid(os.getuid())[0]
1028        group = pwd.getpwuid(os.getuid())[3]
1029        self.assertIn(group, posix.getgrouplist(user, group))
1030
1031
1032    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1033    def test_getgroups(self):
1034        with os.popen('id -G 2>/dev/null') as idg:
1035            groups = idg.read().strip()
1036            ret = idg.close()
1037
1038        try:
1039            idg_groups = set(int(g) for g in groups.split())
1040        except ValueError:
1041            idg_groups = set()
1042        if ret is not None or not idg_groups:
1043            raise unittest.SkipTest("need working 'id -G'")
1044
1045        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1046        if sys.platform == 'darwin':
1047            import sysconfig
1048            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
1049            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
1050                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1051
1052        # 'id -G' and 'os.getgroups()' should return the same
1053        # groups, ignoring order, duplicates, and the effective gid.
1054        # #10822/#26944 - It is implementation defined whether
1055        # posix.getgroups() includes the effective gid.
1056        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1057        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1058
1059    # tests for the posix *at functions follow
1060
1061    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1062    def test_access_dir_fd(self):
1063        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1064        try:
1065            self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))
1066        finally:
1067            posix.close(f)
1068
1069    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1070    def test_chmod_dir_fd(self):
1071        os.chmod(support.TESTFN, stat.S_IRUSR)
1072
1073        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1074        try:
1075            posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1076
1077            s = posix.stat(support.TESTFN)
1078            self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
1079        finally:
1080            posix.close(f)
1081
1082    @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
1083    def test_chown_dir_fd(self):
1084        support.unlink(support.TESTFN)
1085        support.create_empty_file(support.TESTFN)
1086
1087        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1088        try:
1089            posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
1090        finally:
1091            posix.close(f)
1092
1093    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1094    def test_stat_dir_fd(self):
1095        support.unlink(support.TESTFN)
1096        with open(support.TESTFN, 'w') as outfile:
1097            outfile.write("testline\n")
1098
1099        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1100        try:
1101            s1 = posix.stat(support.TESTFN)
1102            s2 = posix.stat(support.TESTFN, dir_fd=f)
1103            self.assertEqual(s1, s2)
1104            s2 = posix.stat(support.TESTFN, dir_fd=None)
1105            self.assertEqual(s1, s2)
1106            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1107                    posix.stat, support.TESTFN, dir_fd=posix.getcwd())
1108            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1109                    posix.stat, support.TESTFN, dir_fd=float(f))
1110            self.assertRaises(OverflowError,
1111                    posix.stat, support.TESTFN, dir_fd=10**20)
1112        finally:
1113            posix.close(f)
1114
1115    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1116    def test_utime_dir_fd(self):
1117        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1118        try:
1119            now = time.time()
1120            posix.utime(support.TESTFN, None, dir_fd=f)
1121            posix.utime(support.TESTFN, dir_fd=f)
1122            self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)
1123            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)
1124            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)
1125            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)
1126            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)
1127            posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)
1128            posix.utime(support.TESTFN, (now, now), dir_fd=f)
1129            posix.utime(support.TESTFN,
1130                    (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
1131            posix.utime(support.TESTFN, dir_fd=f,
1132                            times=(int(now), int((now - int(now)) * 1e9)))
1133
1134            # try dir_fd and follow_symlinks together
1135            if os.utime in os.supports_follow_symlinks:
1136                try:
1137                    posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)
1138                except ValueError:
1139                    # whoops!  using both together not supported on this platform.
1140                    pass
1141
1142        finally:
1143            posix.close(f)
1144
1145    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
1146    def test_link_dir_fd(self):
1147        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1148        try:
1149            posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)
1150        except PermissionError as e:
1151            self.skipTest('posix.link(): %s' % e)
1152        else:
1153            # should have same inodes
1154            self.assertEqual(posix.stat(support.TESTFN)[1],
1155                posix.stat(support.TESTFN + 'link')[1])
1156        finally:
1157            posix.close(f)
1158            support.unlink(support.TESTFN + 'link')
1159
1160    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1161    def test_mkdir_dir_fd(self):
1162        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1163        try:
1164            posix.mkdir(support.TESTFN + 'dir', dir_fd=f)
1165            posix.stat(support.TESTFN + 'dir') # should not raise exception
1166        finally:
1167            posix.close(f)
1168            support.rmtree(support.TESTFN + 'dir')
1169
1170    @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
1171                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1172    def test_mknod_dir_fd(self):
1173        # Test using mknodat() to create a FIFO (the only use specified
1174        # by POSIX).
1175        support.unlink(support.TESTFN)
1176        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1177        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1178        try:
1179            posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
1180        except OSError as e:
1181            # Some old systems don't allow unprivileged users to use
1182            # mknod(), or only support creating device nodes.
1183            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1184        else:
1185            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
1186        finally:
1187            posix.close(f)
1188
1189    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1190    def test_open_dir_fd(self):
1191        support.unlink(support.TESTFN)
1192        with open(support.TESTFN, 'w') as outfile:
1193            outfile.write("testline\n")
1194        a = posix.open(posix.getcwd(), posix.O_RDONLY)
1195        b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
1196        try:
1197            res = posix.read(b, 9).decode(encoding="utf-8")
1198            self.assertEqual("testline\n", res)
1199        finally:
1200            posix.close(a)
1201            posix.close(b)
1202
1203    @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
1204    def test_readlink_dir_fd(self):
1205        os.symlink(support.TESTFN, support.TESTFN + 'link')
1206        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1207        try:
1208            self.assertEqual(posix.readlink(support.TESTFN + 'link'),
1209                posix.readlink(support.TESTFN + 'link', dir_fd=f))
1210        finally:
1211            support.unlink(support.TESTFN + 'link')
1212            posix.close(f)
1213
1214    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1215    def test_rename_dir_fd(self):
1216        support.unlink(support.TESTFN)
1217        support.create_empty_file(support.TESTFN + 'ren')
1218        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1219        try:
1220            posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)
1221        except:
1222            posix.rename(support.TESTFN + 'ren', support.TESTFN)
1223            raise
1224        else:
1225            posix.stat(support.TESTFN) # should not raise exception
1226        finally:
1227            posix.close(f)
1228
1229    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1230    def test_symlink_dir_fd(self):
1231        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1232        try:
1233            posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)
1234            self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
1235        finally:
1236            posix.close(f)
1237            support.unlink(support.TESTFN + 'link')
1238
1239    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1240    def test_unlink_dir_fd(self):
1241        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1242        support.create_empty_file(support.TESTFN + 'del')
1243        posix.stat(support.TESTFN + 'del') # should not raise exception
1244        try:
1245            posix.unlink(support.TESTFN + 'del', dir_fd=f)
1246        except:
1247            support.unlink(support.TESTFN + 'del')
1248            raise
1249        else:
1250            self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
1251        finally:
1252            posix.close(f)
1253
1254    @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1255    def test_mkfifo_dir_fd(self):
1256        support.unlink(support.TESTFN)
1257        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1258        try:
1259            try:
1260                posix.mkfifo(support.TESTFN,
1261                             stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1262            except PermissionError as e:
1263                self.skipTest('posix.mkfifo(): %s' % e)
1264            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
1265        finally:
1266            posix.close(f)
1267
1268    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1269                                           "don't have scheduling support")
1270    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1271                                                  "don't have sched affinity support")
1272
1273    @requires_sched_h
1274    def test_sched_yield(self):
1275        # This has no error conditions (at least on Linux).
1276        posix.sched_yield()
1277
1278    @requires_sched_h
1279    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1280                         "requires sched_get_priority_max()")
1281    def test_sched_priority(self):
1282        # Round-robin usually has interesting priorities.
1283        pol = posix.SCHED_RR
1284        lo = posix.sched_get_priority_min(pol)
1285        hi = posix.sched_get_priority_max(pol)
1286        self.assertIsInstance(lo, int)
1287        self.assertIsInstance(hi, int)
1288        self.assertGreaterEqual(hi, lo)
1289        # OSX evidently just returns 15 without checking the argument.
1290        if sys.platform != "darwin":
1291            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1292            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1293
1294    @requires_sched
1295    def test_get_and_set_scheduler_and_param(self):
1296        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1297                               if name.startswith("SCHED_")]
1298        mine = posix.sched_getscheduler(0)
1299        self.assertIn(mine, possible_schedulers)
1300        try:
1301            parent = posix.sched_getscheduler(os.getppid())
1302        except OSError as e:
1303            if e.errno != errno.EPERM:
1304                raise
1305        else:
1306            self.assertIn(parent, possible_schedulers)
1307        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1308        self.assertRaises(OSError, posix.sched_getparam, -1)
1309        param = posix.sched_getparam(0)
1310        self.assertIsInstance(param.sched_priority, int)
1311
1312        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1313        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1314        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1315        if not sys.platform.startswith(('freebsd', 'netbsd')):
1316            try:
1317                posix.sched_setscheduler(0, mine, param)
1318                posix.sched_setparam(0, param)
1319            except OSError as e:
1320                if e.errno != errno.EPERM:
1321                    raise
1322            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1323
1324        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1325        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1326        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1327        param = posix.sched_param(None)
1328        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1329        large = 214748364700
1330        param = posix.sched_param(large)
1331        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1332        param = posix.sched_param(sched_priority=-large)
1333        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1334
1335    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1336    def test_sched_rr_get_interval(self):
1337        try:
1338            interval = posix.sched_rr_get_interval(0)
1339        except OSError as e:
1340            # This likely means that sched_rr_get_interval is only valid for
1341            # processes with the SCHED_RR scheduler in effect.
1342            if e.errno != errno.EINVAL:
1343                raise
1344            self.skipTest("only works on SCHED_RR processes")
1345        self.assertIsInstance(interval, float)
1346        # Reasonable constraints, I think.
1347        self.assertGreaterEqual(interval, 0.)
1348        self.assertLess(interval, 1.)
1349
1350    @requires_sched_affinity
1351    def test_sched_getaffinity(self):
1352        mask = posix.sched_getaffinity(0)
1353        self.assertIsInstance(mask, set)
1354        self.assertGreaterEqual(len(mask), 1)
1355        self.assertRaises(OSError, posix.sched_getaffinity, -1)
1356        for cpu in mask:
1357            self.assertIsInstance(cpu, int)
1358            self.assertGreaterEqual(cpu, 0)
1359            self.assertLess(cpu, 1 << 32)
1360
1361    @requires_sched_affinity
1362    def test_sched_setaffinity(self):
1363        mask = posix.sched_getaffinity(0)
1364        if len(mask) > 1:
1365            # Empty masks are forbidden
1366            mask.pop()
1367        posix.sched_setaffinity(0, mask)
1368        self.assertEqual(posix.sched_getaffinity(0), mask)
1369        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1370        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1371        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1372        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1373        self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1374
1375    def test_rtld_constants(self):
1376        # check presence of major RTLD_* constants
1377        posix.RTLD_LAZY
1378        posix.RTLD_NOW
1379        posix.RTLD_GLOBAL
1380        posix.RTLD_LOCAL
1381
1382    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1383                         "test needs an OS that reports file holes")
1384    def test_fs_holes(self):
1385        # Even if the filesystem doesn't report holes,
1386        # if the OS supports it the SEEK_* constants
1387        # will be defined and will have a consistent
1388        # behaviour:
1389        # os.SEEK_DATA = current position
1390        # os.SEEK_HOLE = end of file position
1391        with open(support.TESTFN, 'r+b') as fp:
1392            fp.write(b"hello")
1393            fp.flush()
1394            size = fp.tell()
1395            fno = fp.fileno()
1396            try :
1397                for i in range(size):
1398                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1399                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1400                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1401                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1402            except OSError :
1403                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1404                # but it is not true.
1405                # For instance:
1406                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1407                raise unittest.SkipTest("OSError raised!")
1408
1409    def test_path_error2(self):
1410        """
1411        Test functions that call path_error2(), providing two filenames in their exceptions.
1412        """
1413        for name in ("rename", "replace", "link"):
1414            function = getattr(os, name, None)
1415            if function is None:
1416                continue
1417
1418            for dst in ("noodly2", support.TESTFN):
1419                try:
1420                    function('doesnotexistfilename', dst)
1421                except OSError as e:
1422                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1423                    break
1424            else:
1425                self.fail("No valid path_error2() test for os." + name)
1426
1427    def test_path_with_null_character(self):
1428        fn = support.TESTFN
1429        fn_with_NUL = fn + '\0'
1430        self.addCleanup(support.unlink, fn)
1431        support.unlink(fn)
1432        fd = None
1433        try:
1434            with self.assertRaises(ValueError):
1435                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1436        finally:
1437            if fd is not None:
1438                os.close(fd)
1439        self.assertFalse(os.path.exists(fn))
1440        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1441        self.assertFalse(os.path.exists(fn))
1442        open(fn, 'wb').close()
1443        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1444
1445    def test_path_with_null_byte(self):
1446        fn = os.fsencode(support.TESTFN)
1447        fn_with_NUL = fn + b'\0'
1448        self.addCleanup(support.unlink, fn)
1449        support.unlink(fn)
1450        fd = None
1451        try:
1452            with self.assertRaises(ValueError):
1453                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1454        finally:
1455            if fd is not None:
1456                os.close(fd)
1457        self.assertFalse(os.path.exists(fn))
1458        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1459        self.assertFalse(os.path.exists(fn))
1460        open(fn, 'wb').close()
1461        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1462
1463class PosixGroupsTester(unittest.TestCase):
1464
1465    def setUp(self):
1466        if posix.getuid() != 0:
1467            raise unittest.SkipTest("not enough privileges")
1468        if not hasattr(posix, 'getgroups'):
1469            raise unittest.SkipTest("need posix.getgroups")
1470        if sys.platform == 'darwin':
1471            raise unittest.SkipTest("getgroups(2) is broken on OSX")
1472        self.saved_groups = posix.getgroups()
1473
1474    def tearDown(self):
1475        if hasattr(posix, 'setgroups'):
1476            posix.setgroups(self.saved_groups)
1477        elif hasattr(posix, 'initgroups'):
1478            name = pwd.getpwuid(posix.getuid()).pw_name
1479            posix.initgroups(name, self.saved_groups[0])
1480
1481    @unittest.skipUnless(hasattr(posix, 'initgroups'),
1482                         "test needs posix.initgroups()")
1483    def test_initgroups(self):
1484        # find missing group
1485
1486        g = max(self.saved_groups or [0]) + 1
1487        name = pwd.getpwuid(posix.getuid()).pw_name
1488        posix.initgroups(name, g)
1489        self.assertIn(g, posix.getgroups())
1490
1491    @unittest.skipUnless(hasattr(posix, 'setgroups'),
1492                         "test needs posix.setgroups()")
1493    def test_setgroups(self):
1494        for groups in [[0], list(range(16))]:
1495            posix.setgroups(groups)
1496            self.assertListEqual(groups, posix.getgroups())
1497
1498
1499class _PosixSpawnMixin:
1500    # Program which does nothing and exits with status 0 (success)
1501    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
1502    spawn_func = None
1503
1504    def python_args(self, *args):
1505        # Disable site module to avoid side effects. For example,
1506        # on Fedora 28, if the HOME environment variable is not set,
1507        # site._getuserbase() calls pwd.getpwuid() which opens
1508        # /var/lib/sss/mc/passwd but then leaves the file open which makes
1509        # test_close_file() to fail.
1510        return (sys.executable, '-I', '-S', *args)
1511
1512    def test_returns_pid(self):
1513        pidfile = support.TESTFN
1514        self.addCleanup(support.unlink, pidfile)
1515        script = f"""if 1:
1516            import os
1517            with open({pidfile!r}, "w") as pidfile:
1518                pidfile.write(str(os.getpid()))
1519            """
1520        args = self.python_args('-c', script)
1521        pid = self.spawn_func(args[0], args, os.environ)
1522        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1523        with open(pidfile) as f:
1524            self.assertEqual(f.read(), str(pid))
1525
1526    def test_no_such_executable(self):
1527        no_such_executable = 'no_such_executable'
1528        try:
1529            pid = self.spawn_func(no_such_executable,
1530                                  [no_such_executable],
1531                                  os.environ)
1532        # bpo-35794: PermissionError can be raised if there are
1533        # directories in the $PATH that are not accessible.
1534        except (FileNotFoundError, PermissionError) as exc:
1535            self.assertEqual(exc.filename, no_such_executable)
1536        else:
1537            pid2, status = os.waitpid(pid, 0)
1538            self.assertEqual(pid2, pid)
1539            self.assertNotEqual(status, 0)
1540
1541    def test_specify_environment(self):
1542        envfile = support.TESTFN
1543        self.addCleanup(support.unlink, envfile)
1544        script = f"""if 1:
1545            import os
1546            with open({envfile!r}, "w") as envfile:
1547                envfile.write(os.environ['foo'])
1548        """
1549        args = self.python_args('-c', script)
1550        pid = self.spawn_func(args[0], args,
1551                              {**os.environ, 'foo': 'bar'})
1552        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1553        with open(envfile) as f:
1554            self.assertEqual(f.read(), 'bar')
1555
1556    def test_none_file_actions(self):
1557        pid = self.spawn_func(
1558            self.NOOP_PROGRAM[0],
1559            self.NOOP_PROGRAM,
1560            os.environ,
1561            file_actions=None
1562        )
1563        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1564
1565    def test_empty_file_actions(self):
1566        pid = self.spawn_func(
1567            self.NOOP_PROGRAM[0],
1568            self.NOOP_PROGRAM,
1569            os.environ,
1570            file_actions=[]
1571        )
1572        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1573
1574    def test_resetids_explicit_default(self):
1575        pid = self.spawn_func(
1576            sys.executable,
1577            [sys.executable, '-c', 'pass'],
1578            os.environ,
1579            resetids=False
1580        )
1581        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1582
1583    def test_resetids(self):
1584        pid = self.spawn_func(
1585            sys.executable,
1586            [sys.executable, '-c', 'pass'],
1587            os.environ,
1588            resetids=True
1589        )
1590        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1591
1592    def test_resetids_wrong_type(self):
1593        with self.assertRaises(TypeError):
1594            self.spawn_func(sys.executable,
1595                            [sys.executable, "-c", "pass"],
1596                            os.environ, resetids=None)
1597
1598    def test_setpgroup(self):
1599        pid = self.spawn_func(
1600            sys.executable,
1601            [sys.executable, '-c', 'pass'],
1602            os.environ,
1603            setpgroup=os.getpgrp()
1604        )
1605        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1606
1607    def test_setpgroup_wrong_type(self):
1608        with self.assertRaises(TypeError):
1609            self.spawn_func(sys.executable,
1610                            [sys.executable, "-c", "pass"],
1611                            os.environ, setpgroup="023")
1612
1613    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1614                           'need signal.pthread_sigmask()')
1615    def test_setsigmask(self):
1616        code = textwrap.dedent("""\
1617            import signal
1618            signal.raise_signal(signal.SIGUSR1)""")
1619
1620        pid = self.spawn_func(
1621            sys.executable,
1622            [sys.executable, '-c', code],
1623            os.environ,
1624            setsigmask=[signal.SIGUSR1]
1625        )
1626        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1627
1628    def test_setsigmask_wrong_type(self):
1629        with self.assertRaises(TypeError):
1630            self.spawn_func(sys.executable,
1631                            [sys.executable, "-c", "pass"],
1632                            os.environ, setsigmask=34)
1633        with self.assertRaises(TypeError):
1634            self.spawn_func(sys.executable,
1635                            [sys.executable, "-c", "pass"],
1636                            os.environ, setsigmask=["j"])
1637        with self.assertRaises(ValueError):
1638            self.spawn_func(sys.executable,
1639                            [sys.executable, "-c", "pass"],
1640                            os.environ, setsigmask=[signal.NSIG,
1641                                                    signal.NSIG+1])
1642
1643    def test_setsid(self):
1644        rfd, wfd = os.pipe()
1645        self.addCleanup(os.close, rfd)
1646        try:
1647            os.set_inheritable(wfd, True)
1648
1649            code = textwrap.dedent(f"""
1650                import os
1651                fd = {wfd}
1652                sid = os.getsid(0)
1653                os.write(fd, str(sid).encode())
1654            """)
1655
1656            try:
1657                pid = self.spawn_func(sys.executable,
1658                                      [sys.executable, "-c", code],
1659                                      os.environ, setsid=True)
1660            except NotImplementedError as exc:
1661                self.skipTest(f"setsid is not supported: {exc!r}")
1662            except PermissionError as exc:
1663                self.skipTest(f"setsid failed with: {exc!r}")
1664        finally:
1665            os.close(wfd)
1666
1667        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1668        output = os.read(rfd, 100)
1669        child_sid = int(output)
1670        parent_sid = os.getsid(os.getpid())
1671        self.assertNotEqual(parent_sid, child_sid)
1672
1673    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1674                         'need signal.pthread_sigmask()')
1675    def test_setsigdef(self):
1676        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
1677        code = textwrap.dedent("""\
1678            import signal
1679            signal.raise_signal(signal.SIGUSR1)""")
1680        try:
1681            pid = self.spawn_func(
1682                sys.executable,
1683                [sys.executable, '-c', code],
1684                os.environ,
1685                setsigdef=[signal.SIGUSR1]
1686            )
1687        finally:
1688            signal.signal(signal.SIGUSR1, original_handler)
1689
1690        pid2, status = os.waitpid(pid, 0)
1691        self.assertEqual(pid2, pid)
1692        self.assertTrue(os.WIFSIGNALED(status), status)
1693        self.assertEqual(os.WTERMSIG(status), signal.SIGUSR1)
1694
1695    def test_setsigdef_wrong_type(self):
1696        with self.assertRaises(TypeError):
1697            self.spawn_func(sys.executable,
1698                            [sys.executable, "-c", "pass"],
1699                            os.environ, setsigdef=34)
1700        with self.assertRaises(TypeError):
1701            self.spawn_func(sys.executable,
1702                            [sys.executable, "-c", "pass"],
1703                            os.environ, setsigdef=["j"])
1704        with self.assertRaises(ValueError):
1705            self.spawn_func(sys.executable,
1706                            [sys.executable, "-c", "pass"],
1707                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])
1708
1709    @requires_sched
1710    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1711                     "bpo-34685: test can fail on BSD")
1712    def test_setscheduler_only_param(self):
1713        policy = os.sched_getscheduler(0)
1714        priority = os.sched_get_priority_min(policy)
1715        code = textwrap.dedent(f"""\
1716            import os, sys
1717            if os.sched_getscheduler(0) != {policy}:
1718                sys.exit(101)
1719            if os.sched_getparam(0).sched_priority != {priority}:
1720                sys.exit(102)""")
1721        pid = self.spawn_func(
1722            sys.executable,
1723            [sys.executable, '-c', code],
1724            os.environ,
1725            scheduler=(None, os.sched_param(priority))
1726        )
1727        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1728
1729    @requires_sched
1730    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1731                     "bpo-34685: test can fail on BSD")
1732    def test_setscheduler_with_policy(self):
1733        policy = os.sched_getscheduler(0)
1734        priority = os.sched_get_priority_min(policy)
1735        code = textwrap.dedent(f"""\
1736            import os, sys
1737            if os.sched_getscheduler(0) != {policy}:
1738                sys.exit(101)
1739            if os.sched_getparam(0).sched_priority != {priority}:
1740                sys.exit(102)""")
1741        pid = self.spawn_func(
1742            sys.executable,
1743            [sys.executable, '-c', code],
1744            os.environ,
1745            scheduler=(policy, os.sched_param(priority))
1746        )
1747        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1748
1749    def test_multiple_file_actions(self):
1750        file_actions = [
1751            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
1752            (os.POSIX_SPAWN_CLOSE, 0),
1753            (os.POSIX_SPAWN_DUP2, 1, 4),
1754        ]
1755        pid = self.spawn_func(self.NOOP_PROGRAM[0],
1756                              self.NOOP_PROGRAM,
1757                              os.environ,
1758                              file_actions=file_actions)
1759        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1760
1761    def test_bad_file_actions(self):
1762        args = self.NOOP_PROGRAM
1763        with self.assertRaises(TypeError):
1764            self.spawn_func(args[0], args, os.environ,
1765                            file_actions=[None])
1766        with self.assertRaises(TypeError):
1767            self.spawn_func(args[0], args, os.environ,
1768                            file_actions=[()])
1769        with self.assertRaises(TypeError):
1770            self.spawn_func(args[0], args, os.environ,
1771                            file_actions=[(None,)])
1772        with self.assertRaises(TypeError):
1773            self.spawn_func(args[0], args, os.environ,
1774                            file_actions=[(12345,)])
1775        with self.assertRaises(TypeError):
1776            self.spawn_func(args[0], args, os.environ,
1777                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
1778        with self.assertRaises(TypeError):
1779            self.spawn_func(args[0], args, os.environ,
1780                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
1781        with self.assertRaises(TypeError):
1782            self.spawn_func(args[0], args, os.environ,
1783                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
1784        with self.assertRaises(ValueError):
1785            self.spawn_func(args[0], args, os.environ,
1786                            file_actions=[(os.POSIX_SPAWN_OPEN,
1787                                           3, __file__ + '\0',
1788                                           os.O_RDONLY, 0)])
1789
1790    def test_open_file(self):
1791        outfile = support.TESTFN
1792        self.addCleanup(support.unlink, outfile)
1793        script = """if 1:
1794            import sys
1795            sys.stdout.write("hello")
1796            """
1797        file_actions = [
1798            (os.POSIX_SPAWN_OPEN, 1, outfile,
1799                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
1800                stat.S_IRUSR | stat.S_IWUSR),
1801        ]
1802        args = self.python_args('-c', script)
1803        pid = self.spawn_func(args[0], args, os.environ,
1804                              file_actions=file_actions)
1805        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1806        with open(outfile) as f:
1807            self.assertEqual(f.read(), 'hello')
1808
1809    def test_close_file(self):
1810        closefile = support.TESTFN
1811        self.addCleanup(support.unlink, closefile)
1812        script = f"""if 1:
1813            import os
1814            try:
1815                os.fstat(0)
1816            except OSError as e:
1817                with open({closefile!r}, 'w') as closefile:
1818                    closefile.write('is closed %d' % e.errno)
1819            """
1820        args = self.python_args('-c', script)
1821        pid = self.spawn_func(args[0], args, os.environ,
1822                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
1823        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1824        with open(closefile) as f:
1825            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
1826
1827    def test_dup2(self):
1828        dupfile = support.TESTFN
1829        self.addCleanup(support.unlink, dupfile)
1830        script = """if 1:
1831            import sys
1832            sys.stdout.write("hello")
1833            """
1834        with open(dupfile, "wb") as childfile:
1835            file_actions = [
1836                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
1837            ]
1838            args = self.python_args('-c', script)
1839            pid = self.spawn_func(args[0], args, os.environ,
1840                                  file_actions=file_actions)
1841            self.assertEqual(os.waitpid(pid, 0), (pid, 0))
1842        with open(dupfile) as f:
1843            self.assertEqual(f.read(), 'hello')
1844
1845
1846@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn")
1847class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
1848    spawn_func = getattr(posix, 'posix_spawn', None)
1849
1850
1851@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
1852class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
1853    spawn_func = getattr(posix, 'posix_spawnp', None)
1854
1855    @support.skip_unless_symlink
1856    def test_posix_spawnp(self):
1857        # Use a symlink to create a program in its own temporary directory
1858        temp_dir = tempfile.mkdtemp()
1859        self.addCleanup(support.rmtree, temp_dir)
1860
1861        program = 'posix_spawnp_test_program.exe'
1862        program_fullpath = os.path.join(temp_dir, program)
1863        os.symlink(sys.executable, program_fullpath)
1864
1865        try:
1866            path = os.pathsep.join((temp_dir, os.environ['PATH']))
1867        except KeyError:
1868            path = temp_dir   # PATH is not set
1869
1870        spawn_args = (program, '-I', '-S', '-c', 'pass')
1871        code = textwrap.dedent("""
1872            import os
1873            args = %a
1874            pid = os.posix_spawnp(args[0], args, os.environ)
1875            pid2, status = os.waitpid(pid, 0)
1876            if pid2 != pid:
1877                raise Exception(f"pid {pid2} != {pid}")
1878            if status != 0:
1879                raise Exception(f"status {status} != 0")
1880        """ % (spawn_args,))
1881
1882        # Use a subprocess to test os.posix_spawnp() with a modified PATH
1883        # environment variable: posix_spawnp() uses the current environment
1884        # to locate the program, not its environment argument.
1885        args = ('-c', code)
1886        assert_python_ok(*args, PATH=path)
1887
1888
1889def test_main():
1890    try:
1891        support.run_unittest(
1892            PosixTester,
1893            PosixGroupsTester,
1894            TestPosixSpawn,
1895            TestPosixSpawnP,
1896        )
1897    finally:
1898        support.reap_children()
1899
1900if __name__ == '__main__':
1901    test_main()
1902