• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"Test posix functions"
2
3from test import support
4from test.support import import_helper
5from test.support import os_helper
6from test.support import warnings_helper
7from test.support.script_helper import assert_python_ok
8
9# Skip these tests if there is no posix module.
10posix = import_helper.import_module('posix')
11
12import errno
13import sys
14import signal
15import time
16import os
17import platform
18import pwd
19import stat
20import tempfile
21import unittest
22import warnings
23import textwrap
24from contextlib import contextmanager
25
26_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
27                              os_helper.TESTFN + '-dummy-symlink')
28
29requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
30        'test is only meaningful on 32-bit builds')
31
32def _supports_sched():
33    if not hasattr(posix, 'sched_getscheduler'):
34        return False
35    try:
36        posix.sched_getscheduler(0)
37    except OSError as e:
38        if e.errno == errno.ENOSYS:
39            return False
40    return True
41
42requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
43
44
45class PosixTester(unittest.TestCase):
46
47    def setUp(self):
48        # create empty file
49        with open(os_helper.TESTFN, "wb"):
50            pass
51        self.teardown_files = [ os_helper.TESTFN ]
52        self._warnings_manager = warnings_helper.check_warnings()
53        self._warnings_manager.__enter__()
54        warnings.filterwarnings('ignore', '.* potential security risk .*',
55                                RuntimeWarning)
56
57    def tearDown(self):
58        for teardown_file in self.teardown_files:
59            os_helper.unlink(teardown_file)
60        self._warnings_manager.__exit__(None, None, None)
61
62    def testNoArgFunctions(self):
63        # test posix functions which take no arguments and have
64        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
65        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
66                             "times", "getloadavg",
67                             "getegid", "geteuid", "getgid", "getgroups",
68                             "getpid", "getpgrp", "getppid", "getuid", "sync",
69                           ]
70
71        for name in NO_ARG_FUNCTIONS:
72            posix_func = getattr(posix, name, None)
73            if posix_func is not None:
74                posix_func()
75                self.assertRaises(TypeError, posix_func, 1)
76
77    @unittest.skipUnless(hasattr(posix, 'getresuid'),
78                         'test needs posix.getresuid()')
79    def test_getresuid(self):
80        user_ids = posix.getresuid()
81        self.assertEqual(len(user_ids), 3)
82        for val in user_ids:
83            self.assertGreaterEqual(val, 0)
84
85    @unittest.skipUnless(hasattr(posix, 'getresgid'),
86                         'test needs posix.getresgid()')
87    def test_getresgid(self):
88        group_ids = posix.getresgid()
89        self.assertEqual(len(group_ids), 3)
90        for val in group_ids:
91            self.assertGreaterEqual(val, 0)
92
93    @unittest.skipUnless(hasattr(posix, 'setresuid'),
94                         'test needs posix.setresuid()')
95    def test_setresuid(self):
96        current_user_ids = posix.getresuid()
97        self.assertIsNone(posix.setresuid(*current_user_ids))
98        # -1 means don't change that value.
99        self.assertIsNone(posix.setresuid(-1, -1, -1))
100
101    @unittest.skipUnless(hasattr(posix, 'setresuid'),
102                         'test needs posix.setresuid()')
103    def test_setresuid_exception(self):
104        # Don't do this test if someone is silly enough to run us as root.
105        current_user_ids = posix.getresuid()
106        if 0 not in current_user_ids:
107            new_user_ids = (current_user_ids[0]+1, -1, -1)
108            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
109
110    @unittest.skipUnless(hasattr(posix, 'setresgid'),
111                         'test needs posix.setresgid()')
112    def test_setresgid(self):
113        current_group_ids = posix.getresgid()
114        self.assertIsNone(posix.setresgid(*current_group_ids))
115        # -1 means don't change that value.
116        self.assertIsNone(posix.setresgid(-1, -1, -1))
117
118    @unittest.skipUnless(hasattr(posix, 'setresgid'),
119                         'test needs posix.setresgid()')
120    def test_setresgid_exception(self):
121        # Don't do this test if someone is silly enough to run us as root.
122        current_group_ids = posix.getresgid()
123        if 0 not in current_group_ids:
124            new_group_ids = (current_group_ids[0]+1, -1, -1)
125            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
126
127    @unittest.skipUnless(hasattr(posix, 'initgroups'),
128                         "test needs os.initgroups()")
129    def test_initgroups(self):
130        # It takes a string and an integer; check that it raises a TypeError
131        # for other argument lists.
132        self.assertRaises(TypeError, posix.initgroups)
133        self.assertRaises(TypeError, posix.initgroups, None)
134        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
135        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
136
137        # If a non-privileged user invokes it, it should fail with OSError
138        # EPERM.
139        if os.getuid() != 0:
140            try:
141                name = pwd.getpwuid(posix.getuid()).pw_name
142            except KeyError:
143                # the current UID may not have a pwd entry
144                raise unittest.SkipTest("need a pwd entry")
145            try:
146                posix.initgroups(name, 13)
147            except OSError as e:
148                self.assertEqual(e.errno, errno.EPERM)
149            else:
150                self.fail("Expected OSError to be raised by initgroups")
151
152    @unittest.skipUnless(hasattr(posix, 'statvfs'),
153                         'test needs posix.statvfs()')
154    def test_statvfs(self):
155        self.assertTrue(posix.statvfs(os.curdir))
156
157    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
158                         'test needs posix.fstatvfs()')
159    def test_fstatvfs(self):
160        fp = open(os_helper.TESTFN)
161        try:
162            self.assertTrue(posix.fstatvfs(fp.fileno()))
163            self.assertTrue(posix.statvfs(fp.fileno()))
164        finally:
165            fp.close()
166
167    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
168                         'test needs posix.ftruncate()')
169    def test_ftruncate(self):
170        fp = open(os_helper.TESTFN, 'w+')
171        try:
172            # we need to have some data to truncate
173            fp.write('test')
174            fp.flush()
175            posix.ftruncate(fp.fileno(), 0)
176        finally:
177            fp.close()
178
179    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
180    def test_truncate(self):
181        with open(os_helper.TESTFN, 'w') as fp:
182            fp.write('test')
183            fp.flush()
184        posix.truncate(os_helper.TESTFN, 0)
185
186    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
187    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
188    def test_fexecve(self):
189        fp = os.open(sys.executable, os.O_RDONLY)
190        try:
191            pid = os.fork()
192            if pid == 0:
193                os.chdir(os.path.split(sys.executable)[0])
194                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
195            else:
196                support.wait_process(pid, exitcode=0)
197        finally:
198            os.close(fp)
199
200
201    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
202    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
203    def test_waitid(self):
204        pid = os.fork()
205        if pid == 0:
206            os.chdir(os.path.split(sys.executable)[0])
207            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
208        else:
209            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
210            self.assertEqual(pid, res.si_pid)
211
212    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
213    def test_register_at_fork(self):
214        with self.assertRaises(TypeError, msg="Positional args not allowed"):
215            os.register_at_fork(lambda: None)
216        with self.assertRaises(TypeError, msg="Args must be callable"):
217            os.register_at_fork(before=2)
218        with self.assertRaises(TypeError, msg="Args must be callable"):
219            os.register_at_fork(after_in_child="three")
220        with self.assertRaises(TypeError, msg="Args must be callable"):
221            os.register_at_fork(after_in_parent=b"Five")
222        with self.assertRaises(TypeError, msg="Args must not be None"):
223            os.register_at_fork(before=None)
224        with self.assertRaises(TypeError, msg="Args must not be None"):
225            os.register_at_fork(after_in_child=None)
226        with self.assertRaises(TypeError, msg="Args must not be None"):
227            os.register_at_fork(after_in_parent=None)
228        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
229            # Ensure a combination of valid and invalid is an error.
230            os.register_at_fork(before=None, after_in_parent=lambda: 3)
231        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
232            # Ensure a combination of valid and invalid is an error.
233            os.register_at_fork(before=lambda: None, after_in_child='')
234        # We test actual registrations in their own process so as not to
235        # pollute this one.  There is no way to unregister for cleanup.
236        code = """if 1:
237            import os
238
239            r, w = os.pipe()
240            fin_r, fin_w = os.pipe()
241
242            os.register_at_fork(before=lambda: os.write(w, b'A'))
243            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
244            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
245            os.register_at_fork(before=lambda: os.write(w, b'B'),
246                                after_in_parent=lambda: os.write(w, b'D'),
247                                after_in_child=lambda: os.write(w, b'F'))
248
249            pid = os.fork()
250            if pid == 0:
251                # At this point, after-forkers have already been executed
252                os.close(w)
253                # Wait for parent to tell us to exit
254                os.read(fin_r, 1)
255                os._exit(0)
256            else:
257                try:
258                    os.close(w)
259                    with open(r, "rb") as f:
260                        data = f.read()
261                        assert len(data) == 6, data
262                        # Check before-fork callbacks
263                        assert data[:2] == b'BA', data
264                        # Check after-fork callbacks
265                        assert sorted(data[2:]) == list(b'CDEF'), data
266                        assert data.index(b'C') < data.index(b'D'), data
267                        assert data.index(b'E') < data.index(b'F'), data
268                finally:
269                    os.write(fin_w, b'!')
270            """
271        assert_python_ok('-c', code)
272
273    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
274    def test_lockf(self):
275        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
276        try:
277            os.write(fd, b'test')
278            os.lseek(fd, 0, os.SEEK_SET)
279            posix.lockf(fd, posix.F_LOCK, 4)
280            # section is locked
281            posix.lockf(fd, posix.F_ULOCK, 4)
282        finally:
283            os.close(fd)
284
285    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
286    def test_pread(self):
287        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
288        try:
289            os.write(fd, b'test')
290            os.lseek(fd, 0, os.SEEK_SET)
291            self.assertEqual(b'es', posix.pread(fd, 2, 1))
292            # the first pread() shouldn't disturb the file offset
293            self.assertEqual(b'te', posix.read(fd, 2))
294        finally:
295            os.close(fd)
296
297    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
298    def test_preadv(self):
299        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
300        try:
301            os.write(fd, b'test1tt2t3t5t6t6t8')
302            buf = [bytearray(i) for i in [5, 3, 2]]
303            self.assertEqual(posix.preadv(fd, buf, 3), 10)
304            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
305        finally:
306            os.close(fd)
307
308    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
309    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
310    def test_preadv_flags(self):
311        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
312        try:
313            os.write(fd, b'test1tt2t3t5t6t6t8')
314            buf = [bytearray(i) for i in [5, 3, 2]]
315            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
316            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
317        except NotImplementedError:
318            self.skipTest("preadv2 not available")
319        except OSError as inst:
320            # Is possible that the macro RWF_HIPRI was defined at compilation time
321            # but the option is not supported by the kernel or the runtime libc shared
322            # library.
323            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
324                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
325            else:
326                raise
327        finally:
328            os.close(fd)
329
330    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
331    @requires_32b
332    def test_preadv_overflow_32bits(self):
333        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
334        try:
335            buf = [bytearray(2**16)] * 2**15
336            with self.assertRaises(OSError) as cm:
337                os.preadv(fd, buf, 0)
338            self.assertEqual(cm.exception.errno, errno.EINVAL)
339            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
340        finally:
341            os.close(fd)
342
343    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
344    def test_pwrite(self):
345        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
346        try:
347            os.write(fd, b'test')
348            os.lseek(fd, 0, os.SEEK_SET)
349            posix.pwrite(fd, b'xx', 1)
350            self.assertEqual(b'txxt', posix.read(fd, 4))
351        finally:
352            os.close(fd)
353
354    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
355    def test_pwritev(self):
356        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
357        try:
358            os.write(fd, b"xx")
359            os.lseek(fd, 0, os.SEEK_SET)
360            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
361            self.assertEqual(n, 10)
362
363            os.lseek(fd, 0, os.SEEK_SET)
364            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
365        finally:
366            os.close(fd)
367
368    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
369    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
370    def test_pwritev_flags(self):
371        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
372        try:
373            os.write(fd,b"xx")
374            os.lseek(fd, 0, os.SEEK_SET)
375            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
376            self.assertEqual(n, 10)
377
378            os.lseek(fd, 0, os.SEEK_SET)
379            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
380        finally:
381            os.close(fd)
382
383    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
384    @requires_32b
385    def test_pwritev_overflow_32bits(self):
386        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
387        try:
388            with self.assertRaises(OSError) as cm:
389                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
390            self.assertEqual(cm.exception.errno, errno.EINVAL)
391        finally:
392            os.close(fd)
393
394    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
395        "test needs posix.posix_fallocate()")
396    def test_posix_fallocate(self):
397        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
398        try:
399            posix.posix_fallocate(fd, 0, 10)
400        except OSError as inst:
401            # issue10812, ZFS doesn't appear to support posix_fallocate,
402            # so skip Solaris-based since they are likely to have ZFS.
403            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
404            # often used there.
405            if inst.errno == errno.EINVAL and sys.platform.startswith(
406                ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')):
407                raise unittest.SkipTest("test may fail on ZFS filesystems")
408            else:
409                raise
410        finally:
411            os.close(fd)
412
413    # issue31106 - posix_fallocate() does not set error in errno.
414    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
415        "test needs posix.posix_fallocate()")
416    def test_posix_fallocate_errno(self):
417        try:
418            posix.posix_fallocate(-42, 0, 10)
419        except OSError as inst:
420            if inst.errno != errno.EBADF:
421                raise
422
423    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
424        "test needs posix.posix_fadvise()")
425    def test_posix_fadvise(self):
426        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
427        try:
428            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
429        finally:
430            os.close(fd)
431
432    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
433        "test needs posix.posix_fadvise()")
434    def test_posix_fadvise_errno(self):
435        try:
436            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
437        except OSError as inst:
438            if inst.errno != errno.EBADF:
439                raise
440
441    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
442    def test_utime_with_fd(self):
443        now = time.time()
444        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
445        try:
446            posix.utime(fd)
447            posix.utime(fd, None)
448            self.assertRaises(TypeError, posix.utime, fd, (None, None))
449            self.assertRaises(TypeError, posix.utime, fd, (now, None))
450            self.assertRaises(TypeError, posix.utime, fd, (None, now))
451            posix.utime(fd, (int(now), int(now)))
452            posix.utime(fd, (now, now))
453            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
454            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
455            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
456            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
457            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
458
459        finally:
460            os.close(fd)
461
462    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
463    def test_utime_nofollow_symlinks(self):
464        now = time.time()
465        posix.utime(os_helper.TESTFN, None, follow_symlinks=False)
466        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
467                          (None, None), follow_symlinks=False)
468        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
469                          (now, None), follow_symlinks=False)
470        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
471                          (None, now), follow_symlinks=False)
472        posix.utime(os_helper.TESTFN, (int(now), int(now)),
473                    follow_symlinks=False)
474        posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False)
475        posix.utime(os_helper.TESTFN, follow_symlinks=False)
476
477    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
478    def test_writev(self):
479        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
480        try:
481            n = os.writev(fd, (b'test1', b'tt2', b't3'))
482            self.assertEqual(n, 10)
483
484            os.lseek(fd, 0, os.SEEK_SET)
485            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
486
487            # Issue #20113: empty list of buffers should not crash
488            try:
489                size = posix.writev(fd, [])
490            except OSError:
491                # writev(fd, []) raises OSError(22, "Invalid argument")
492                # on OpenIndiana
493                pass
494            else:
495                self.assertEqual(size, 0)
496        finally:
497            os.close(fd)
498
499    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
500    @requires_32b
501    def test_writev_overflow_32bits(self):
502        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
503        try:
504            with self.assertRaises(OSError) as cm:
505                os.writev(fd, [b"x" * 2**16] * 2**15)
506            self.assertEqual(cm.exception.errno, errno.EINVAL)
507        finally:
508            os.close(fd)
509
510    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
511    def test_readv(self):
512        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
513        try:
514            os.write(fd, b'test1tt2t3')
515            os.lseek(fd, 0, os.SEEK_SET)
516            buf = [bytearray(i) for i in [5, 3, 2]]
517            self.assertEqual(posix.readv(fd, buf), 10)
518            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
519
520            # Issue #20113: empty list of buffers should not crash
521            try:
522                size = posix.readv(fd, [])
523            except OSError:
524                # readv(fd, []) raises OSError(22, "Invalid argument")
525                # on OpenIndiana
526                pass
527            else:
528                self.assertEqual(size, 0)
529        finally:
530            os.close(fd)
531
532    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
533    @requires_32b
534    def test_readv_overflow_32bits(self):
535        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
536        try:
537            buf = [bytearray(2**16)] * 2**15
538            with self.assertRaises(OSError) as cm:
539                os.readv(fd, buf)
540            self.assertEqual(cm.exception.errno, errno.EINVAL)
541            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
542        finally:
543            os.close(fd)
544
545    @unittest.skipUnless(hasattr(posix, 'dup'),
546                         'test needs posix.dup()')
547    def test_dup(self):
548        fp = open(os_helper.TESTFN)
549        try:
550            fd = posix.dup(fp.fileno())
551            self.assertIsInstance(fd, int)
552            os.close(fd)
553        finally:
554            fp.close()
555
556    @unittest.skipUnless(hasattr(posix, 'confstr'),
557                         'test needs posix.confstr()')
558    def test_confstr(self):
559        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
560        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
561
562    @unittest.skipUnless(hasattr(posix, 'dup2'),
563                         'test needs posix.dup2()')
564    def test_dup2(self):
565        fp1 = open(os_helper.TESTFN)
566        fp2 = open(os_helper.TESTFN)
567        try:
568            posix.dup2(fp1.fileno(), fp2.fileno())
569        finally:
570            fp1.close()
571            fp2.close()
572
573    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
574    @support.requires_linux_version(2, 6, 23)
575    def test_oscloexec(self):
576        fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
577        self.addCleanup(os.close, fd)
578        self.assertFalse(os.get_inheritable(fd))
579
580    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
581                         'test needs posix.O_EXLOCK')
582    def test_osexlock(self):
583        fd = os.open(os_helper.TESTFN,
584                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
585        self.assertRaises(OSError, os.open, os_helper.TESTFN,
586                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
587        os.close(fd)
588
589        if hasattr(posix, "O_SHLOCK"):
590            fd = os.open(os_helper.TESTFN,
591                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
592            self.assertRaises(OSError, os.open, os_helper.TESTFN,
593                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
594            os.close(fd)
595
596    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
597                         'test needs posix.O_SHLOCK')
598    def test_osshlock(self):
599        fd1 = os.open(os_helper.TESTFN,
600                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
601        fd2 = os.open(os_helper.TESTFN,
602                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
603        os.close(fd2)
604        os.close(fd1)
605
606        if hasattr(posix, "O_EXLOCK"):
607            fd = os.open(os_helper.TESTFN,
608                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
609            self.assertRaises(OSError, os.open, os_helper.TESTFN,
610                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
611            os.close(fd)
612
613    @unittest.skipUnless(hasattr(posix, 'fstat'),
614                         'test needs posix.fstat()')
615    def test_fstat(self):
616        fp = open(os_helper.TESTFN)
617        try:
618            self.assertTrue(posix.fstat(fp.fileno()))
619            self.assertTrue(posix.stat(fp.fileno()))
620
621            self.assertRaisesRegex(TypeError,
622                    'should be string, bytes, os.PathLike or integer, not',
623                    posix.stat, float(fp.fileno()))
624        finally:
625            fp.close()
626
627    def test_stat(self):
628        self.assertTrue(posix.stat(os_helper.TESTFN))
629        self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN)))
630
631        self.assertWarnsRegex(DeprecationWarning,
632                'should be string, bytes, os.PathLike or integer, not',
633                posix.stat, bytearray(os.fsencode(os_helper.TESTFN)))
634        self.assertRaisesRegex(TypeError,
635                'should be string, bytes, os.PathLike or integer, not',
636                posix.stat, None)
637        self.assertRaisesRegex(TypeError,
638                'should be string, bytes, os.PathLike or integer, not',
639                posix.stat, list(os_helper.TESTFN))
640        self.assertRaisesRegex(TypeError,
641                'should be string, bytes, os.PathLike or integer, not',
642                posix.stat, list(os.fsencode(os_helper.TESTFN)))
643
644    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
645    def test_mkfifo(self):
646        if sys.platform == "vxworks":
647            fifo_path = os.path.join("/fifos/", os_helper.TESTFN)
648        else:
649            fifo_path = os_helper.TESTFN
650        os_helper.unlink(fifo_path)
651        self.addCleanup(os_helper.unlink, fifo_path)
652        try:
653            posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR)
654        except PermissionError as e:
655            self.skipTest('posix.mkfifo(): %s' % e)
656        self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode))
657
658    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
659                         "don't have mknod()/S_IFIFO")
660    def test_mknod(self):
661        # Test using mknod() to create a FIFO (the only use specified
662        # by POSIX).
663        os_helper.unlink(os_helper.TESTFN)
664        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
665        try:
666            posix.mknod(os_helper.TESTFN, mode, 0)
667        except OSError as e:
668            # Some old systems don't allow unprivileged users to use
669            # mknod(), or only support creating device nodes.
670            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
671        else:
672            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
673
674        # Keyword arguments are also supported
675        os_helper.unlink(os_helper.TESTFN)
676        try:
677            posix.mknod(path=os_helper.TESTFN, mode=mode, device=0,
678                dir_fd=None)
679        except OSError as e:
680            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
681
682    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
683    def test_makedev(self):
684        st = posix.stat(os_helper.TESTFN)
685        dev = st.st_dev
686        self.assertIsInstance(dev, int)
687        self.assertGreaterEqual(dev, 0)
688
689        major = posix.major(dev)
690        self.assertIsInstance(major, int)
691        self.assertGreaterEqual(major, 0)
692        self.assertEqual(posix.major(dev), major)
693        self.assertRaises(TypeError, posix.major, float(dev))
694        self.assertRaises(TypeError, posix.major)
695        self.assertRaises((ValueError, OverflowError), posix.major, -1)
696
697        minor = posix.minor(dev)
698        self.assertIsInstance(minor, int)
699        self.assertGreaterEqual(minor, 0)
700        self.assertEqual(posix.minor(dev), minor)
701        self.assertRaises(TypeError, posix.minor, float(dev))
702        self.assertRaises(TypeError, posix.minor)
703        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
704
705        self.assertEqual(posix.makedev(major, minor), dev)
706        self.assertRaises(TypeError, posix.makedev, float(major), minor)
707        self.assertRaises(TypeError, posix.makedev, major, float(minor))
708        self.assertRaises(TypeError, posix.makedev, major)
709        self.assertRaises(TypeError, posix.makedev)
710
711    def _test_all_chown_common(self, chown_func, first_param, stat_func):
712        """Common code for chown, fchown and lchown tests."""
713        def check_stat(uid, gid):
714            if stat_func is not None:
715                stat = stat_func(first_param)
716                self.assertEqual(stat.st_uid, uid)
717                self.assertEqual(stat.st_gid, gid)
718        uid = os.getuid()
719        gid = os.getgid()
720        # test a successful chown call
721        chown_func(first_param, uid, gid)
722        check_stat(uid, gid)
723        chown_func(first_param, -1, gid)
724        check_stat(uid, gid)
725        chown_func(first_param, uid, -1)
726        check_stat(uid, gid)
727
728        if sys.platform == "vxworks":
729            # On VxWorks, root user id is 1 and 0 means no login user:
730            # both are super users.
731            is_root = (uid in (0, 1))
732        else:
733            is_root = (uid == 0)
734        if is_root:
735            # Try an amusingly large uid/gid to make sure we handle
736            # large unsigned values.  (chown lets you use any
737            # uid/gid you like, even if they aren't defined.)
738            #
739            # On VxWorks uid_t is defined as unsigned short. A big
740            # value greater than 65535 will result in underflow error.
741            #
742            # This problem keeps coming up:
743            #   http://bugs.python.org/issue1747858
744            #   http://bugs.python.org/issue4591
745            #   http://bugs.python.org/issue15301
746            # Hopefully the fix in 4591 fixes it for good!
747            #
748            # This part of the test only runs when run as root.
749            # Only scary people run their tests as root.
750
751            big_value = (2**31 if sys.platform != "vxworks" else 2**15)
752            chown_func(first_param, big_value, big_value)
753            check_stat(big_value, big_value)
754            chown_func(first_param, -1, -1)
755            check_stat(big_value, big_value)
756            chown_func(first_param, uid, gid)
757            check_stat(uid, gid)
758        elif platform.system() in ('HP-UX', 'SunOS'):
759            # HP-UX and Solaris can allow a non-root user to chown() to root
760            # (issue #5113)
761            raise unittest.SkipTest("Skipping because of non-standard chown() "
762                                    "behavior")
763        else:
764            # non-root cannot chown to root, raises OSError
765            self.assertRaises(OSError, chown_func, first_param, 0, 0)
766            check_stat(uid, gid)
767            self.assertRaises(OSError, chown_func, first_param, 0, -1)
768            check_stat(uid, gid)
769            if 0 not in os.getgroups():
770                self.assertRaises(OSError, chown_func, first_param, -1, 0)
771                check_stat(uid, gid)
772        # test illegal types
773        for t in str, float:
774            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
775            check_stat(uid, gid)
776            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
777            check_stat(uid, gid)
778
779    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
780    def test_chown(self):
781        # raise an OSError if the file does not exist
782        os.unlink(os_helper.TESTFN)
783        self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1)
784
785        # re-create the file
786        os_helper.create_empty_file(os_helper.TESTFN)
787        self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat)
788
789    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
790    def test_fchown(self):
791        os.unlink(os_helper.TESTFN)
792
793        # re-create the file
794        test_file = open(os_helper.TESTFN, 'w')
795        try:
796            fd = test_file.fileno()
797            self._test_all_chown_common(posix.fchown, fd,
798                                        getattr(posix, 'fstat', None))
799        finally:
800            test_file.close()
801
802    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
803    def test_lchown(self):
804        os.unlink(os_helper.TESTFN)
805        # create a symlink
806        os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN)
807        self._test_all_chown_common(posix.lchown, os_helper.TESTFN,
808                                    getattr(posix, 'lstat', None))
809
810    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
811    def test_chdir(self):
812        posix.chdir(os.curdir)
813        self.assertRaises(OSError, posix.chdir, os_helper.TESTFN)
814
815    def test_listdir(self):
816        self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir))
817
818    def test_listdir_default(self):
819        # When listdir is called without argument,
820        # it's the same as listdir(os.curdir).
821        self.assertIn(os_helper.TESTFN, posix.listdir())
822
823    def test_listdir_bytes(self):
824        # When listdir is called with a bytes object,
825        # the returned strings are of type bytes.
826        self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.'))
827
828    def test_listdir_bytes_like(self):
829        for cls in bytearray, memoryview:
830            with self.assertWarns(DeprecationWarning):
831                names = posix.listdir(cls(b'.'))
832            self.assertIn(os.fsencode(os_helper.TESTFN), names)
833            for name in names:
834                self.assertIs(type(name), bytes)
835
836    @unittest.skipUnless(posix.listdir in os.supports_fd,
837                         "test needs fd support for posix.listdir()")
838    def test_listdir_fd(self):
839        f = posix.open(posix.getcwd(), posix.O_RDONLY)
840        self.addCleanup(posix.close, f)
841        self.assertEqual(
842            sorted(posix.listdir('.')),
843            sorted(posix.listdir(f))
844            )
845        # Check that the fd offset was reset (issue #13739)
846        self.assertEqual(
847            sorted(posix.listdir('.')),
848            sorted(posix.listdir(f))
849            )
850
851    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
852    def test_access(self):
853        self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK))
854
855    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
856    def test_umask(self):
857        old_mask = posix.umask(0)
858        self.assertIsInstance(old_mask, int)
859        posix.umask(old_mask)
860
861    @unittest.skipUnless(hasattr(posix, 'strerror'),
862                         'test needs posix.strerror()')
863    def test_strerror(self):
864        self.assertTrue(posix.strerror(0))
865
866    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
867    def test_pipe(self):
868        reader, writer = posix.pipe()
869        os.close(reader)
870        os.close(writer)
871
872    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
873    @support.requires_linux_version(2, 6, 27)
874    def test_pipe2(self):
875        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
876        self.assertRaises(TypeError, os.pipe2, 0, 0)
877
878        # try calling with flags = 0, like os.pipe()
879        r, w = os.pipe2(0)
880        os.close(r)
881        os.close(w)
882
883        # test flags
884        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
885        self.addCleanup(os.close, r)
886        self.addCleanup(os.close, w)
887        self.assertFalse(os.get_inheritable(r))
888        self.assertFalse(os.get_inheritable(w))
889        self.assertFalse(os.get_blocking(r))
890        self.assertFalse(os.get_blocking(w))
891        # try reading from an empty pipe: this should fail, not block
892        self.assertRaises(OSError, os.read, r, 1)
893        # try a write big enough to fill-up the pipe: this should either
894        # fail or perform a partial write, not block
895        try:
896            os.write(w, b'x' * support.PIPE_MAX_SIZE)
897        except OSError:
898            pass
899
900    @support.cpython_only
901    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
902    @support.requires_linux_version(2, 6, 27)
903    def test_pipe2_c_limits(self):
904        # Issue 15989
905        import _testcapi
906        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
907        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
908
909    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
910    def test_utime(self):
911        now = time.time()
912        posix.utime(os_helper.TESTFN, None)
913        self.assertRaises(TypeError, posix.utime,
914                          os_helper.TESTFN, (None, None))
915        self.assertRaises(TypeError, posix.utime,
916                          os_helper.TESTFN, (now, None))
917        self.assertRaises(TypeError, posix.utime,
918                          os_helper.TESTFN, (None, now))
919        posix.utime(os_helper.TESTFN, (int(now), int(now)))
920        posix.utime(os_helper.TESTFN, (now, now))
921
922    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
923        st = os.stat(target_file)
924        self.assertTrue(hasattr(st, 'st_flags'))
925
926        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
927        flags = st.st_flags | stat.UF_IMMUTABLE
928        try:
929            chflags_func(target_file, flags, **kwargs)
930        except OSError as err:
931            if err.errno != errno.EOPNOTSUPP:
932                raise
933            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
934            self.skipTest(msg)
935
936        try:
937            new_st = os.stat(target_file)
938            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
939            try:
940                fd = open(target_file, 'w+')
941            except OSError as e:
942                self.assertEqual(e.errno, errno.EPERM)
943        finally:
944            posix.chflags(target_file, st.st_flags)
945
946    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
947    def test_chflags(self):
948        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN)
949
950    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
951    def test_lchflags_regular_file(self):
952        self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN)
953        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN,
954                                        follow_symlinks=False)
955
956    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
957    def test_lchflags_symlink(self):
958        testfn_st = os.stat(os_helper.TESTFN)
959
960        self.assertTrue(hasattr(testfn_st, 'st_flags'))
961
962        os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK)
963        self.teardown_files.append(_DUMMY_SYMLINK)
964        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
965
966        def chflags_nofollow(path, flags):
967            return posix.chflags(path, flags, follow_symlinks=False)
968
969        for fn in (posix.lchflags, chflags_nofollow):
970            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
971            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
972            try:
973                fn(_DUMMY_SYMLINK, flags)
974            except OSError as err:
975                if err.errno != errno.EOPNOTSUPP:
976                    raise
977                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
978                self.skipTest(msg)
979            try:
980                new_testfn_st = os.stat(os_helper.TESTFN)
981                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
982
983                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
984                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
985                                 new_dummy_symlink_st.st_flags)
986            finally:
987                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
988
989    def test_environ(self):
990        if os.name == "nt":
991            item_type = str
992        else:
993            item_type = bytes
994        for k, v in posix.environ.items():
995            self.assertEqual(type(k), item_type)
996            self.assertEqual(type(v), item_type)
997
998    def test_putenv(self):
999        with self.assertRaises(ValueError):
1000            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
1001        with self.assertRaises(ValueError):
1002            os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
1003        with self.assertRaises(ValueError):
1004            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
1005        with self.assertRaises(ValueError):
1006            os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
1007        with self.assertRaises(ValueError):
1008            os.putenv('FRUIT=ORANGE', 'lemon')
1009        with self.assertRaises(ValueError):
1010            os.putenv(b'FRUIT=ORANGE', b'lemon')
1011
1012    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
1013    def test_getcwd_long_pathnames(self):
1014        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
1015        curdir = os.getcwd()
1016        base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd'
1017
1018        try:
1019            os.mkdir(base_path)
1020            os.chdir(base_path)
1021        except:
1022            #  Just returning nothing instead of the SkipTest exception, because
1023            #  the test results in Error in that case.  Is that ok?
1024            #  raise unittest.SkipTest("cannot create directory for testing")
1025            return
1026
1027            def _create_and_do_getcwd(dirname, current_path_length = 0):
1028                try:
1029                    os.mkdir(dirname)
1030                except:
1031                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1032
1033                os.chdir(dirname)
1034                try:
1035                    os.getcwd()
1036                    if current_path_length < 1027:
1037                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1038                finally:
1039                    os.chdir('..')
1040                    os.rmdir(dirname)
1041
1042            _create_and_do_getcwd(dirname)
1043
1044        finally:
1045            os.chdir(curdir)
1046            os_helper.rmtree(base_path)
1047
1048    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1049    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1050    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1051    def test_getgrouplist(self):
1052        user = pwd.getpwuid(os.getuid())[0]
1053        group = pwd.getpwuid(os.getuid())[3]
1054        self.assertIn(group, posix.getgrouplist(user, group))
1055
1056
1057    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1058    @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()")
1059    def test_getgroups(self):
1060        with os.popen('id -G 2>/dev/null') as idg:
1061            groups = idg.read().strip()
1062            ret = idg.close()
1063
1064        try:
1065            idg_groups = set(int(g) for g in groups.split())
1066        except ValueError:
1067            idg_groups = set()
1068        if ret is not None or not idg_groups:
1069            raise unittest.SkipTest("need working 'id -G'")
1070
1071        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1072        if sys.platform == 'darwin':
1073            import sysconfig
1074            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3'
1075            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
1076                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1077
1078        # 'id -G' and 'os.getgroups()' should return the same
1079        # groups, ignoring order, duplicates, and the effective gid.
1080        # #10822/#26944 - It is implementation defined whether
1081        # posix.getgroups() includes the effective gid.
1082        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1083        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1084
1085    @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
1086    @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
1087    def test_cld_xxxx_constants(self):
1088        os.CLD_EXITED
1089        os.CLD_KILLED
1090        os.CLD_DUMPED
1091        os.CLD_TRAPPED
1092        os.CLD_STOPPED
1093        os.CLD_CONTINUED
1094
1095    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1096                                           "don't have scheduling support")
1097    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1098                                                  "don't have sched affinity support")
1099
1100    @requires_sched_h
1101    def test_sched_yield(self):
1102        # This has no error conditions (at least on Linux).
1103        posix.sched_yield()
1104
1105    @requires_sched_h
1106    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1107                         "requires sched_get_priority_max()")
1108    def test_sched_priority(self):
1109        # Round-robin usually has interesting priorities.
1110        pol = posix.SCHED_RR
1111        lo = posix.sched_get_priority_min(pol)
1112        hi = posix.sched_get_priority_max(pol)
1113        self.assertIsInstance(lo, int)
1114        self.assertIsInstance(hi, int)
1115        self.assertGreaterEqual(hi, lo)
1116        # OSX evidently just returns 15 without checking the argument.
1117        if sys.platform != "darwin":
1118            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1119            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1120
1121    @requires_sched
1122    def test_get_and_set_scheduler_and_param(self):
1123        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1124                               if name.startswith("SCHED_")]
1125        mine = posix.sched_getscheduler(0)
1126        self.assertIn(mine, possible_schedulers)
1127        try:
1128            parent = posix.sched_getscheduler(os.getppid())
1129        except OSError as e:
1130            if e.errno != errno.EPERM:
1131                raise
1132        else:
1133            self.assertIn(parent, possible_schedulers)
1134        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1135        self.assertRaises(OSError, posix.sched_getparam, -1)
1136        param = posix.sched_getparam(0)
1137        self.assertIsInstance(param.sched_priority, int)
1138
1139        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1140        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1141        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1142        if not sys.platform.startswith(('freebsd', 'netbsd')):
1143            try:
1144                posix.sched_setscheduler(0, mine, param)
1145                posix.sched_setparam(0, param)
1146            except OSError as e:
1147                if e.errno != errno.EPERM:
1148                    raise
1149            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1150
1151        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1152        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1153        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1154        param = posix.sched_param(None)
1155        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1156        large = 214748364700
1157        param = posix.sched_param(large)
1158        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1159        param = posix.sched_param(sched_priority=-large)
1160        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1161
1162    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1163    def test_sched_rr_get_interval(self):
1164        try:
1165            interval = posix.sched_rr_get_interval(0)
1166        except OSError as e:
1167            # This likely means that sched_rr_get_interval is only valid for
1168            # processes with the SCHED_RR scheduler in effect.
1169            if e.errno != errno.EINVAL:
1170                raise
1171            self.skipTest("only works on SCHED_RR processes")
1172        self.assertIsInstance(interval, float)
1173        # Reasonable constraints, I think.
1174        self.assertGreaterEqual(interval, 0.)
1175        self.assertLess(interval, 1.)
1176
1177    @requires_sched_affinity
1178    def test_sched_getaffinity(self):
1179        mask = posix.sched_getaffinity(0)
1180        self.assertIsInstance(mask, set)
1181        self.assertGreaterEqual(len(mask), 1)
1182        self.assertRaises(OSError, posix.sched_getaffinity, -1)
1183        for cpu in mask:
1184            self.assertIsInstance(cpu, int)
1185            self.assertGreaterEqual(cpu, 0)
1186            self.assertLess(cpu, 1 << 32)
1187
1188    @requires_sched_affinity
1189    def test_sched_setaffinity(self):
1190        mask = posix.sched_getaffinity(0)
1191        if len(mask) > 1:
1192            # Empty masks are forbidden
1193            mask.pop()
1194        posix.sched_setaffinity(0, mask)
1195        self.assertEqual(posix.sched_getaffinity(0), mask)
1196        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1197        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1198        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1199        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1200        self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1201
1202    def test_rtld_constants(self):
1203        # check presence of major RTLD_* constants
1204        posix.RTLD_LAZY
1205        posix.RTLD_NOW
1206        posix.RTLD_GLOBAL
1207        posix.RTLD_LOCAL
1208
1209    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1210                         "test needs an OS that reports file holes")
1211    def test_fs_holes(self):
1212        # Even if the filesystem doesn't report holes,
1213        # if the OS supports it the SEEK_* constants
1214        # will be defined and will have a consistent
1215        # behaviour:
1216        # os.SEEK_DATA = current position
1217        # os.SEEK_HOLE = end of file position
1218        with open(os_helper.TESTFN, 'r+b') as fp:
1219            fp.write(b"hello")
1220            fp.flush()
1221            size = fp.tell()
1222            fno = fp.fileno()
1223            try :
1224                for i in range(size):
1225                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1226                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1227                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1228                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1229            except OSError :
1230                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1231                # but it is not true.
1232                # For instance:
1233                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1234                raise unittest.SkipTest("OSError raised!")
1235
1236    def test_path_error2(self):
1237        """
1238        Test functions that call path_error2(), providing two filenames in their exceptions.
1239        """
1240        for name in ("rename", "replace", "link"):
1241            function = getattr(os, name, None)
1242            if function is None:
1243                continue
1244
1245            for dst in ("noodly2", os_helper.TESTFN):
1246                try:
1247                    function('doesnotexistfilename', dst)
1248                except OSError as e:
1249                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1250                    break
1251            else:
1252                self.fail("No valid path_error2() test for os." + name)
1253
1254    def test_path_with_null_character(self):
1255        fn = os_helper.TESTFN
1256        fn_with_NUL = fn + '\0'
1257        self.addCleanup(os_helper.unlink, fn)
1258        os_helper.unlink(fn)
1259        fd = None
1260        try:
1261            with self.assertRaises(ValueError):
1262                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1263        finally:
1264            if fd is not None:
1265                os.close(fd)
1266        self.assertFalse(os.path.exists(fn))
1267        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1268        self.assertFalse(os.path.exists(fn))
1269        open(fn, 'wb').close()
1270        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1271
1272    def test_path_with_null_byte(self):
1273        fn = os.fsencode(os_helper.TESTFN)
1274        fn_with_NUL = fn + b'\0'
1275        self.addCleanup(os_helper.unlink, fn)
1276        os_helper.unlink(fn)
1277        fd = None
1278        try:
1279            with self.assertRaises(ValueError):
1280                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1281        finally:
1282            if fd is not None:
1283                os.close(fd)
1284        self.assertFalse(os.path.exists(fn))
1285        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1286        self.assertFalse(os.path.exists(fn))
1287        open(fn, 'wb').close()
1288        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1289
1290    @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable")
1291    def test_pidfd_open(self):
1292        with self.assertRaises(OSError) as cm:
1293            os.pidfd_open(-1)
1294        if cm.exception.errno == errno.ENOSYS:
1295            self.skipTest("system does not support pidfd_open")
1296        if isinstance(cm.exception, PermissionError):
1297            self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}")
1298        self.assertEqual(cm.exception.errno, errno.EINVAL)
1299        os.close(os.pidfd_open(os.getpid(), 0))
1300
1301
1302# tests for the posix *at functions follow
1303class TestPosixDirFd(unittest.TestCase):
1304    count = 0
1305
1306    @contextmanager
1307    def prepare(self):
1308        TestPosixDirFd.count += 1
1309        name = f'{os_helper.TESTFN}_{self.count}'
1310        base_dir = f'{os_helper.TESTFN}_{self.count}base'
1311        posix.mkdir(base_dir)
1312        self.addCleanup(posix.rmdir, base_dir)
1313        fullname = os.path.join(base_dir, name)
1314        assert not os.path.exists(fullname)
1315        with os_helper.open_dir_fd(base_dir) as dir_fd:
1316            yield (dir_fd, name, fullname)
1317
1318    @contextmanager
1319    def prepare_file(self):
1320        with self.prepare() as (dir_fd, name, fullname):
1321            os_helper.create_empty_file(fullname)
1322            self.addCleanup(posix.unlink, fullname)
1323            yield (dir_fd, name, fullname)
1324
1325    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1326    def test_access_dir_fd(self):
1327        with self.prepare_file() as (dir_fd, name, fullname):
1328            self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd))
1329
1330    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1331    def test_chmod_dir_fd(self):
1332        with self.prepare_file() as (dir_fd, name, fullname):
1333            posix.chmod(fullname, stat.S_IRUSR)
1334            posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1335            s = posix.stat(fullname)
1336            self.assertEqual(s.st_mode & stat.S_IRWXU,
1337                             stat.S_IRUSR | stat.S_IWUSR)
1338
1339    @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
1340                         "test needs dir_fd support in os.chown()")
1341    def test_chown_dir_fd(self):
1342        with self.prepare_file() as (dir_fd, name, fullname):
1343            posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd)
1344
1345    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1346    def test_stat_dir_fd(self):
1347        with self.prepare() as (dir_fd, name, fullname):
1348            with open(fullname, 'w') as outfile:
1349                outfile.write("testline\n")
1350            self.addCleanup(posix.unlink, fullname)
1351
1352            s1 = posix.stat(fullname)
1353            s2 = posix.stat(name, dir_fd=dir_fd)
1354            self.assertEqual(s1, s2)
1355            s2 = posix.stat(fullname, dir_fd=None)
1356            self.assertEqual(s1, s2)
1357
1358            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1359                    posix.stat, name, dir_fd=posix.getcwd())
1360            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1361                    posix.stat, name, dir_fd=float(dir_fd))
1362            self.assertRaises(OverflowError,
1363                    posix.stat, name, dir_fd=10**20)
1364
1365    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1366    def test_utime_dir_fd(self):
1367        with self.prepare_file() as (dir_fd, name, fullname):
1368            now = time.time()
1369            posix.utime(name, None, dir_fd=dir_fd)
1370            posix.utime(name, dir_fd=dir_fd)
1371            self.assertRaises(TypeError, posix.utime, name,
1372                              now, dir_fd=dir_fd)
1373            self.assertRaises(TypeError, posix.utime, name,
1374                              (None, None), dir_fd=dir_fd)
1375            self.assertRaises(TypeError, posix.utime, name,
1376                              (now, None), dir_fd=dir_fd)
1377            self.assertRaises(TypeError, posix.utime, name,
1378                              (None, now), dir_fd=dir_fd)
1379            self.assertRaises(TypeError, posix.utime, name,
1380                              (now, "x"), dir_fd=dir_fd)
1381            posix.utime(name, (int(now), int(now)), dir_fd=dir_fd)
1382            posix.utime(name, (now, now), dir_fd=dir_fd)
1383            posix.utime(name,
1384                    (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd)
1385            posix.utime(name, dir_fd=dir_fd,
1386                            times=(int(now), int((now - int(now)) * 1e9)))
1387
1388            # try dir_fd and follow_symlinks together
1389            if os.utime in os.supports_follow_symlinks:
1390                try:
1391                    posix.utime(name, follow_symlinks=False, dir_fd=dir_fd)
1392                except ValueError:
1393                    # whoops!  using both together not supported on this platform.
1394                    pass
1395
1396    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
1397    def test_link_dir_fd(self):
1398        with self.prepare_file() as (dir_fd, name, fullname), \
1399             self.prepare() as (dir_fd2, linkname, fulllinkname):
1400            try:
1401                posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1402            except PermissionError as e:
1403                self.skipTest('posix.link(): %s' % e)
1404            self.addCleanup(posix.unlink, fulllinkname)
1405            # should have same inodes
1406            self.assertEqual(posix.stat(fullname)[1],
1407                posix.stat(fulllinkname)[1])
1408
1409    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1410    def test_mkdir_dir_fd(self):
1411        with self.prepare() as (dir_fd, name, fullname):
1412            posix.mkdir(name, dir_fd=dir_fd)
1413            self.addCleanup(posix.rmdir, fullname)
1414            posix.stat(fullname) # should not raise exception
1415
1416    @unittest.skipUnless(hasattr(os, 'mknod')
1417                         and (os.mknod in os.supports_dir_fd)
1418                         and hasattr(stat, 'S_IFIFO'),
1419                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1420    def test_mknod_dir_fd(self):
1421        # Test using mknodat() to create a FIFO (the only use specified
1422        # by POSIX).
1423        with self.prepare() as (dir_fd, name, fullname):
1424            mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1425            try:
1426                posix.mknod(name, mode, 0, dir_fd=dir_fd)
1427            except OSError as e:
1428                # Some old systems don't allow unprivileged users to use
1429                # mknod(), or only support creating device nodes.
1430                self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1431            else:
1432                self.addCleanup(posix.unlink, fullname)
1433                self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1434
1435    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1436    def test_open_dir_fd(self):
1437        with self.prepare() as (dir_fd, name, fullname):
1438            with open(fullname, 'wb') as outfile:
1439                outfile.write(b"testline\n")
1440            self.addCleanup(posix.unlink, fullname)
1441            fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd)
1442            try:
1443                res = posix.read(fd, 9)
1444                self.assertEqual(b"testline\n", res)
1445            finally:
1446                posix.close(fd)
1447
1448    @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
1449                         "test needs dir_fd support in os.readlink()")
1450    def test_readlink_dir_fd(self):
1451        with self.prepare() as (dir_fd, name, fullname):
1452            os.symlink('symlink', fullname)
1453            self.addCleanup(posix.unlink, fullname)
1454            self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink')
1455
1456    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1457    def test_rename_dir_fd(self):
1458        with self.prepare_file() as (dir_fd, name, fullname), \
1459             self.prepare() as (dir_fd2, name2, fullname2):
1460            posix.rename(name, name2,
1461                         src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1462            posix.stat(fullname2) # should not raise exception
1463            posix.rename(fullname2, fullname)
1464
1465    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1466    def test_symlink_dir_fd(self):
1467        with self.prepare() as (dir_fd, name, fullname):
1468            posix.symlink('symlink', name, dir_fd=dir_fd)
1469            self.addCleanup(posix.unlink, fullname)
1470            self.assertEqual(posix.readlink(fullname), 'symlink')
1471
1472    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1473    def test_unlink_dir_fd(self):
1474        with self.prepare() as (dir_fd, name, fullname):
1475            os_helper.create_empty_file(fullname)
1476            posix.stat(fullname) # should not raise exception
1477            try:
1478                posix.unlink(name, dir_fd=dir_fd)
1479                self.assertRaises(OSError, posix.stat, fullname)
1480            except:
1481                self.addCleanup(posix.unlink, fullname)
1482                raise
1483
1484    @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1485    def test_mkfifo_dir_fd(self):
1486        with self.prepare() as (dir_fd, name, fullname):
1487            try:
1488                posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1489            except PermissionError as e:
1490                self.skipTest('posix.mkfifo(): %s' % e)
1491            self.addCleanup(posix.unlink, fullname)
1492            self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1493
1494
1495class PosixGroupsTester(unittest.TestCase):
1496
1497    def setUp(self):
1498        if posix.getuid() != 0:
1499            raise unittest.SkipTest("not enough privileges")
1500        if not hasattr(posix, 'getgroups'):
1501            raise unittest.SkipTest("need posix.getgroups")
1502        if sys.platform == 'darwin':
1503            raise unittest.SkipTest("getgroups(2) is broken on OSX")
1504        self.saved_groups = posix.getgroups()
1505
1506    def tearDown(self):
1507        if hasattr(posix, 'setgroups'):
1508            posix.setgroups(self.saved_groups)
1509        elif hasattr(posix, 'initgroups'):
1510            name = pwd.getpwuid(posix.getuid()).pw_name
1511            posix.initgroups(name, self.saved_groups[0])
1512
1513    @unittest.skipUnless(hasattr(posix, 'initgroups'),
1514                         "test needs posix.initgroups()")
1515    def test_initgroups(self):
1516        # find missing group
1517
1518        g = max(self.saved_groups or [0]) + 1
1519        name = pwd.getpwuid(posix.getuid()).pw_name
1520        posix.initgroups(name, g)
1521        self.assertIn(g, posix.getgroups())
1522
1523    @unittest.skipUnless(hasattr(posix, 'setgroups'),
1524                         "test needs posix.setgroups()")
1525    def test_setgroups(self):
1526        for groups in [[0], list(range(16))]:
1527            posix.setgroups(groups)
1528            self.assertListEqual(groups, posix.getgroups())
1529
1530
1531class _PosixSpawnMixin:
1532    # Program which does nothing and exits with status 0 (success)
1533    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
1534    spawn_func = None
1535
1536    def python_args(self, *args):
1537        # Disable site module to avoid side effects. For example,
1538        # on Fedora 28, if the HOME environment variable is not set,
1539        # site._getuserbase() calls pwd.getpwuid() which opens
1540        # /var/lib/sss/mc/passwd but then leaves the file open which makes
1541        # test_close_file() to fail.
1542        return (sys.executable, '-I', '-S', *args)
1543
1544    def test_returns_pid(self):
1545        pidfile = os_helper.TESTFN
1546        self.addCleanup(os_helper.unlink, pidfile)
1547        script = f"""if 1:
1548            import os
1549            with open({pidfile!r}, "w") as pidfile:
1550                pidfile.write(str(os.getpid()))
1551            """
1552        args = self.python_args('-c', script)
1553        pid = self.spawn_func(args[0], args, os.environ)
1554        support.wait_process(pid, exitcode=0)
1555        with open(pidfile, encoding="utf-8") as f:
1556            self.assertEqual(f.read(), str(pid))
1557
1558    def test_no_such_executable(self):
1559        no_such_executable = 'no_such_executable'
1560        try:
1561            pid = self.spawn_func(no_such_executable,
1562                                  [no_such_executable],
1563                                  os.environ)
1564        # bpo-35794: PermissionError can be raised if there are
1565        # directories in the $PATH that are not accessible.
1566        except (FileNotFoundError, PermissionError) as exc:
1567            self.assertEqual(exc.filename, no_such_executable)
1568        else:
1569            pid2, status = os.waitpid(pid, 0)
1570            self.assertEqual(pid2, pid)
1571            self.assertNotEqual(status, 0)
1572
1573    def test_specify_environment(self):
1574        envfile = os_helper.TESTFN
1575        self.addCleanup(os_helper.unlink, envfile)
1576        script = f"""if 1:
1577            import os
1578            with open({envfile!r}, "w", encoding="utf-8") as envfile:
1579                envfile.write(os.environ['foo'])
1580        """
1581        args = self.python_args('-c', script)
1582        pid = self.spawn_func(args[0], args,
1583                              {**os.environ, 'foo': 'bar'})
1584        support.wait_process(pid, exitcode=0)
1585        with open(envfile, encoding="utf-8") as f:
1586            self.assertEqual(f.read(), 'bar')
1587
1588    def test_none_file_actions(self):
1589        pid = self.spawn_func(
1590            self.NOOP_PROGRAM[0],
1591            self.NOOP_PROGRAM,
1592            os.environ,
1593            file_actions=None
1594        )
1595        support.wait_process(pid, exitcode=0)
1596
1597    def test_empty_file_actions(self):
1598        pid = self.spawn_func(
1599            self.NOOP_PROGRAM[0],
1600            self.NOOP_PROGRAM,
1601            os.environ,
1602            file_actions=[]
1603        )
1604        support.wait_process(pid, exitcode=0)
1605
1606    def test_resetids_explicit_default(self):
1607        pid = self.spawn_func(
1608            sys.executable,
1609            [sys.executable, '-c', 'pass'],
1610            os.environ,
1611            resetids=False
1612        )
1613        support.wait_process(pid, exitcode=0)
1614
1615    def test_resetids(self):
1616        pid = self.spawn_func(
1617            sys.executable,
1618            [sys.executable, '-c', 'pass'],
1619            os.environ,
1620            resetids=True
1621        )
1622        support.wait_process(pid, exitcode=0)
1623
1624    def test_resetids_wrong_type(self):
1625        with self.assertRaises(TypeError):
1626            self.spawn_func(sys.executable,
1627                            [sys.executable, "-c", "pass"],
1628                            os.environ, resetids=None)
1629
1630    def test_setpgroup(self):
1631        pid = self.spawn_func(
1632            sys.executable,
1633            [sys.executable, '-c', 'pass'],
1634            os.environ,
1635            setpgroup=os.getpgrp()
1636        )
1637        support.wait_process(pid, exitcode=0)
1638
1639    def test_setpgroup_wrong_type(self):
1640        with self.assertRaises(TypeError):
1641            self.spawn_func(sys.executable,
1642                            [sys.executable, "-c", "pass"],
1643                            os.environ, setpgroup="023")
1644
1645    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1646                           'need signal.pthread_sigmask()')
1647    def test_setsigmask(self):
1648        code = textwrap.dedent("""\
1649            import signal
1650            signal.raise_signal(signal.SIGUSR1)""")
1651
1652        pid = self.spawn_func(
1653            sys.executable,
1654            [sys.executable, '-c', code],
1655            os.environ,
1656            setsigmask=[signal.SIGUSR1]
1657        )
1658        support.wait_process(pid, exitcode=0)
1659
1660    def test_setsigmask_wrong_type(self):
1661        with self.assertRaises(TypeError):
1662            self.spawn_func(sys.executable,
1663                            [sys.executable, "-c", "pass"],
1664                            os.environ, setsigmask=34)
1665        with self.assertRaises(TypeError):
1666            self.spawn_func(sys.executable,
1667                            [sys.executable, "-c", "pass"],
1668                            os.environ, setsigmask=["j"])
1669        with self.assertRaises(ValueError):
1670            self.spawn_func(sys.executable,
1671                            [sys.executable, "-c", "pass"],
1672                            os.environ, setsigmask=[signal.NSIG,
1673                                                    signal.NSIG+1])
1674
1675    def test_setsid(self):
1676        rfd, wfd = os.pipe()
1677        self.addCleanup(os.close, rfd)
1678        try:
1679            os.set_inheritable(wfd, True)
1680
1681            code = textwrap.dedent(f"""
1682                import os
1683                fd = {wfd}
1684                sid = os.getsid(0)
1685                os.write(fd, str(sid).encode())
1686            """)
1687
1688            try:
1689                pid = self.spawn_func(sys.executable,
1690                                      [sys.executable, "-c", code],
1691                                      os.environ, setsid=True)
1692            except NotImplementedError as exc:
1693                self.skipTest(f"setsid is not supported: {exc!r}")
1694            except PermissionError as exc:
1695                self.skipTest(f"setsid failed with: {exc!r}")
1696        finally:
1697            os.close(wfd)
1698
1699        support.wait_process(pid, exitcode=0)
1700
1701        output = os.read(rfd, 100)
1702        child_sid = int(output)
1703        parent_sid = os.getsid(os.getpid())
1704        self.assertNotEqual(parent_sid, child_sid)
1705
1706    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1707                         'need signal.pthread_sigmask()')
1708    def test_setsigdef(self):
1709        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
1710        code = textwrap.dedent("""\
1711            import signal
1712            signal.raise_signal(signal.SIGUSR1)""")
1713        try:
1714            pid = self.spawn_func(
1715                sys.executable,
1716                [sys.executable, '-c', code],
1717                os.environ,
1718                setsigdef=[signal.SIGUSR1]
1719            )
1720        finally:
1721            signal.signal(signal.SIGUSR1, original_handler)
1722
1723        support.wait_process(pid, exitcode=-signal.SIGUSR1)
1724
1725    def test_setsigdef_wrong_type(self):
1726        with self.assertRaises(TypeError):
1727            self.spawn_func(sys.executable,
1728                            [sys.executable, "-c", "pass"],
1729                            os.environ, setsigdef=34)
1730        with self.assertRaises(TypeError):
1731            self.spawn_func(sys.executable,
1732                            [sys.executable, "-c", "pass"],
1733                            os.environ, setsigdef=["j"])
1734        with self.assertRaises(ValueError):
1735            self.spawn_func(sys.executable,
1736                            [sys.executable, "-c", "pass"],
1737                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])
1738
1739    @requires_sched
1740    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1741                     "bpo-34685: test can fail on BSD")
1742    def test_setscheduler_only_param(self):
1743        policy = os.sched_getscheduler(0)
1744        priority = os.sched_get_priority_min(policy)
1745        code = textwrap.dedent(f"""\
1746            import os, sys
1747            if os.sched_getscheduler(0) != {policy}:
1748                sys.exit(101)
1749            if os.sched_getparam(0).sched_priority != {priority}:
1750                sys.exit(102)""")
1751        pid = self.spawn_func(
1752            sys.executable,
1753            [sys.executable, '-c', code],
1754            os.environ,
1755            scheduler=(None, os.sched_param(priority))
1756        )
1757        support.wait_process(pid, exitcode=0)
1758
1759    @requires_sched
1760    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1761                     "bpo-34685: test can fail on BSD")
1762    def test_setscheduler_with_policy(self):
1763        policy = os.sched_getscheduler(0)
1764        priority = os.sched_get_priority_min(policy)
1765        code = textwrap.dedent(f"""\
1766            import os, sys
1767            if os.sched_getscheduler(0) != {policy}:
1768                sys.exit(101)
1769            if os.sched_getparam(0).sched_priority != {priority}:
1770                sys.exit(102)""")
1771        pid = self.spawn_func(
1772            sys.executable,
1773            [sys.executable, '-c', code],
1774            os.environ,
1775            scheduler=(policy, os.sched_param(priority))
1776        )
1777        support.wait_process(pid, exitcode=0)
1778
1779    def test_multiple_file_actions(self):
1780        file_actions = [
1781            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
1782            (os.POSIX_SPAWN_CLOSE, 0),
1783            (os.POSIX_SPAWN_DUP2, 1, 4),
1784        ]
1785        pid = self.spawn_func(self.NOOP_PROGRAM[0],
1786                              self.NOOP_PROGRAM,
1787                              os.environ,
1788                              file_actions=file_actions)
1789        support.wait_process(pid, exitcode=0)
1790
1791    def test_bad_file_actions(self):
1792        args = self.NOOP_PROGRAM
1793        with self.assertRaises(TypeError):
1794            self.spawn_func(args[0], args, os.environ,
1795                            file_actions=[None])
1796        with self.assertRaises(TypeError):
1797            self.spawn_func(args[0], args, os.environ,
1798                            file_actions=[()])
1799        with self.assertRaises(TypeError):
1800            self.spawn_func(args[0], args, os.environ,
1801                            file_actions=[(None,)])
1802        with self.assertRaises(TypeError):
1803            self.spawn_func(args[0], args, os.environ,
1804                            file_actions=[(12345,)])
1805        with self.assertRaises(TypeError):
1806            self.spawn_func(args[0], args, os.environ,
1807                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
1808        with self.assertRaises(TypeError):
1809            self.spawn_func(args[0], args, os.environ,
1810                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
1811        with self.assertRaises(TypeError):
1812            self.spawn_func(args[0], args, os.environ,
1813                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
1814        with self.assertRaises(ValueError):
1815            self.spawn_func(args[0], args, os.environ,
1816                            file_actions=[(os.POSIX_SPAWN_OPEN,
1817                                           3, __file__ + '\0',
1818                                           os.O_RDONLY, 0)])
1819
1820    def test_open_file(self):
1821        outfile = os_helper.TESTFN
1822        self.addCleanup(os_helper.unlink, outfile)
1823        script = """if 1:
1824            import sys
1825            sys.stdout.write("hello")
1826            """
1827        file_actions = [
1828            (os.POSIX_SPAWN_OPEN, 1, outfile,
1829                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
1830                stat.S_IRUSR | stat.S_IWUSR),
1831        ]
1832        args = self.python_args('-c', script)
1833        pid = self.spawn_func(args[0], args, os.environ,
1834                              file_actions=file_actions)
1835
1836        support.wait_process(pid, exitcode=0)
1837        with open(outfile, encoding="utf-8") as f:
1838            self.assertEqual(f.read(), 'hello')
1839
1840    def test_close_file(self):
1841        closefile = os_helper.TESTFN
1842        self.addCleanup(os_helper.unlink, closefile)
1843        script = f"""if 1:
1844            import os
1845            try:
1846                os.fstat(0)
1847            except OSError as e:
1848                with open({closefile!r}, 'w', encoding='utf-8') as closefile:
1849                    closefile.write('is closed %d' % e.errno)
1850            """
1851        args = self.python_args('-c', script)
1852        pid = self.spawn_func(args[0], args, os.environ,
1853                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
1854
1855        support.wait_process(pid, exitcode=0)
1856        with open(closefile, encoding="utf-8") as f:
1857            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
1858
1859    def test_dup2(self):
1860        dupfile = os_helper.TESTFN
1861        self.addCleanup(os_helper.unlink, dupfile)
1862        script = """if 1:
1863            import sys
1864            sys.stdout.write("hello")
1865            """
1866        with open(dupfile, "wb") as childfile:
1867            file_actions = [
1868                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
1869            ]
1870            args = self.python_args('-c', script)
1871            pid = self.spawn_func(args[0], args, os.environ,
1872                                  file_actions=file_actions)
1873            support.wait_process(pid, exitcode=0)
1874        with open(dupfile, encoding="utf-8") as f:
1875            self.assertEqual(f.read(), 'hello')
1876
1877
1878@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn")
1879class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
1880    spawn_func = getattr(posix, 'posix_spawn', None)
1881
1882
1883@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
1884class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
1885    spawn_func = getattr(posix, 'posix_spawnp', None)
1886
1887    @os_helper.skip_unless_symlink
1888    def test_posix_spawnp(self):
1889        # Use a symlink to create a program in its own temporary directory
1890        temp_dir = tempfile.mkdtemp()
1891        self.addCleanup(os_helper.rmtree, temp_dir)
1892
1893        program = 'posix_spawnp_test_program.exe'
1894        program_fullpath = os.path.join(temp_dir, program)
1895        os.symlink(sys.executable, program_fullpath)
1896
1897        try:
1898            path = os.pathsep.join((temp_dir, os.environ['PATH']))
1899        except KeyError:
1900            path = temp_dir   # PATH is not set
1901
1902        spawn_args = (program, '-I', '-S', '-c', 'pass')
1903        code = textwrap.dedent("""
1904            import os
1905            from test import support
1906
1907            args = %a
1908            pid = os.posix_spawnp(args[0], args, os.environ)
1909
1910            support.wait_process(pid, exitcode=0)
1911        """ % (spawn_args,))
1912
1913        # Use a subprocess to test os.posix_spawnp() with a modified PATH
1914        # environment variable: posix_spawnp() uses the current environment
1915        # to locate the program, not its environment argument.
1916        args = ('-c', code)
1917        assert_python_ok(*args, PATH=path)
1918
1919
1920@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
1921class TestPosixWeaklinking(unittest.TestCase):
1922    # These test cases verify that weak linking support on macOS works
1923    # as expected. These cases only test new behaviour introduced by weak linking,
1924    # regular behaviour is tested by the normal test cases.
1925    #
1926    # See the section on Weak Linking in Mac/README.txt for more information.
1927    def setUp(self):
1928        import sysconfig
1929        import platform
1930
1931        config_vars = sysconfig.get_config_vars()
1932        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
1933        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))
1934
1935    def _verify_available(self, name):
1936        if name not in self.available:
1937            raise unittest.SkipTest(f"{name} not weak-linked")
1938
1939    def test_pwritev(self):
1940        self._verify_available("HAVE_PWRITEV")
1941        if self.mac_ver >= (10, 16):
1942            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
1943            self.assertTrue(hasattr(os, "preadv"), "os.readv is not available")
1944
1945        else:
1946            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
1947            self.assertFalse(hasattr(os, "preadv"), "os.readv is available")
1948
1949    def test_stat(self):
1950        self._verify_available("HAVE_FSTATAT")
1951        if self.mac_ver >= (10, 10):
1952            self.assertIn("HAVE_FSTATAT", posix._have_functions)
1953
1954        else:
1955            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)
1956
1957            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1958                os.stat("file", dir_fd=0)
1959
1960    def test_access(self):
1961        self._verify_available("HAVE_FACCESSAT")
1962        if self.mac_ver >= (10, 10):
1963            self.assertIn("HAVE_FACCESSAT", posix._have_functions)
1964
1965        else:
1966            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)
1967
1968            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1969                os.access("file", os.R_OK, dir_fd=0)
1970
1971            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
1972                os.access("file", os.R_OK, follow_symlinks=False)
1973
1974            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
1975                os.access("file", os.R_OK, effective_ids=True)
1976
1977    def test_chmod(self):
1978        self._verify_available("HAVE_FCHMODAT")
1979        if self.mac_ver >= (10, 10):
1980            self.assertIn("HAVE_FCHMODAT", posix._have_functions)
1981
1982        else:
1983            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
1984            self.assertIn("HAVE_LCHMOD", posix._have_functions)
1985
1986            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1987                os.chmod("file", 0o644, dir_fd=0)
1988
1989    def test_chown(self):
1990        self._verify_available("HAVE_FCHOWNAT")
1991        if self.mac_ver >= (10, 10):
1992            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)
1993
1994        else:
1995            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
1996            self.assertIn("HAVE_LCHOWN", posix._have_functions)
1997
1998            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1999                os.chown("file", 0, 0, dir_fd=0)
2000
2001    def test_link(self):
2002        self._verify_available("HAVE_LINKAT")
2003        if self.mac_ver >= (10, 10):
2004            self.assertIn("HAVE_LINKAT", posix._have_functions)
2005
2006        else:
2007            self.assertNotIn("HAVE_LINKAT", posix._have_functions)
2008
2009            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2010                os.link("source", "target",  src_dir_fd=0)
2011
2012            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
2013                os.link("source", "target",  dst_dir_fd=0)
2014
2015            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2016                os.link("source", "target",  src_dir_fd=0, dst_dir_fd=0)
2017
2018            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
2019            with os_helper.temp_dir() as base_path:
2020                link_path = os.path.join(base_path, "link")
2021                target_path = os.path.join(base_path, "target")
2022                source_path = os.path.join(base_path, "source")
2023
2024                with open(source_path, "w") as fp:
2025                    fp.write("data")
2026
2027                os.symlink("target", link_path)
2028
2029                # Calling os.link should fail in the link(2) call, and
2030                # should not reject *follow_symlinks* (to match the
2031                # behaviour you'd get when building on a platform without
2032                # linkat)
2033                with self.assertRaises(FileExistsError):
2034                    os.link(source_path, link_path, follow_symlinks=True)
2035
2036                with self.assertRaises(FileExistsError):
2037                    os.link(source_path, link_path, follow_symlinks=False)
2038
2039
2040    def test_listdir_scandir(self):
2041        self._verify_available("HAVE_FDOPENDIR")
2042        if self.mac_ver >= (10, 10):
2043            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)
2044
2045        else:
2046            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)
2047
2048            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
2049                os.listdir(0)
2050
2051            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
2052                os.scandir(0)
2053
2054    def test_mkdir(self):
2055        self._verify_available("HAVE_MKDIRAT")
2056        if self.mac_ver >= (10, 10):
2057            self.assertIn("HAVE_MKDIRAT", posix._have_functions)
2058
2059        else:
2060            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)
2061
2062            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2063                os.mkdir("dir", dir_fd=0)
2064
2065    def test_rename_replace(self):
2066        self._verify_available("HAVE_RENAMEAT")
2067        if self.mac_ver >= (10, 10):
2068            self.assertIn("HAVE_RENAMEAT", posix._have_functions)
2069
2070        else:
2071            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)
2072
2073            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2074                os.rename("a", "b", src_dir_fd=0)
2075
2076            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2077                os.rename("a", "b", dst_dir_fd=0)
2078
2079            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2080                os.replace("a", "b", src_dir_fd=0)
2081
2082            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2083                os.replace("a", "b", dst_dir_fd=0)
2084
2085    def test_unlink_rmdir(self):
2086        self._verify_available("HAVE_UNLINKAT")
2087        if self.mac_ver >= (10, 10):
2088            self.assertIn("HAVE_UNLINKAT", posix._have_functions)
2089
2090        else:
2091            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)
2092
2093            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2094                os.unlink("path", dir_fd=0)
2095
2096            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2097                os.rmdir("path", dir_fd=0)
2098
2099    def test_open(self):
2100        self._verify_available("HAVE_OPENAT")
2101        if self.mac_ver >= (10, 10):
2102            self.assertIn("HAVE_OPENAT", posix._have_functions)
2103
2104        else:
2105            self.assertNotIn("HAVE_OPENAT", posix._have_functions)
2106
2107            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2108                os.open("path", os.O_RDONLY, dir_fd=0)
2109
2110    def test_readlink(self):
2111        self._verify_available("HAVE_READLINKAT")
2112        if self.mac_ver >= (10, 10):
2113            self.assertIn("HAVE_READLINKAT", posix._have_functions)
2114
2115        else:
2116            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)
2117
2118            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2119                os.readlink("path",  dir_fd=0)
2120
2121    def test_symlink(self):
2122        self._verify_available("HAVE_SYMLINKAT")
2123        if self.mac_ver >= (10, 10):
2124            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)
2125
2126        else:
2127            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)
2128
2129            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2130                os.symlink("a", "b",  dir_fd=0)
2131
2132    def test_utime(self):
2133        self._verify_available("HAVE_FUTIMENS")
2134        self._verify_available("HAVE_UTIMENSAT")
2135        if self.mac_ver >= (10, 13):
2136            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
2137            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)
2138
2139        else:
2140            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
2141            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)
2142
2143            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2144                os.utime("path", dir_fd=0)
2145
2146
2147def tearDownModule():
2148    support.reap_children()
2149
2150
2151if __name__ == '__main__':
2152    unittest.main()
2153