• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"Test posix functions"
2
3from test import support
4from test.support import is_apple
5from test.support import os_helper
6from test.support import warnings_helper
7from test.support.script_helper import assert_python_ok
8
9import copy
10import errno
11import sys
12import signal
13import time
14import os
15import platform
16import pickle
17import stat
18import tempfile
19import unittest
20import warnings
21import textwrap
22from contextlib import contextmanager
23
24try:
25    import posix
26except ImportError:
27    import nt as posix
28
29try:
30    import pwd
31except ImportError:
32    pwd = None
33
34_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
35                              os_helper.TESTFN + '-dummy-symlink')
36
37requires_32b = unittest.skipUnless(
38    # Emscripten/WASI have 32 bits pointers, but support 64 bits syscall args.
39    sys.maxsize < 2**32 and not (support.is_emscripten or support.is_wasi),
40    'test is only meaningful on 32-bit builds'
41)
42
43def _supports_sched():
44    if not hasattr(posix, 'sched_getscheduler'):
45        return False
46    try:
47        posix.sched_getscheduler(0)
48    except OSError as e:
49        if e.errno == errno.ENOSYS:
50            return False
51    return True
52
53requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
54
55
56class PosixTester(unittest.TestCase):
57
58    def setUp(self):
59        # create empty file
60        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
61        with open(os_helper.TESTFN, "wb"):
62            pass
63        self.enterContext(warnings_helper.check_warnings())
64        warnings.filterwarnings('ignore', '.* potential security risk .*',
65                                RuntimeWarning)
66
67    def testNoArgFunctions(self):
68        # test posix functions which take no arguments and have
69        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
70        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
71                             "times", "getloadavg",
72                             "getegid", "geteuid", "getgid", "getgroups",
73                             "getpid", "getpgrp", "getppid", "getuid", "sync",
74                           ]
75
76        for name in NO_ARG_FUNCTIONS:
77            posix_func = getattr(posix, name, None)
78            if posix_func is not None:
79                with self.subTest(name):
80                    posix_func()
81                    self.assertRaises(TypeError, posix_func, 1)
82
83    @unittest.skipUnless(hasattr(posix, 'getresuid'),
84                         'test needs posix.getresuid()')
85    def test_getresuid(self):
86        user_ids = posix.getresuid()
87        self.assertEqual(len(user_ids), 3)
88        for val in user_ids:
89            self.assertGreaterEqual(val, 0)
90
91    @unittest.skipUnless(hasattr(posix, 'getresgid'),
92                         'test needs posix.getresgid()')
93    def test_getresgid(self):
94        group_ids = posix.getresgid()
95        self.assertEqual(len(group_ids), 3)
96        for val in group_ids:
97            self.assertGreaterEqual(val, 0)
98
99    @unittest.skipUnless(hasattr(posix, 'setresuid'),
100                         'test needs posix.setresuid()')
101    def test_setresuid(self):
102        current_user_ids = posix.getresuid()
103        self.assertIsNone(posix.setresuid(*current_user_ids))
104        # -1 means don't change that value.
105        self.assertIsNone(posix.setresuid(-1, -1, -1))
106
107    @unittest.skipUnless(hasattr(posix, 'setresuid'),
108                         'test needs posix.setresuid()')
109    def test_setresuid_exception(self):
110        # Don't do this test if someone is silly enough to run us as root.
111        current_user_ids = posix.getresuid()
112        if 0 not in current_user_ids:
113            new_user_ids = (current_user_ids[0]+1, -1, -1)
114            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
115
116    @unittest.skipUnless(hasattr(posix, 'setresgid'),
117                         'test needs posix.setresgid()')
118    def test_setresgid(self):
119        current_group_ids = posix.getresgid()
120        self.assertIsNone(posix.setresgid(*current_group_ids))
121        # -1 means don't change that value.
122        self.assertIsNone(posix.setresgid(-1, -1, -1))
123
124    @unittest.skipUnless(hasattr(posix, 'setresgid'),
125                         'test needs posix.setresgid()')
126    def test_setresgid_exception(self):
127        # Don't do this test if someone is silly enough to run us as root.
128        current_group_ids = posix.getresgid()
129        if 0 not in current_group_ids:
130            new_group_ids = (current_group_ids[0]+1, -1, -1)
131            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
132
133    @unittest.skipUnless(hasattr(posix, 'initgroups'),
134                         "test needs os.initgroups()")
135    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
136    def test_initgroups(self):
137        # It takes a string and an integer; check that it raises a TypeError
138        # for other argument lists.
139        self.assertRaises(TypeError, posix.initgroups)
140        self.assertRaises(TypeError, posix.initgroups, None)
141        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
142        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
143
144        # If a non-privileged user invokes it, it should fail with OSError
145        # EPERM.
146        if os.getuid() != 0:
147            try:
148                name = pwd.getpwuid(posix.getuid()).pw_name
149            except KeyError:
150                # the current UID may not have a pwd entry
151                raise unittest.SkipTest("need a pwd entry")
152            try:
153                posix.initgroups(name, 13)
154            except OSError as e:
155                self.assertEqual(e.errno, errno.EPERM)
156            else:
157                self.fail("Expected OSError to be raised by initgroups")
158
159    @unittest.skipUnless(hasattr(posix, 'statvfs'),
160                         'test needs posix.statvfs()')
161    def test_statvfs(self):
162        self.assertTrue(posix.statvfs(os.curdir))
163
164    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
165                         'test needs posix.fstatvfs()')
166    def test_fstatvfs(self):
167        fp = open(os_helper.TESTFN)
168        try:
169            self.assertTrue(posix.fstatvfs(fp.fileno()))
170            self.assertTrue(posix.statvfs(fp.fileno()))
171        finally:
172            fp.close()
173
174    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
175                         'test needs posix.ftruncate()')
176    def test_ftruncate(self):
177        fp = open(os_helper.TESTFN, 'w+')
178        try:
179            # we need to have some data to truncate
180            fp.write('test')
181            fp.flush()
182            posix.ftruncate(fp.fileno(), 0)
183        finally:
184            fp.close()
185
186    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
187    def test_truncate(self):
188        with open(os_helper.TESTFN, 'w') as fp:
189            fp.write('test')
190            fp.flush()
191        posix.truncate(os_helper.TESTFN, 0)
192
193    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
194    @support.requires_fork()
195    def test_fexecve(self):
196        fp = os.open(sys.executable, os.O_RDONLY)
197        try:
198            pid = os.fork()
199            if pid == 0:
200                os.chdir(os.path.split(sys.executable)[0])
201                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
202            else:
203                support.wait_process(pid, exitcode=0)
204        finally:
205            os.close(fp)
206
207
208    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
209    @support.requires_fork()
210    def test_waitid(self):
211        pid = os.fork()
212        if pid == 0:
213            os.chdir(os.path.split(sys.executable)[0])
214            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
215        else:
216            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
217            self.assertEqual(pid, res.si_pid)
218
219    @support.requires_fork()
220    def test_register_at_fork(self):
221        with self.assertRaises(TypeError, msg="Positional args not allowed"):
222            os.register_at_fork(lambda: None)
223        with self.assertRaises(TypeError, msg="Args must be callable"):
224            os.register_at_fork(before=2)
225        with self.assertRaises(TypeError, msg="Args must be callable"):
226            os.register_at_fork(after_in_child="three")
227        with self.assertRaises(TypeError, msg="Args must be callable"):
228            os.register_at_fork(after_in_parent=b"Five")
229        with self.assertRaises(TypeError, msg="Args must not be None"):
230            os.register_at_fork(before=None)
231        with self.assertRaises(TypeError, msg="Args must not be None"):
232            os.register_at_fork(after_in_child=None)
233        with self.assertRaises(TypeError, msg="Args must not be None"):
234            os.register_at_fork(after_in_parent=None)
235        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
236            # Ensure a combination of valid and invalid is an error.
237            os.register_at_fork(before=None, after_in_parent=lambda: 3)
238        with self.assertRaises(TypeError, msg="At least one argument is required"):
239            # when no arg is passed
240            os.register_at_fork()
241        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
242            # Ensure a combination of valid and invalid is an error.
243            os.register_at_fork(before=lambda: None, after_in_child='')
244        # We test actual registrations in their own process so as not to
245        # pollute this one.  There is no way to unregister for cleanup.
246        code = """if 1:
247            import os
248
249            r, w = os.pipe()
250            fin_r, fin_w = os.pipe()
251
252            os.register_at_fork(before=lambda: os.write(w, b'A'))
253            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
254            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
255            os.register_at_fork(before=lambda: os.write(w, b'B'),
256                                after_in_parent=lambda: os.write(w, b'D'),
257                                after_in_child=lambda: os.write(w, b'F'))
258
259            pid = os.fork()
260            if pid == 0:
261                # At this point, after-forkers have already been executed
262                os.close(w)
263                # Wait for parent to tell us to exit
264                os.read(fin_r, 1)
265                os._exit(0)
266            else:
267                try:
268                    os.close(w)
269                    with open(r, "rb") as f:
270                        data = f.read()
271                        assert len(data) == 6, data
272                        # Check before-fork callbacks
273                        assert data[:2] == b'BA', data
274                        # Check after-fork callbacks
275                        assert sorted(data[2:]) == list(b'CDEF'), data
276                        assert data.index(b'C') < data.index(b'D'), data
277                        assert data.index(b'E') < data.index(b'F'), data
278                finally:
279                    os.write(fin_w, b'!')
280            """
281        assert_python_ok('-c', code)
282
283    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
284    def test_lockf(self):
285        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
286        try:
287            os.write(fd, b'test')
288            os.lseek(fd, 0, os.SEEK_SET)
289            posix.lockf(fd, posix.F_LOCK, 4)
290            # section is locked
291            posix.lockf(fd, posix.F_ULOCK, 4)
292        finally:
293            os.close(fd)
294
295    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
296    def test_pread(self):
297        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
298        try:
299            os.write(fd, b'test')
300            os.lseek(fd, 0, os.SEEK_SET)
301            self.assertEqual(b'es', posix.pread(fd, 2, 1))
302            # the first pread() shouldn't disturb the file offset
303            self.assertEqual(b'te', posix.read(fd, 2))
304        finally:
305            os.close(fd)
306
307    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
308    def test_preadv(self):
309        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
310        try:
311            os.write(fd, b'test1tt2t3t5t6t6t8')
312            buf = [bytearray(i) for i in [5, 3, 2]]
313            self.assertEqual(posix.preadv(fd, buf, 3), 10)
314            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
315        finally:
316            os.close(fd)
317
318    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
319    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
320    def test_preadv_flags(self):
321        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
322        try:
323            os.write(fd, b'test1tt2t3t5t6t6t8')
324            buf = [bytearray(i) for i in [5, 3, 2]]
325            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
326            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
327        except NotImplementedError:
328            self.skipTest("preadv2 not available")
329        except OSError as inst:
330            # Is possible that the macro RWF_HIPRI was defined at compilation time
331            # but the option is not supported by the kernel or the runtime libc shared
332            # library.
333            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
334                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
335            else:
336                raise
337        finally:
338            os.close(fd)
339
340    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
341    @requires_32b
342    def test_preadv_overflow_32bits(self):
343        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
344        try:
345            buf = [bytearray(2**16)] * 2**15
346            with self.assertRaises(OSError) as cm:
347                os.preadv(fd, buf, 0)
348            self.assertEqual(cm.exception.errno, errno.EINVAL)
349            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
350        finally:
351            os.close(fd)
352
353    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
354    def test_pwrite(self):
355        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
356        try:
357            os.write(fd, b'test')
358            os.lseek(fd, 0, os.SEEK_SET)
359            posix.pwrite(fd, b'xx', 1)
360            self.assertEqual(b'txxt', posix.read(fd, 4))
361        finally:
362            os.close(fd)
363
364    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
365    def test_pwritev(self):
366        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
367        try:
368            os.write(fd, b"xx")
369            os.lseek(fd, 0, os.SEEK_SET)
370            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
371            self.assertEqual(n, 10)
372
373            os.lseek(fd, 0, os.SEEK_SET)
374            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
375        finally:
376            os.close(fd)
377
378    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
379    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
380    def test_pwritev_flags(self):
381        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
382        try:
383            os.write(fd,b"xx")
384            os.lseek(fd, 0, os.SEEK_SET)
385            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
386            self.assertEqual(n, 10)
387
388            os.lseek(fd, 0, os.SEEK_SET)
389            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
390        finally:
391            os.close(fd)
392
393    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
394    @requires_32b
395    def test_pwritev_overflow_32bits(self):
396        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
397        try:
398            with self.assertRaises(OSError) as cm:
399                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
400            self.assertEqual(cm.exception.errno, errno.EINVAL)
401        finally:
402            os.close(fd)
403
404    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
405        "test needs posix.posix_fallocate()")
406    def test_posix_fallocate(self):
407        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
408        try:
409            posix.posix_fallocate(fd, 0, 10)
410        except OSError as inst:
411            # issue10812, ZFS doesn't appear to support posix_fallocate,
412            # so skip Solaris-based since they are likely to have ZFS.
413            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
414            # often used there.
415            if inst.errno == errno.EINVAL and sys.platform.startswith(
416                ('sunos', 'freebsd', 'openbsd', 'gnukfreebsd')):
417                raise unittest.SkipTest("test may fail on ZFS filesystems")
418            elif inst.errno == errno.EOPNOTSUPP and sys.platform.startswith("netbsd"):
419                raise unittest.SkipTest("test may fail on FFS filesystems")
420            else:
421                raise
422        finally:
423            os.close(fd)
424
425    # issue31106 - posix_fallocate() does not set error in errno.
426    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
427        "test needs posix.posix_fallocate()")
428    def test_posix_fallocate_errno(self):
429        try:
430            posix.posix_fallocate(-42, 0, 10)
431        except OSError as inst:
432            if inst.errno != errno.EBADF:
433                raise
434
435    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
436        "test needs posix.posix_fadvise()")
437    def test_posix_fadvise(self):
438        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
439        try:
440            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
441        finally:
442            os.close(fd)
443
444    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
445        "test needs posix.posix_fadvise()")
446    def test_posix_fadvise_errno(self):
447        try:
448            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
449        except OSError as inst:
450            if inst.errno != errno.EBADF:
451                raise
452
453    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
454    def test_utime_with_fd(self):
455        now = time.time()
456        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
457        try:
458            posix.utime(fd)
459            posix.utime(fd, None)
460            self.assertRaises(TypeError, posix.utime, fd, (None, None))
461            self.assertRaises(TypeError, posix.utime, fd, (now, None))
462            self.assertRaises(TypeError, posix.utime, fd, (None, now))
463            posix.utime(fd, (int(now), int(now)))
464            posix.utime(fd, (now, now))
465            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
466            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
467            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
468            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
469            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
470
471        finally:
472            os.close(fd)
473
474    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
475    def test_utime_nofollow_symlinks(self):
476        now = time.time()
477        posix.utime(os_helper.TESTFN, None, follow_symlinks=False)
478        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
479                          (None, None), follow_symlinks=False)
480        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
481                          (now, None), follow_symlinks=False)
482        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
483                          (None, now), follow_symlinks=False)
484        posix.utime(os_helper.TESTFN, (int(now), int(now)),
485                    follow_symlinks=False)
486        posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False)
487        posix.utime(os_helper.TESTFN, follow_symlinks=False)
488
489    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
490    def test_writev(self):
491        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
492        try:
493            n = os.writev(fd, (b'test1', b'tt2', b't3'))
494            self.assertEqual(n, 10)
495
496            os.lseek(fd, 0, os.SEEK_SET)
497            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
498
499            # Issue #20113: empty list of buffers should not crash
500            try:
501                size = posix.writev(fd, [])
502            except OSError:
503                # writev(fd, []) raises OSError(22, "Invalid argument")
504                # on OpenIndiana
505                pass
506            else:
507                self.assertEqual(size, 0)
508        finally:
509            os.close(fd)
510
511    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
512    @requires_32b
513    def test_writev_overflow_32bits(self):
514        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
515        try:
516            with self.assertRaises(OSError) as cm:
517                os.writev(fd, [b"x" * 2**16] * 2**15)
518            self.assertEqual(cm.exception.errno, errno.EINVAL)
519        finally:
520            os.close(fd)
521
522    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
523    def test_readv(self):
524        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
525        try:
526            os.write(fd, b'test1tt2t3')
527            os.lseek(fd, 0, os.SEEK_SET)
528            buf = [bytearray(i) for i in [5, 3, 2]]
529            self.assertEqual(posix.readv(fd, buf), 10)
530            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
531
532            # Issue #20113: empty list of buffers should not crash
533            try:
534                size = posix.readv(fd, [])
535            except OSError:
536                # readv(fd, []) raises OSError(22, "Invalid argument")
537                # on OpenIndiana
538                pass
539            else:
540                self.assertEqual(size, 0)
541        finally:
542            os.close(fd)
543
544    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
545    @requires_32b
546    def test_readv_overflow_32bits(self):
547        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
548        try:
549            buf = [bytearray(2**16)] * 2**15
550            with self.assertRaises(OSError) as cm:
551                os.readv(fd, buf)
552            self.assertEqual(cm.exception.errno, errno.EINVAL)
553            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
554        finally:
555            os.close(fd)
556
557    @unittest.skipUnless(hasattr(posix, 'dup'),
558                         'test needs posix.dup()')
559    @unittest.skipIf(support.is_wasi, "WASI does not have dup()")
560    def test_dup(self):
561        fp = open(os_helper.TESTFN)
562        try:
563            fd = posix.dup(fp.fileno())
564            self.assertIsInstance(fd, int)
565            os.close(fd)
566        finally:
567            fp.close()
568
569    @unittest.skipUnless(hasattr(posix, 'confstr'),
570                         'test needs posix.confstr()')
571    @unittest.skipIf(support.is_apple_mobile, "gh-118201: Test is flaky on iOS")
572    def test_confstr(self):
573        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
574        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
575
576    @unittest.skipUnless(hasattr(posix, 'dup2'),
577                         'test needs posix.dup2()')
578    @unittest.skipIf(support.is_wasi, "WASI does not have dup2()")
579    def test_dup2(self):
580        fp1 = open(os_helper.TESTFN)
581        fp2 = open(os_helper.TESTFN)
582        try:
583            posix.dup2(fp1.fileno(), fp2.fileno())
584        finally:
585            fp1.close()
586            fp2.close()
587
588    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
589    @support.requires_linux_version(2, 6, 23)
590    @support.requires_subprocess()
591    def test_oscloexec(self):
592        fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
593        self.addCleanup(os.close, fd)
594        self.assertFalse(os.get_inheritable(fd))
595
596    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
597                         'test needs posix.O_EXLOCK')
598    def test_osexlock(self):
599        fd = os.open(os_helper.TESTFN,
600                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
601        self.assertRaises(OSError, os.open, os_helper.TESTFN,
602                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
603        os.close(fd)
604
605        if hasattr(posix, "O_SHLOCK"):
606            fd = os.open(os_helper.TESTFN,
607                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
608            self.assertRaises(OSError, os.open, os_helper.TESTFN,
609                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
610            os.close(fd)
611
612    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
613                         'test needs posix.O_SHLOCK')
614    def test_osshlock(self):
615        fd1 = os.open(os_helper.TESTFN,
616                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
617        fd2 = os.open(os_helper.TESTFN,
618                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
619        os.close(fd2)
620        os.close(fd1)
621
622        if hasattr(posix, "O_EXLOCK"):
623            fd = os.open(os_helper.TESTFN,
624                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
625            self.assertRaises(OSError, os.open, os_helper.TESTFN,
626                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
627            os.close(fd)
628
629    @unittest.skipUnless(hasattr(posix, 'fstat'),
630                         'test needs posix.fstat()')
631    def test_fstat(self):
632        fp = open(os_helper.TESTFN)
633        try:
634            self.assertTrue(posix.fstat(fp.fileno()))
635            self.assertTrue(posix.stat(fp.fileno()))
636
637            self.assertRaisesRegex(TypeError,
638                    'should be string, bytes, os.PathLike or integer, not',
639                    posix.stat, float(fp.fileno()))
640        finally:
641            fp.close()
642
643    def test_stat(self):
644        self.assertTrue(posix.stat(os_helper.TESTFN))
645        self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN)))
646
647        self.assertRaisesRegex(TypeError,
648                'should be string, bytes, os.PathLike or integer, not',
649                posix.stat, bytearray(os.fsencode(os_helper.TESTFN)))
650        self.assertRaisesRegex(TypeError,
651                'should be string, bytes, os.PathLike or integer, not',
652                posix.stat, None)
653        self.assertRaisesRegex(TypeError,
654                'should be string, bytes, os.PathLike or integer, not',
655                posix.stat, list(os_helper.TESTFN))
656        self.assertRaisesRegex(TypeError,
657                'should be string, bytes, os.PathLike or integer, not',
658                posix.stat, list(os.fsencode(os_helper.TESTFN)))
659
660    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
661    def test_mkfifo(self):
662        if sys.platform == "vxworks":
663            fifo_path = os.path.join("/fifos/", os_helper.TESTFN)
664        else:
665            fifo_path = os_helper.TESTFN
666        os_helper.unlink(fifo_path)
667        self.addCleanup(os_helper.unlink, fifo_path)
668        try:
669            posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR)
670        except PermissionError as e:
671            self.skipTest('posix.mkfifo(): %s' % e)
672        self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode))
673
674    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
675                         "don't have mknod()/S_IFIFO")
676    def test_mknod(self):
677        # Test using mknod() to create a FIFO (the only use specified
678        # by POSIX).
679        os_helper.unlink(os_helper.TESTFN)
680        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
681        try:
682            posix.mknod(os_helper.TESTFN, mode, 0)
683        except OSError as e:
684            # Some old systems don't allow unprivileged users to use
685            # mknod(), or only support creating device nodes.
686            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
687        else:
688            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
689
690        # Keyword arguments are also supported
691        os_helper.unlink(os_helper.TESTFN)
692        try:
693            posix.mknod(path=os_helper.TESTFN, mode=mode, device=0,
694                dir_fd=None)
695        except OSError as e:
696            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
697
698    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
699    def test_makedev(self):
700        st = posix.stat(os_helper.TESTFN)
701        dev = st.st_dev
702        self.assertIsInstance(dev, int)
703        self.assertGreaterEqual(dev, 0)
704
705        major = posix.major(dev)
706        self.assertIsInstance(major, int)
707        self.assertGreaterEqual(major, 0)
708        self.assertEqual(posix.major(dev), major)
709        self.assertRaises(TypeError, posix.major, float(dev))
710        self.assertRaises(TypeError, posix.major)
711        for x in -2, 2**64, -2**63-1:
712            self.assertRaises((ValueError, OverflowError), posix.major, x)
713
714        minor = posix.minor(dev)
715        self.assertIsInstance(minor, int)
716        self.assertGreaterEqual(minor, 0)
717        self.assertEqual(posix.minor(dev), minor)
718        self.assertRaises(TypeError, posix.minor, float(dev))
719        self.assertRaises(TypeError, posix.minor)
720        for x in -2, 2**64, -2**63-1:
721            self.assertRaises((ValueError, OverflowError), posix.minor, x)
722
723        self.assertEqual(posix.makedev(major, minor), dev)
724        self.assertRaises(TypeError, posix.makedev, float(major), minor)
725        self.assertRaises(TypeError, posix.makedev, major, float(minor))
726        self.assertRaises(TypeError, posix.makedev, major)
727        self.assertRaises(TypeError, posix.makedev)
728        for x in -2, 2**32, 2**64, -2**63-1:
729            self.assertRaises((ValueError, OverflowError), posix.makedev, x, minor)
730            self.assertRaises((ValueError, OverflowError), posix.makedev, major, x)
731
732        if sys.platform == 'linux':
733            NODEV = -1
734            self.assertEqual(posix.major(NODEV), NODEV)
735            self.assertEqual(posix.minor(NODEV), NODEV)
736            self.assertEqual(posix.makedev(NODEV, NODEV), NODEV)
737
738    def _test_all_chown_common(self, chown_func, first_param, stat_func):
739        """Common code for chown, fchown and lchown tests."""
740        def check_stat(uid, gid):
741            if stat_func is not None:
742                stat = stat_func(first_param)
743                self.assertEqual(stat.st_uid, uid)
744                self.assertEqual(stat.st_gid, gid)
745        uid = os.getuid()
746        gid = os.getgid()
747        # test a successful chown call
748        chown_func(first_param, uid, gid)
749        check_stat(uid, gid)
750        chown_func(first_param, -1, gid)
751        check_stat(uid, gid)
752        chown_func(first_param, uid, -1)
753        check_stat(uid, gid)
754
755        if sys.platform == "vxworks":
756            # On VxWorks, root user id is 1 and 0 means no login user:
757            # both are super users.
758            is_root = (uid in (0, 1))
759        else:
760            is_root = (uid == 0)
761        if support.is_emscripten:
762            # Emscripten getuid() / geteuid() always return 0 (root), but
763            # cannot chown uid/gid to random value.
764            pass
765        elif is_root:
766            # Try an amusingly large uid/gid to make sure we handle
767            # large unsigned values.  (chown lets you use any
768            # uid/gid you like, even if they aren't defined.)
769            #
770            # On VxWorks uid_t is defined as unsigned short. A big
771            # value greater than 65535 will result in underflow error.
772            #
773            # This problem keeps coming up:
774            #   http://bugs.python.org/issue1747858
775            #   http://bugs.python.org/issue4591
776            #   http://bugs.python.org/issue15301
777            # Hopefully the fix in 4591 fixes it for good!
778            #
779            # This part of the test only runs when run as root.
780            # Only scary people run their tests as root.
781
782            big_value = (2**31 if sys.platform != "vxworks" else 2**15)
783            chown_func(first_param, big_value, big_value)
784            check_stat(big_value, big_value)
785            chown_func(first_param, -1, -1)
786            check_stat(big_value, big_value)
787            chown_func(first_param, uid, gid)
788            check_stat(uid, gid)
789        elif platform.system() in ('HP-UX', 'SunOS'):
790            # HP-UX and Solaris can allow a non-root user to chown() to root
791            # (issue #5113)
792            raise unittest.SkipTest("Skipping because of non-standard chown() "
793                                    "behavior")
794        else:
795            # non-root cannot chown to root, raises OSError
796            self.assertRaises(OSError, chown_func, first_param, 0, 0)
797            check_stat(uid, gid)
798            self.assertRaises(OSError, chown_func, first_param, 0, -1)
799            check_stat(uid, gid)
800            if hasattr(os, 'getgroups'):
801                if 0 not in os.getgroups():
802                    self.assertRaises(OSError, chown_func, first_param, -1, 0)
803                    check_stat(uid, gid)
804        # test illegal types
805        for t in str, float:
806            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
807            check_stat(uid, gid)
808            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
809            check_stat(uid, gid)
810
811    @unittest.skipUnless(hasattr(os, "chown"), "requires os.chown()")
812    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
813    def test_chown(self):
814        # raise an OSError if the file does not exist
815        os.unlink(os_helper.TESTFN)
816        self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1)
817
818        # re-create the file
819        os_helper.create_empty_file(os_helper.TESTFN)
820        self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat)
821
822    @os_helper.skip_unless_working_chmod
823    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
824    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
825    def test_fchown(self):
826        os.unlink(os_helper.TESTFN)
827
828        # re-create the file
829        test_file = open(os_helper.TESTFN, 'w')
830        try:
831            fd = test_file.fileno()
832            self._test_all_chown_common(posix.fchown, fd,
833                                        getattr(posix, 'fstat', None))
834        finally:
835            test_file.close()
836
837    @os_helper.skip_unless_working_chmod
838    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
839    def test_lchown(self):
840        os.unlink(os_helper.TESTFN)
841        # create a symlink
842        os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN)
843        self._test_all_chown_common(posix.lchown, os_helper.TESTFN,
844                                    getattr(posix, 'lstat', None))
845
846    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
847    def test_chdir(self):
848        posix.chdir(os.curdir)
849        self.assertRaises(OSError, posix.chdir, os_helper.TESTFN)
850
851    def test_listdir(self):
852        self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir))
853
854    def test_listdir_default(self):
855        # When listdir is called without argument,
856        # it's the same as listdir(os.curdir).
857        self.assertIn(os_helper.TESTFN, posix.listdir())
858
859    def test_listdir_bytes(self):
860        # When listdir is called with a bytes object,
861        # the returned strings are of type bytes.
862        self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.'))
863
864    def test_listdir_bytes_like(self):
865        for cls in bytearray, memoryview:
866            with self.assertRaises(TypeError):
867                posix.listdir(cls(b'.'))
868
869    @unittest.skipUnless(posix.listdir in os.supports_fd,
870                         "test needs fd support for posix.listdir()")
871    def test_listdir_fd(self):
872        f = posix.open(posix.getcwd(), posix.O_RDONLY)
873        self.addCleanup(posix.close, f)
874        self.assertEqual(
875            sorted(posix.listdir('.')),
876            sorted(posix.listdir(f))
877            )
878        # Check that the fd offset was reset (issue #13739)
879        self.assertEqual(
880            sorted(posix.listdir('.')),
881            sorted(posix.listdir(f))
882            )
883
884    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
885    def test_access(self):
886        self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK))
887
888    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
889    def test_umask(self):
890        old_mask = posix.umask(0)
891        self.assertIsInstance(old_mask, int)
892        posix.umask(old_mask)
893
894    @unittest.skipUnless(hasattr(posix, 'strerror'),
895                         'test needs posix.strerror()')
896    def test_strerror(self):
897        self.assertTrue(posix.strerror(0))
898
899    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
900    def test_pipe(self):
901        reader, writer = posix.pipe()
902        os.close(reader)
903        os.close(writer)
904
905    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
906    @support.requires_linux_version(2, 6, 27)
907    def test_pipe2(self):
908        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
909        self.assertRaises(TypeError, os.pipe2, 0, 0)
910
911        # try calling with flags = 0, like os.pipe()
912        r, w = os.pipe2(0)
913        os.close(r)
914        os.close(w)
915
916        # test flags
917        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
918        self.addCleanup(os.close, r)
919        self.addCleanup(os.close, w)
920        self.assertFalse(os.get_inheritable(r))
921        self.assertFalse(os.get_inheritable(w))
922        self.assertFalse(os.get_blocking(r))
923        self.assertFalse(os.get_blocking(w))
924        # try reading from an empty pipe: this should fail, not block
925        self.assertRaises(OSError, os.read, r, 1)
926        # try a write big enough to fill-up the pipe: this should either
927        # fail or perform a partial write, not block
928        try:
929            os.write(w, b'x' * support.PIPE_MAX_SIZE)
930        except OSError:
931            pass
932
933    @support.cpython_only
934    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
935    @support.requires_linux_version(2, 6, 27)
936    def test_pipe2_c_limits(self):
937        # Issue 15989
938        import _testcapi
939        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
940        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
941
942    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
943    def test_utime(self):
944        now = time.time()
945        posix.utime(os_helper.TESTFN, None)
946        self.assertRaises(TypeError, posix.utime,
947                          os_helper.TESTFN, (None, None))
948        self.assertRaises(TypeError, posix.utime,
949                          os_helper.TESTFN, (now, None))
950        self.assertRaises(TypeError, posix.utime,
951                          os_helper.TESTFN, (None, now))
952        posix.utime(os_helper.TESTFN, (int(now), int(now)))
953        posix.utime(os_helper.TESTFN, (now, now))
954
955    def check_chmod(self, chmod_func, target, **kwargs):
956        closefd = not isinstance(target, int)
957        mode = os.stat(target).st_mode
958        try:
959            new_mode = mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
960            chmod_func(target, new_mode, **kwargs)
961            self.assertEqual(os.stat(target).st_mode, new_mode)
962            if stat.S_ISREG(mode):
963                try:
964                    with open(target, 'wb+', closefd=closefd):
965                        pass
966                except PermissionError:
967                    pass
968            new_mode = mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
969            chmod_func(target, new_mode, **kwargs)
970            self.assertEqual(os.stat(target).st_mode, new_mode)
971            if stat.S_ISREG(mode):
972                with open(target, 'wb+', closefd=closefd):
973                    pass
974        finally:
975            chmod_func(target, mode)
976
977    @os_helper.skip_unless_working_chmod
978    def test_chmod_file(self):
979        self.check_chmod(posix.chmod, os_helper.TESTFN)
980
981    def tempdir(self):
982        target = os_helper.TESTFN + 'd'
983        posix.mkdir(target)
984        self.addCleanup(posix.rmdir, target)
985        return target
986
987    @os_helper.skip_unless_working_chmod
988    def test_chmod_dir(self):
989        target = self.tempdir()
990        self.check_chmod(posix.chmod, target)
991
992    @os_helper.skip_unless_working_chmod
993    def test_fchmod_file(self):
994        with open(os_helper.TESTFN, 'wb+') as f:
995            self.check_chmod(posix.fchmod, f.fileno())
996            self.check_chmod(posix.chmod, f.fileno())
997
998    @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()')
999    def test_lchmod_file(self):
1000        self.check_chmod(posix.lchmod, os_helper.TESTFN)
1001        self.check_chmod(posix.chmod, os_helper.TESTFN, follow_symlinks=False)
1002
1003    @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()')
1004    def test_lchmod_dir(self):
1005        target = self.tempdir()
1006        self.check_chmod(posix.lchmod, target)
1007        self.check_chmod(posix.chmod, target, follow_symlinks=False)
1008
1009    def check_chmod_link(self, chmod_func, target, link, **kwargs):
1010        target_mode = os.stat(target).st_mode
1011        link_mode = os.lstat(link).st_mode
1012        try:
1013            new_mode = target_mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
1014            chmod_func(link, new_mode, **kwargs)
1015            self.assertEqual(os.stat(target).st_mode, new_mode)
1016            self.assertEqual(os.lstat(link).st_mode, link_mode)
1017            new_mode = target_mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
1018            chmod_func(link, new_mode, **kwargs)
1019            self.assertEqual(os.stat(target).st_mode, new_mode)
1020            self.assertEqual(os.lstat(link).st_mode, link_mode)
1021        finally:
1022            posix.chmod(target, target_mode)
1023
1024    def check_lchmod_link(self, chmod_func, target, link, **kwargs):
1025        target_mode = os.stat(target).st_mode
1026        link_mode = os.lstat(link).st_mode
1027        new_mode = link_mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
1028        chmod_func(link, new_mode, **kwargs)
1029        self.assertEqual(os.stat(target).st_mode, target_mode)
1030        self.assertEqual(os.lstat(link).st_mode, new_mode)
1031        new_mode = link_mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR)
1032        chmod_func(link, new_mode, **kwargs)
1033        self.assertEqual(os.stat(target).st_mode, target_mode)
1034        self.assertEqual(os.lstat(link).st_mode, new_mode)
1035
1036    @os_helper.skip_unless_symlink
1037    def test_chmod_file_symlink(self):
1038        target = os_helper.TESTFN
1039        link = os_helper.TESTFN + '-link'
1040        os.symlink(target, link)
1041        self.addCleanup(posix.unlink, link)
1042        if os.name == 'nt':
1043            self.check_lchmod_link(posix.chmod, target, link)
1044        else:
1045            self.check_chmod_link(posix.chmod, target, link)
1046        self.check_chmod_link(posix.chmod, target, link, follow_symlinks=True)
1047
1048    @os_helper.skip_unless_symlink
1049    def test_chmod_dir_symlink(self):
1050        target = self.tempdir()
1051        link = os_helper.TESTFN + '-link'
1052        os.symlink(target, link, target_is_directory=True)
1053        self.addCleanup(posix.unlink, link)
1054        if os.name == 'nt':
1055            self.check_lchmod_link(posix.chmod, target, link)
1056        else:
1057            self.check_chmod_link(posix.chmod, target, link)
1058        self.check_chmod_link(posix.chmod, target, link, follow_symlinks=True)
1059
1060    @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()')
1061    @os_helper.skip_unless_symlink
1062    def test_lchmod_file_symlink(self):
1063        target = os_helper.TESTFN
1064        link = os_helper.TESTFN + '-link'
1065        os.symlink(target, link)
1066        self.addCleanup(posix.unlink, link)
1067        self.check_lchmod_link(posix.chmod, target, link, follow_symlinks=False)
1068        self.check_lchmod_link(posix.lchmod, target, link)
1069
1070    @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()')
1071    @os_helper.skip_unless_symlink
1072    def test_lchmod_dir_symlink(self):
1073        target = self.tempdir()
1074        link = os_helper.TESTFN + '-link'
1075        os.symlink(target, link)
1076        self.addCleanup(posix.unlink, link)
1077        self.check_lchmod_link(posix.chmod, target, link, follow_symlinks=False)
1078        self.check_lchmod_link(posix.lchmod, target, link)
1079
1080    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
1081        st = os.stat(target_file)
1082        self.assertTrue(hasattr(st, 'st_flags'))
1083
1084        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
1085        flags = st.st_flags | stat.UF_IMMUTABLE
1086        try:
1087            chflags_func(target_file, flags, **kwargs)
1088        except OSError as err:
1089            if err.errno != errno.EOPNOTSUPP:
1090                raise
1091            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
1092            self.skipTest(msg)
1093
1094        try:
1095            new_st = os.stat(target_file)
1096            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
1097            try:
1098                fd = open(target_file, 'w+')
1099            except OSError as e:
1100                self.assertEqual(e.errno, errno.EPERM)
1101        finally:
1102            posix.chflags(target_file, st.st_flags)
1103
1104    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
1105    def test_chflags(self):
1106        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN)
1107
1108    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
1109    def test_lchflags_regular_file(self):
1110        self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN)
1111        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN,
1112                                        follow_symlinks=False)
1113
1114    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
1115    def test_lchflags_symlink(self):
1116        testfn_st = os.stat(os_helper.TESTFN)
1117
1118        self.assertTrue(hasattr(testfn_st, 'st_flags'))
1119
1120        self.addCleanup(os_helper.unlink, _DUMMY_SYMLINK)
1121        os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK)
1122        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
1123
1124        def chflags_nofollow(path, flags):
1125            return posix.chflags(path, flags, follow_symlinks=False)
1126
1127        for fn in (posix.lchflags, chflags_nofollow):
1128            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
1129            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
1130            try:
1131                fn(_DUMMY_SYMLINK, flags)
1132            except OSError as err:
1133                if err.errno != errno.EOPNOTSUPP:
1134                    raise
1135                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
1136                self.skipTest(msg)
1137            try:
1138                new_testfn_st = os.stat(os_helper.TESTFN)
1139                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
1140
1141                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
1142                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
1143                                 new_dummy_symlink_st.st_flags)
1144            finally:
1145                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
1146
1147    def test_environ(self):
1148        if os.name == "nt":
1149            item_type = str
1150        else:
1151            item_type = bytes
1152        for k, v in posix.environ.items():
1153            self.assertEqual(type(k), item_type)
1154            self.assertEqual(type(v), item_type)
1155
1156    def test_putenv(self):
1157        with self.assertRaises(ValueError):
1158            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
1159        with self.assertRaises(ValueError):
1160            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
1161        with self.assertRaises(ValueError):
1162            os.putenv('FRUIT=ORANGE', 'lemon')
1163        if os.name == 'posix':
1164            with self.assertRaises(ValueError):
1165                os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
1166            with self.assertRaises(ValueError):
1167                os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
1168            with self.assertRaises(ValueError):
1169                os.putenv(b'FRUIT=ORANGE', b'lemon')
1170
1171    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
1172    def test_getcwd_long_pathnames(self):
1173        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
1174        curdir = os.getcwd()
1175        base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd'
1176
1177        try:
1178            os.mkdir(base_path)
1179            os.chdir(base_path)
1180        except:
1181            #  Just returning nothing instead of the SkipTest exception, because
1182            #  the test results in Error in that case.  Is that ok?
1183            #  raise unittest.SkipTest("cannot create directory for testing")
1184            return
1185
1186            def _create_and_do_getcwd(dirname, current_path_length = 0):
1187                try:
1188                    os.mkdir(dirname)
1189                except:
1190                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1191
1192                os.chdir(dirname)
1193                try:
1194                    os.getcwd()
1195                    if current_path_length < 1027:
1196                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1197                finally:
1198                    os.chdir('..')
1199                    os.rmdir(dirname)
1200
1201            _create_and_do_getcwd(dirname)
1202
1203        finally:
1204            os.chdir(curdir)
1205            os_helper.rmtree(base_path)
1206
1207    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1208    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1209    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1210    def test_getgrouplist(self):
1211        user = pwd.getpwuid(os.getuid())[0]
1212        group = pwd.getpwuid(os.getuid())[3]
1213        self.assertIn(group, posix.getgrouplist(user, group))
1214
1215
1216    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1217    @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()")
1218    @support.requires_subprocess()
1219    def test_getgroups(self):
1220        with os.popen('id -G 2>/dev/null') as idg:
1221            groups = idg.read().strip()
1222            ret = idg.close()
1223
1224        try:
1225            idg_groups = set(int(g) for g in groups.split())
1226        except ValueError:
1227            idg_groups = set()
1228        if ret is not None or not idg_groups:
1229            raise unittest.SkipTest("need working 'id -G'")
1230
1231        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1232        if sys.platform == 'darwin':
1233            import sysconfig
1234            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3'
1235            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
1236                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1237
1238        # 'id -G' and 'os.getgroups()' should return the same
1239        # groups, ignoring order, duplicates, and the effective gid.
1240        # #10822/#26944 - It is implementation defined whether
1241        # posix.getgroups() includes the effective gid.
1242        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1243        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1244
1245    @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
1246    @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
1247    def test_cld_xxxx_constants(self):
1248        os.CLD_EXITED
1249        os.CLD_KILLED
1250        os.CLD_DUMPED
1251        os.CLD_TRAPPED
1252        os.CLD_STOPPED
1253        os.CLD_CONTINUED
1254
1255    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1256                                           "don't have scheduling support")
1257    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1258                                                  "don't have sched affinity support")
1259
1260    @requires_sched_h
1261    def test_sched_yield(self):
1262        # This has no error conditions (at least on Linux).
1263        posix.sched_yield()
1264
1265    @requires_sched_h
1266    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1267                         "requires sched_get_priority_max()")
1268    def test_sched_priority(self):
1269        # Round-robin usually has interesting priorities.
1270        pol = posix.SCHED_RR
1271        lo = posix.sched_get_priority_min(pol)
1272        hi = posix.sched_get_priority_max(pol)
1273        self.assertIsInstance(lo, int)
1274        self.assertIsInstance(hi, int)
1275        self.assertGreaterEqual(hi, lo)
1276        # Apple platforms return 15 without checking the argument.
1277        if not is_apple:
1278            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1279            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1280
1281    @requires_sched
1282    def test_get_and_set_scheduler_and_param(self):
1283        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1284                               if name.startswith("SCHED_")]
1285        mine = posix.sched_getscheduler(0)
1286        self.assertIn(mine, possible_schedulers)
1287        try:
1288            parent = posix.sched_getscheduler(os.getppid())
1289        except PermissionError:
1290            # POSIX specifies EPERM, but Android returns EACCES. Both errno
1291            # values are mapped to PermissionError.
1292            pass
1293        else:
1294            self.assertIn(parent, possible_schedulers)
1295        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1296        self.assertRaises(OSError, posix.sched_getparam, -1)
1297        param = posix.sched_getparam(0)
1298        self.assertIsInstance(param.sched_priority, int)
1299
1300        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1301        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1302        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1303        if not sys.platform.startswith(('freebsd', 'netbsd')):
1304            try:
1305                posix.sched_setscheduler(0, mine, param)
1306                posix.sched_setparam(0, param)
1307            except PermissionError:
1308                pass
1309            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1310
1311        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1312        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1313        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1314        param = posix.sched_param(None)
1315        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1316        large = 214748364700
1317        param = posix.sched_param(large)
1318        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1319        param = posix.sched_param(sched_priority=-large)
1320        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1321
1322    @requires_sched
1323    def test_sched_param(self):
1324        param = posix.sched_param(1)
1325        for proto in range(pickle.HIGHEST_PROTOCOL+1):
1326            newparam = pickle.loads(pickle.dumps(param, proto))
1327            self.assertEqual(newparam, param)
1328        newparam = copy.copy(param)
1329        self.assertIsNot(newparam, param)
1330        self.assertEqual(newparam, param)
1331        newparam = copy.deepcopy(param)
1332        self.assertIsNot(newparam, param)
1333        self.assertEqual(newparam, param)
1334        newparam = copy.replace(param)
1335        self.assertIsNot(newparam, param)
1336        self.assertEqual(newparam, param)
1337        newparam = copy.replace(param, sched_priority=0)
1338        self.assertNotEqual(newparam, param)
1339        self.assertEqual(newparam.sched_priority, 0)
1340
1341    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1342    def test_sched_rr_get_interval(self):
1343        try:
1344            interval = posix.sched_rr_get_interval(0)
1345        except OSError as e:
1346            # This likely means that sched_rr_get_interval is only valid for
1347            # processes with the SCHED_RR scheduler in effect.
1348            if e.errno != errno.EINVAL:
1349                raise
1350            self.skipTest("only works on SCHED_RR processes")
1351        self.assertIsInstance(interval, float)
1352        # Reasonable constraints, I think.
1353        self.assertGreaterEqual(interval, 0.)
1354        self.assertLess(interval, 1.)
1355
1356    @requires_sched_affinity
1357    def test_sched_getaffinity(self):
1358        mask = posix.sched_getaffinity(0)
1359        self.assertIsInstance(mask, set)
1360        self.assertGreaterEqual(len(mask), 1)
1361        if not sys.platform.startswith("freebsd"):
1362            # bpo-47205: does not raise OSError on FreeBSD
1363            self.assertRaises(OSError, posix.sched_getaffinity, -1)
1364        for cpu in mask:
1365            self.assertIsInstance(cpu, int)
1366            self.assertGreaterEqual(cpu, 0)
1367            self.assertLess(cpu, 1 << 32)
1368
1369    @requires_sched_affinity
1370    def test_sched_setaffinity(self):
1371        mask = posix.sched_getaffinity(0)
1372        self.addCleanup(posix.sched_setaffinity, 0, list(mask))
1373
1374        if len(mask) > 1:
1375            # Empty masks are forbidden
1376            mask.pop()
1377        posix.sched_setaffinity(0, mask)
1378        self.assertEqual(posix.sched_getaffinity(0), mask)
1379
1380        try:
1381            posix.sched_setaffinity(0, [])
1382            # gh-117061: On RHEL9, sched_setaffinity(0, []) does not fail
1383        except OSError:
1384            # sched_setaffinity() manual page documents EINVAL error
1385            # when the mask is empty.
1386            pass
1387
1388        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1389        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1390        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1391        if not sys.platform.startswith("freebsd"):
1392            # bpo-47205: does not raise OSError on FreeBSD
1393            self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1394
1395    @unittest.skipIf(support.is_wasi, "No dynamic linking on WASI")
1396    @unittest.skipUnless(os.name == 'posix', "POSIX-only test")
1397    def test_rtld_constants(self):
1398        # check presence of major RTLD_* constants
1399        posix.RTLD_LAZY
1400        posix.RTLD_NOW
1401        posix.RTLD_GLOBAL
1402        posix.RTLD_LOCAL
1403
1404    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1405                         "test needs an OS that reports file holes")
1406    def test_fs_holes(self):
1407        # Even if the filesystem doesn't report holes,
1408        # if the OS supports it the SEEK_* constants
1409        # will be defined and will have a consistent
1410        # behaviour:
1411        # os.SEEK_DATA = current position
1412        # os.SEEK_HOLE = end of file position
1413        with open(os_helper.TESTFN, 'r+b') as fp:
1414            fp.write(b"hello")
1415            fp.flush()
1416            size = fp.tell()
1417            fno = fp.fileno()
1418            try :
1419                for i in range(size):
1420                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1421                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1422                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1423                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1424            except OSError :
1425                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1426                # but it is not true.
1427                # For instance:
1428                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1429                raise unittest.SkipTest("OSError raised!")
1430
1431    def test_path_error2(self):
1432        """
1433        Test functions that call path_error2(), providing two filenames in their exceptions.
1434        """
1435        for name in ("rename", "replace", "link"):
1436            function = getattr(os, name, None)
1437            if function is None:
1438                continue
1439
1440            for dst in ("noodly2", os_helper.TESTFN):
1441                try:
1442                    function('doesnotexistfilename', dst)
1443                except OSError as e:
1444                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1445                    break
1446            else:
1447                self.fail("No valid path_error2() test for os." + name)
1448
1449    def test_path_with_null_character(self):
1450        fn = os_helper.TESTFN
1451        fn_with_NUL = fn + '\0'
1452        self.addCleanup(os_helper.unlink, fn)
1453        os_helper.unlink(fn)
1454        fd = None
1455        try:
1456            with self.assertRaises(ValueError):
1457                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1458        finally:
1459            if fd is not None:
1460                os.close(fd)
1461        self.assertFalse(os.path.exists(fn))
1462        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1463        self.assertFalse(os.path.exists(fn))
1464        open(fn, 'wb').close()
1465        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1466
1467    def test_path_with_null_byte(self):
1468        fn = os.fsencode(os_helper.TESTFN)
1469        fn_with_NUL = fn + b'\0'
1470        self.addCleanup(os_helper.unlink, fn)
1471        os_helper.unlink(fn)
1472        fd = None
1473        try:
1474            with self.assertRaises(ValueError):
1475                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1476        finally:
1477            if fd is not None:
1478                os.close(fd)
1479        self.assertFalse(os.path.exists(fn))
1480        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1481        self.assertFalse(os.path.exists(fn))
1482        open(fn, 'wb').close()
1483        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1484
1485    @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable")
1486    def test_pidfd_open(self):
1487        with self.assertRaises(OSError) as cm:
1488            os.pidfd_open(-1)
1489        if cm.exception.errno == errno.ENOSYS:
1490            self.skipTest("system does not support pidfd_open")
1491        if isinstance(cm.exception, PermissionError):
1492            self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}")
1493        self.assertEqual(cm.exception.errno, errno.EINVAL)
1494        os.close(os.pidfd_open(os.getpid(), 0))
1495
1496
1497# tests for the posix *at functions follow
1498class TestPosixDirFd(unittest.TestCase):
1499    count = 0
1500
1501    @contextmanager
1502    def prepare(self):
1503        TestPosixDirFd.count += 1
1504        name = f'{os_helper.TESTFN}_{self.count}'
1505        base_dir = f'{os_helper.TESTFN}_{self.count}base'
1506        posix.mkdir(base_dir)
1507        self.addCleanup(posix.rmdir, base_dir)
1508        fullname = os.path.join(base_dir, name)
1509        assert not os.path.exists(fullname)
1510        with os_helper.open_dir_fd(base_dir) as dir_fd:
1511            yield (dir_fd, name, fullname)
1512
1513    @contextmanager
1514    def prepare_file(self):
1515        with self.prepare() as (dir_fd, name, fullname):
1516            os_helper.create_empty_file(fullname)
1517            self.addCleanup(posix.unlink, fullname)
1518            yield (dir_fd, name, fullname)
1519
1520    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1521    def test_access_dir_fd(self):
1522        with self.prepare_file() as (dir_fd, name, fullname):
1523            self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd))
1524
1525    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1526    def test_chmod_dir_fd(self):
1527        with self.prepare_file() as (dir_fd, name, fullname):
1528            posix.chmod(fullname, stat.S_IRUSR)
1529            posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1530            s = posix.stat(fullname)
1531            self.assertEqual(s.st_mode & stat.S_IRWXU,
1532                             stat.S_IRUSR | stat.S_IWUSR)
1533
1534    @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
1535                         "test needs dir_fd support in os.chown()")
1536    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
1537    def test_chown_dir_fd(self):
1538        with self.prepare_file() as (dir_fd, name, fullname):
1539            posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd)
1540
1541    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1542    def test_stat_dir_fd(self):
1543        with self.prepare() as (dir_fd, name, fullname):
1544            with open(fullname, 'w') as outfile:
1545                outfile.write("testline\n")
1546            self.addCleanup(posix.unlink, fullname)
1547
1548            s1 = posix.stat(fullname)
1549            s2 = posix.stat(name, dir_fd=dir_fd)
1550            self.assertEqual(s1, s2)
1551            s2 = posix.stat(fullname, dir_fd=None)
1552            self.assertEqual(s1, s2)
1553
1554            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1555                    posix.stat, name, dir_fd=posix.getcwd())
1556            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1557                    posix.stat, name, dir_fd=float(dir_fd))
1558            self.assertRaises(OverflowError,
1559                    posix.stat, name, dir_fd=10**20)
1560
1561            for fd in False, True:
1562                with self.assertWarnsRegex(RuntimeWarning,
1563                        'bool is used as a file descriptor') as cm:
1564                    with self.assertRaises(OSError):
1565                        posix.stat('nonexisting', dir_fd=fd)
1566                self.assertEqual(cm.filename, __file__)
1567
1568    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1569    def test_utime_dir_fd(self):
1570        with self.prepare_file() as (dir_fd, name, fullname):
1571            now = time.time()
1572            posix.utime(name, None, dir_fd=dir_fd)
1573            posix.utime(name, dir_fd=dir_fd)
1574            self.assertRaises(TypeError, posix.utime, name,
1575                              now, dir_fd=dir_fd)
1576            self.assertRaises(TypeError, posix.utime, name,
1577                              (None, None), dir_fd=dir_fd)
1578            self.assertRaises(TypeError, posix.utime, name,
1579                              (now, None), dir_fd=dir_fd)
1580            self.assertRaises(TypeError, posix.utime, name,
1581                              (None, now), dir_fd=dir_fd)
1582            self.assertRaises(TypeError, posix.utime, name,
1583                              (now, "x"), dir_fd=dir_fd)
1584            posix.utime(name, (int(now), int(now)), dir_fd=dir_fd)
1585            posix.utime(name, (now, now), dir_fd=dir_fd)
1586            posix.utime(name,
1587                    (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd)
1588            posix.utime(name, dir_fd=dir_fd,
1589                            times=(int(now), int((now - int(now)) * 1e9)))
1590
1591            # try dir_fd and follow_symlinks together
1592            if os.utime in os.supports_follow_symlinks:
1593                try:
1594                    posix.utime(name, follow_symlinks=False, dir_fd=dir_fd)
1595                except ValueError:
1596                    # whoops!  using both together not supported on this platform.
1597                    pass
1598
1599    @unittest.skipIf(
1600        support.is_wasi,
1601        "WASI: symlink following on path_link is not supported"
1602    )
1603    @unittest.skipUnless(
1604        hasattr(os, "link") and os.link in os.supports_dir_fd,
1605        "test needs dir_fd support in os.link()"
1606    )
1607    def test_link_dir_fd(self):
1608        with self.prepare_file() as (dir_fd, name, fullname), \
1609             self.prepare() as (dir_fd2, linkname, fulllinkname):
1610            try:
1611                posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1612            except PermissionError as e:
1613                self.skipTest('posix.link(): %s' % e)
1614            self.addCleanup(posix.unlink, fulllinkname)
1615            # should have same inodes
1616            self.assertEqual(posix.stat(fullname)[1],
1617                posix.stat(fulllinkname)[1])
1618
1619    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1620    def test_mkdir_dir_fd(self):
1621        with self.prepare() as (dir_fd, name, fullname):
1622            posix.mkdir(name, dir_fd=dir_fd)
1623            self.addCleanup(posix.rmdir, fullname)
1624            posix.stat(fullname) # should not raise exception
1625
1626    @unittest.skipUnless(hasattr(os, 'mknod')
1627                         and (os.mknod in os.supports_dir_fd)
1628                         and hasattr(stat, 'S_IFIFO'),
1629                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1630    def test_mknod_dir_fd(self):
1631        # Test using mknodat() to create a FIFO (the only use specified
1632        # by POSIX).
1633        with self.prepare() as (dir_fd, name, fullname):
1634            mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1635            try:
1636                posix.mknod(name, mode, 0, dir_fd=dir_fd)
1637            except OSError as e:
1638                # Some old systems don't allow unprivileged users to use
1639                # mknod(), or only support creating device nodes.
1640                self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1641            else:
1642                self.addCleanup(posix.unlink, fullname)
1643                self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1644
1645    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1646    def test_open_dir_fd(self):
1647        with self.prepare() as (dir_fd, name, fullname):
1648            with open(fullname, 'wb') as outfile:
1649                outfile.write(b"testline\n")
1650            self.addCleanup(posix.unlink, fullname)
1651            fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd)
1652            try:
1653                res = posix.read(fd, 9)
1654                self.assertEqual(b"testline\n", res)
1655            finally:
1656                posix.close(fd)
1657
1658    @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
1659                         "test needs dir_fd support in os.readlink()")
1660    def test_readlink_dir_fd(self):
1661        with self.prepare() as (dir_fd, name, fullname):
1662            os.symlink('symlink', fullname)
1663            self.addCleanup(posix.unlink, fullname)
1664            self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink')
1665
1666    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1667    def test_rename_dir_fd(self):
1668        with self.prepare_file() as (dir_fd, name, fullname), \
1669             self.prepare() as (dir_fd2, name2, fullname2):
1670            posix.rename(name, name2,
1671                         src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1672            posix.stat(fullname2) # should not raise exception
1673            posix.rename(fullname2, fullname)
1674
1675    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1676    def test_symlink_dir_fd(self):
1677        with self.prepare() as (dir_fd, name, fullname):
1678            posix.symlink('symlink', name, dir_fd=dir_fd)
1679            self.addCleanup(posix.unlink, fullname)
1680            self.assertEqual(posix.readlink(fullname), 'symlink')
1681
1682    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1683    def test_unlink_dir_fd(self):
1684        with self.prepare() as (dir_fd, name, fullname):
1685            os_helper.create_empty_file(fullname)
1686            posix.stat(fullname) # should not raise exception
1687            try:
1688                posix.unlink(name, dir_fd=dir_fd)
1689                self.assertRaises(OSError, posix.stat, fullname)
1690            except:
1691                self.addCleanup(posix.unlink, fullname)
1692                raise
1693
1694    @unittest.skipUnless(hasattr(os, 'mkfifo') and os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1695    def test_mkfifo_dir_fd(self):
1696        with self.prepare() as (dir_fd, name, fullname):
1697            try:
1698                posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1699            except PermissionError as e:
1700                self.skipTest('posix.mkfifo(): %s' % e)
1701            self.addCleanup(posix.unlink, fullname)
1702            self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1703
1704
1705class PosixGroupsTester(unittest.TestCase):
1706
1707    def setUp(self):
1708        if posix.getuid() != 0:
1709            raise unittest.SkipTest("not enough privileges")
1710        if not hasattr(posix, 'getgroups'):
1711            raise unittest.SkipTest("need posix.getgroups")
1712        if sys.platform == 'darwin':
1713            raise unittest.SkipTest("getgroups(2) is broken on OSX")
1714        self.saved_groups = posix.getgroups()
1715
1716    def tearDown(self):
1717        if hasattr(posix, 'setgroups'):
1718            posix.setgroups(self.saved_groups)
1719        elif hasattr(posix, 'initgroups'):
1720            name = pwd.getpwuid(posix.getuid()).pw_name
1721            posix.initgroups(name, self.saved_groups[0])
1722
1723    @unittest.skipUnless(hasattr(posix, 'initgroups'),
1724                         "test needs posix.initgroups()")
1725    def test_initgroups(self):
1726        # find missing group
1727
1728        g = max(self.saved_groups or [0]) + 1
1729        name = pwd.getpwuid(posix.getuid()).pw_name
1730        posix.initgroups(name, g)
1731        self.assertIn(g, posix.getgroups())
1732
1733    @unittest.skipUnless(hasattr(posix, 'setgroups'),
1734                         "test needs posix.setgroups()")
1735    def test_setgroups(self):
1736        for groups in [[0], list(range(16))]:
1737            posix.setgroups(groups)
1738            self.assertListEqual(groups, posix.getgroups())
1739
1740
1741class _PosixSpawnMixin:
1742    # Program which does nothing and exits with status 0 (success)
1743    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
1744    spawn_func = None
1745
1746    def python_args(self, *args):
1747        # Disable site module to avoid side effects. For example,
1748        # on Fedora 28, if the HOME environment variable is not set,
1749        # site._getuserbase() calls pwd.getpwuid() which opens
1750        # /var/lib/sss/mc/passwd but then leaves the file open which makes
1751        # test_close_file() to fail.
1752        return (sys.executable, '-I', '-S', *args)
1753
1754    def test_returns_pid(self):
1755        pidfile = os_helper.TESTFN
1756        self.addCleanup(os_helper.unlink, pidfile)
1757        script = f"""if 1:
1758            import os
1759            with open({pidfile!r}, "w") as pidfile:
1760                pidfile.write(str(os.getpid()))
1761            """
1762        args = self.python_args('-c', script)
1763        pid = self.spawn_func(args[0], args, os.environ)
1764        support.wait_process(pid, exitcode=0)
1765        with open(pidfile, encoding="utf-8") as f:
1766            self.assertEqual(f.read(), str(pid))
1767
1768    def test_no_such_executable(self):
1769        no_such_executable = 'no_such_executable'
1770        try:
1771            pid = self.spawn_func(no_such_executable,
1772                                  [no_such_executable],
1773                                  os.environ)
1774        # bpo-35794: PermissionError can be raised if there are
1775        # directories in the $PATH that are not accessible.
1776        except (FileNotFoundError, PermissionError) as exc:
1777            self.assertEqual(exc.filename, no_such_executable)
1778        else:
1779            pid2, status = os.waitpid(pid, 0)
1780            self.assertEqual(pid2, pid)
1781            self.assertNotEqual(status, 0)
1782
1783    def test_specify_environment(self):
1784        envfile = os_helper.TESTFN
1785        self.addCleanup(os_helper.unlink, envfile)
1786        script = f"""if 1:
1787            import os
1788            with open({envfile!r}, "w", encoding="utf-8") as envfile:
1789                envfile.write(os.environ['foo'])
1790        """
1791        args = self.python_args('-c', script)
1792        pid = self.spawn_func(args[0], args,
1793                              {**os.environ, 'foo': 'bar'})
1794        support.wait_process(pid, exitcode=0)
1795        with open(envfile, encoding="utf-8") as f:
1796            self.assertEqual(f.read(), 'bar')
1797
1798    def test_none_file_actions(self):
1799        pid = self.spawn_func(
1800            self.NOOP_PROGRAM[0],
1801            self.NOOP_PROGRAM,
1802            os.environ,
1803            file_actions=None
1804        )
1805        support.wait_process(pid, exitcode=0)
1806
1807    def test_empty_file_actions(self):
1808        pid = self.spawn_func(
1809            self.NOOP_PROGRAM[0],
1810            self.NOOP_PROGRAM,
1811            os.environ,
1812            file_actions=[]
1813        )
1814        support.wait_process(pid, exitcode=0)
1815
1816    def test_resetids_explicit_default(self):
1817        pid = self.spawn_func(
1818            sys.executable,
1819            [sys.executable, '-c', 'pass'],
1820            os.environ,
1821            resetids=False
1822        )
1823        support.wait_process(pid, exitcode=0)
1824
1825    def test_resetids(self):
1826        pid = self.spawn_func(
1827            sys.executable,
1828            [sys.executable, '-c', 'pass'],
1829            os.environ,
1830            resetids=True
1831        )
1832        support.wait_process(pid, exitcode=0)
1833
1834    def test_setpgroup(self):
1835        pid = self.spawn_func(
1836            sys.executable,
1837            [sys.executable, '-c', 'pass'],
1838            os.environ,
1839            setpgroup=os.getpgrp()
1840        )
1841        support.wait_process(pid, exitcode=0)
1842
1843    def test_setpgroup_wrong_type(self):
1844        with self.assertRaises(TypeError):
1845            self.spawn_func(sys.executable,
1846                            [sys.executable, "-c", "pass"],
1847                            os.environ, setpgroup="023")
1848
1849    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1850                           'need signal.pthread_sigmask()')
1851    def test_setsigmask(self):
1852        code = textwrap.dedent("""\
1853            import signal
1854            signal.raise_signal(signal.SIGUSR1)""")
1855
1856        pid = self.spawn_func(
1857            sys.executable,
1858            [sys.executable, '-c', code],
1859            os.environ,
1860            setsigmask=[signal.SIGUSR1]
1861        )
1862        support.wait_process(pid, exitcode=0)
1863
1864    def test_setsigmask_wrong_type(self):
1865        with self.assertRaises(TypeError):
1866            self.spawn_func(sys.executable,
1867                            [sys.executable, "-c", "pass"],
1868                            os.environ, setsigmask=34)
1869        with self.assertRaises(TypeError):
1870            self.spawn_func(sys.executable,
1871                            [sys.executable, "-c", "pass"],
1872                            os.environ, setsigmask=["j"])
1873        with self.assertRaises(ValueError):
1874            self.spawn_func(sys.executable,
1875                            [sys.executable, "-c", "pass"],
1876                            os.environ, setsigmask=[signal.NSIG,
1877                                                    signal.NSIG+1])
1878
1879    def test_setsid(self):
1880        rfd, wfd = os.pipe()
1881        self.addCleanup(os.close, rfd)
1882        try:
1883            os.set_inheritable(wfd, True)
1884
1885            code = textwrap.dedent(f"""
1886                import os
1887                fd = {wfd}
1888                sid = os.getsid(0)
1889                os.write(fd, str(sid).encode())
1890            """)
1891
1892            try:
1893                pid = self.spawn_func(sys.executable,
1894                                      [sys.executable, "-c", code],
1895                                      os.environ, setsid=True)
1896            except NotImplementedError as exc:
1897                self.skipTest(f"setsid is not supported: {exc!r}")
1898            except PermissionError as exc:
1899                self.skipTest(f"setsid failed with: {exc!r}")
1900        finally:
1901            os.close(wfd)
1902
1903        support.wait_process(pid, exitcode=0)
1904
1905        output = os.read(rfd, 100)
1906        child_sid = int(output)
1907        parent_sid = os.getsid(os.getpid())
1908        self.assertNotEqual(parent_sid, child_sid)
1909
1910    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1911                         'need signal.pthread_sigmask()')
1912    def test_setsigdef(self):
1913        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
1914        code = textwrap.dedent("""\
1915            import signal
1916            signal.raise_signal(signal.SIGUSR1)""")
1917        try:
1918            pid = self.spawn_func(
1919                sys.executable,
1920                [sys.executable, '-c', code],
1921                os.environ,
1922                setsigdef=[signal.SIGUSR1]
1923            )
1924        finally:
1925            signal.signal(signal.SIGUSR1, original_handler)
1926
1927        support.wait_process(pid, exitcode=-signal.SIGUSR1)
1928
1929    def test_setsigdef_wrong_type(self):
1930        with self.assertRaises(TypeError):
1931            self.spawn_func(sys.executable,
1932                            [sys.executable, "-c", "pass"],
1933                            os.environ, setsigdef=34)
1934        with self.assertRaises(TypeError):
1935            self.spawn_func(sys.executable,
1936                            [sys.executable, "-c", "pass"],
1937                            os.environ, setsigdef=["j"])
1938        with self.assertRaises(ValueError):
1939            self.spawn_func(sys.executable,
1940                            [sys.executable, "-c", "pass"],
1941                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])
1942
1943    @requires_sched
1944    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1945                     "bpo-34685: test can fail on BSD")
1946    def test_setscheduler_only_param(self):
1947        policy = os.sched_getscheduler(0)
1948        priority = os.sched_get_priority_min(policy)
1949        code = textwrap.dedent(f"""\
1950            import os, sys
1951            if os.sched_getscheduler(0) != {policy}:
1952                sys.exit(101)
1953            if os.sched_getparam(0).sched_priority != {priority}:
1954                sys.exit(102)""")
1955        pid = self.spawn_func(
1956            sys.executable,
1957            [sys.executable, '-c', code],
1958            os.environ,
1959            scheduler=(None, os.sched_param(priority))
1960        )
1961        support.wait_process(pid, exitcode=0)
1962
1963    @requires_sched
1964    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1965                     "bpo-34685: test can fail on BSD")
1966    def test_setscheduler_with_policy(self):
1967        policy = os.sched_getscheduler(0)
1968        priority = os.sched_get_priority_min(policy)
1969        code = textwrap.dedent(f"""\
1970            import os, sys
1971            if os.sched_getscheduler(0) != {policy}:
1972                sys.exit(101)
1973            if os.sched_getparam(0).sched_priority != {priority}:
1974                sys.exit(102)""")
1975        pid = self.spawn_func(
1976            sys.executable,
1977            [sys.executable, '-c', code],
1978            os.environ,
1979            scheduler=(policy, os.sched_param(priority))
1980        )
1981        support.wait_process(pid, exitcode=0)
1982
1983    def test_multiple_file_actions(self):
1984        file_actions = [
1985            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
1986            (os.POSIX_SPAWN_CLOSE, 0),
1987            (os.POSIX_SPAWN_DUP2, 1, 4),
1988        ]
1989        pid = self.spawn_func(self.NOOP_PROGRAM[0],
1990                              self.NOOP_PROGRAM,
1991                              os.environ,
1992                              file_actions=file_actions)
1993        support.wait_process(pid, exitcode=0)
1994
1995    def test_bad_file_actions(self):
1996        args = self.NOOP_PROGRAM
1997        with self.assertRaises(TypeError):
1998            self.spawn_func(args[0], args, os.environ,
1999                            file_actions=[None])
2000        with self.assertRaises(TypeError):
2001            self.spawn_func(args[0], args, os.environ,
2002                            file_actions=[()])
2003        with self.assertRaises(TypeError):
2004            self.spawn_func(args[0], args, os.environ,
2005                            file_actions=[(None,)])
2006        with self.assertRaises(TypeError):
2007            self.spawn_func(args[0], args, os.environ,
2008                            file_actions=[(12345,)])
2009        with self.assertRaises(TypeError):
2010            self.spawn_func(args[0], args, os.environ,
2011                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
2012        with self.assertRaises(TypeError):
2013            self.spawn_func(args[0], args, os.environ,
2014                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
2015        with self.assertRaises(TypeError):
2016            self.spawn_func(args[0], args, os.environ,
2017                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
2018        with self.assertRaises(ValueError):
2019            self.spawn_func(args[0], args, os.environ,
2020                            file_actions=[(os.POSIX_SPAWN_OPEN,
2021                                           3, __file__ + '\0',
2022                                           os.O_RDONLY, 0)])
2023
2024    def test_open_file(self):
2025        outfile = os_helper.TESTFN
2026        self.addCleanup(os_helper.unlink, outfile)
2027        script = """if 1:
2028            import sys
2029            sys.stdout.write("hello")
2030            """
2031        file_actions = [
2032            (os.POSIX_SPAWN_OPEN, 1, outfile,
2033                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
2034                stat.S_IRUSR | stat.S_IWUSR),
2035        ]
2036        args = self.python_args('-c', script)
2037        pid = self.spawn_func(args[0], args, os.environ,
2038                              file_actions=file_actions)
2039
2040        support.wait_process(pid, exitcode=0)
2041        with open(outfile, encoding="utf-8") as f:
2042            self.assertEqual(f.read(), 'hello')
2043
2044    def test_close_file(self):
2045        closefile = os_helper.TESTFN
2046        self.addCleanup(os_helper.unlink, closefile)
2047        script = f"""if 1:
2048            import os
2049            try:
2050                os.fstat(0)
2051            except OSError as e:
2052                with open({closefile!r}, 'w', encoding='utf-8') as closefile:
2053                    closefile.write('is closed %d' % e.errno)
2054            """
2055        args = self.python_args('-c', script)
2056        pid = self.spawn_func(args[0], args, os.environ,
2057                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
2058
2059        support.wait_process(pid, exitcode=0)
2060        with open(closefile, encoding="utf-8") as f:
2061            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
2062
2063    def test_dup2(self):
2064        dupfile = os_helper.TESTFN
2065        self.addCleanup(os_helper.unlink, dupfile)
2066        script = """if 1:
2067            import sys
2068            sys.stdout.write("hello")
2069            """
2070        with open(dupfile, "wb") as childfile:
2071            file_actions = [
2072                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
2073            ]
2074            args = self.python_args('-c', script)
2075            pid = self.spawn_func(args[0], args, os.environ,
2076                                  file_actions=file_actions)
2077            support.wait_process(pid, exitcode=0)
2078        with open(dupfile, encoding="utf-8") as f:
2079            self.assertEqual(f.read(), 'hello')
2080
2081
2082@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn")
2083@support.requires_subprocess()
2084class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
2085    spawn_func = getattr(posix, 'posix_spawn', None)
2086
2087
2088@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
2089@support.requires_subprocess()
2090class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
2091    spawn_func = getattr(posix, 'posix_spawnp', None)
2092
2093    @os_helper.skip_unless_symlink
2094    def test_posix_spawnp(self):
2095        # Use a symlink to create a program in its own temporary directory
2096        temp_dir = tempfile.mkdtemp()
2097        self.addCleanup(os_helper.rmtree, temp_dir)
2098
2099        program = 'posix_spawnp_test_program.exe'
2100        program_fullpath = os.path.join(temp_dir, program)
2101        os.symlink(sys.executable, program_fullpath)
2102
2103        try:
2104            path = os.pathsep.join((temp_dir, os.environ['PATH']))
2105        except KeyError:
2106            path = temp_dir   # PATH is not set
2107
2108        spawn_args = (program, '-I', '-S', '-c', 'pass')
2109        code = textwrap.dedent("""
2110            import os
2111            from test import support
2112
2113            args = %a
2114            pid = os.posix_spawnp(args[0], args, os.environ)
2115
2116            support.wait_process(pid, exitcode=0)
2117        """ % (spawn_args,))
2118
2119        # Use a subprocess to test os.posix_spawnp() with a modified PATH
2120        # environment variable: posix_spawnp() uses the current environment
2121        # to locate the program, not its environment argument.
2122        args = ('-c', code)
2123        assert_python_ok(*args, PATH=path)
2124
2125
2126@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
2127class TestPosixWeaklinking(unittest.TestCase):
2128    # These test cases verify that weak linking support on macOS works
2129    # as expected. These cases only test new behaviour introduced by weak linking,
2130    # regular behaviour is tested by the normal test cases.
2131    #
2132    # See the section on Weak Linking in Mac/README.txt for more information.
2133    def setUp(self):
2134        import sysconfig
2135        import platform
2136
2137        config_vars = sysconfig.get_config_vars()
2138        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
2139        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))
2140
2141    def _verify_available(self, name):
2142        if name not in self.available:
2143            raise unittest.SkipTest(f"{name} not weak-linked")
2144
2145    def test_pwritev(self):
2146        self._verify_available("HAVE_PWRITEV")
2147        if self.mac_ver >= (10, 16):
2148            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
2149            self.assertTrue(hasattr(os, "preadv"), "os.readv is not available")
2150
2151        else:
2152            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
2153            self.assertFalse(hasattr(os, "preadv"), "os.readv is available")
2154
2155    def test_stat(self):
2156        self._verify_available("HAVE_FSTATAT")
2157        if self.mac_ver >= (10, 10):
2158            self.assertIn("HAVE_FSTATAT", posix._have_functions)
2159
2160        else:
2161            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)
2162
2163            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2164                os.stat("file", dir_fd=0)
2165
2166    def test_ptsname_r(self):
2167        self._verify_available("HAVE_PTSNAME_R")
2168        if self.mac_ver >= (10, 13, 4):
2169            self.assertIn("HAVE_PTSNAME_R", posix._have_functions)
2170        else:
2171            self.assertNotIn("HAVE_PTSNAME_R", posix._have_functions)
2172
2173    def test_access(self):
2174        self._verify_available("HAVE_FACCESSAT")
2175        if self.mac_ver >= (10, 10):
2176            self.assertIn("HAVE_FACCESSAT", posix._have_functions)
2177
2178        else:
2179            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)
2180
2181            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2182                os.access("file", os.R_OK, dir_fd=0)
2183
2184            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
2185                os.access("file", os.R_OK, follow_symlinks=False)
2186
2187            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
2188                os.access("file", os.R_OK, effective_ids=True)
2189
2190    def test_chmod(self):
2191        self._verify_available("HAVE_FCHMODAT")
2192        if self.mac_ver >= (10, 10):
2193            self.assertIn("HAVE_FCHMODAT", posix._have_functions)
2194
2195        else:
2196            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
2197            self.assertIn("HAVE_LCHMOD", posix._have_functions)
2198
2199            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2200                os.chmod("file", 0o644, dir_fd=0)
2201
2202    def test_chown(self):
2203        self._verify_available("HAVE_FCHOWNAT")
2204        if self.mac_ver >= (10, 10):
2205            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)
2206
2207        else:
2208            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
2209            self.assertIn("HAVE_LCHOWN", posix._have_functions)
2210
2211            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2212                os.chown("file", 0, 0, dir_fd=0)
2213
2214    def test_link(self):
2215        self._verify_available("HAVE_LINKAT")
2216        if self.mac_ver >= (10, 10):
2217            self.assertIn("HAVE_LINKAT", posix._have_functions)
2218
2219        else:
2220            self.assertNotIn("HAVE_LINKAT", posix._have_functions)
2221
2222            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2223                os.link("source", "target",  src_dir_fd=0)
2224
2225            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
2226                os.link("source", "target",  dst_dir_fd=0)
2227
2228            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2229                os.link("source", "target",  src_dir_fd=0, dst_dir_fd=0)
2230
2231            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
2232            with os_helper.temp_dir() as base_path:
2233                link_path = os.path.join(base_path, "link")
2234                target_path = os.path.join(base_path, "target")
2235                source_path = os.path.join(base_path, "source")
2236
2237                with open(source_path, "w") as fp:
2238                    fp.write("data")
2239
2240                os.symlink("target", link_path)
2241
2242                # Calling os.link should fail in the link(2) call, and
2243                # should not reject *follow_symlinks* (to match the
2244                # behaviour you'd get when building on a platform without
2245                # linkat)
2246                with self.assertRaises(FileExistsError):
2247                    os.link(source_path, link_path, follow_symlinks=True)
2248
2249                with self.assertRaises(FileExistsError):
2250                    os.link(source_path, link_path, follow_symlinks=False)
2251
2252
2253    def test_listdir_scandir(self):
2254        self._verify_available("HAVE_FDOPENDIR")
2255        if self.mac_ver >= (10, 10):
2256            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)
2257
2258        else:
2259            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)
2260
2261            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
2262                os.listdir(0)
2263
2264            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
2265                os.scandir(0)
2266
2267    def test_mkdir(self):
2268        self._verify_available("HAVE_MKDIRAT")
2269        if self.mac_ver >= (10, 10):
2270            self.assertIn("HAVE_MKDIRAT", posix._have_functions)
2271
2272        else:
2273            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)
2274
2275            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2276                os.mkdir("dir", dir_fd=0)
2277
2278    def test_mkfifo(self):
2279        self._verify_available("HAVE_MKFIFOAT")
2280        if self.mac_ver >= (13, 0):
2281            self.assertIn("HAVE_MKFIFOAT", posix._have_functions)
2282
2283        else:
2284            self.assertNotIn("HAVE_MKFIFOAT", posix._have_functions)
2285
2286            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2287                os.mkfifo("path", dir_fd=0)
2288
2289    def test_mknod(self):
2290        self._verify_available("HAVE_MKNODAT")
2291        if self.mac_ver >= (13, 0):
2292            self.assertIn("HAVE_MKNODAT", posix._have_functions)
2293
2294        else:
2295            self.assertNotIn("HAVE_MKNODAT", posix._have_functions)
2296
2297            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2298                os.mknod("path", dir_fd=0)
2299
2300    def test_rename_replace(self):
2301        self._verify_available("HAVE_RENAMEAT")
2302        if self.mac_ver >= (10, 10):
2303            self.assertIn("HAVE_RENAMEAT", posix._have_functions)
2304
2305        else:
2306            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)
2307
2308            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2309                os.rename("a", "b", src_dir_fd=0)
2310
2311            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2312                os.rename("a", "b", dst_dir_fd=0)
2313
2314            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2315                os.replace("a", "b", src_dir_fd=0)
2316
2317            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2318                os.replace("a", "b", dst_dir_fd=0)
2319
2320    def test_unlink_rmdir(self):
2321        self._verify_available("HAVE_UNLINKAT")
2322        if self.mac_ver >= (10, 10):
2323            self.assertIn("HAVE_UNLINKAT", posix._have_functions)
2324
2325        else:
2326            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)
2327
2328            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2329                os.unlink("path", dir_fd=0)
2330
2331            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2332                os.rmdir("path", dir_fd=0)
2333
2334    def test_open(self):
2335        self._verify_available("HAVE_OPENAT")
2336        if self.mac_ver >= (10, 10):
2337            self.assertIn("HAVE_OPENAT", posix._have_functions)
2338
2339        else:
2340            self.assertNotIn("HAVE_OPENAT", posix._have_functions)
2341
2342            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2343                os.open("path", os.O_RDONLY, dir_fd=0)
2344
2345    def test_readlink(self):
2346        self._verify_available("HAVE_READLINKAT")
2347        if self.mac_ver >= (10, 10):
2348            self.assertIn("HAVE_READLINKAT", posix._have_functions)
2349
2350        else:
2351            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)
2352
2353            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2354                os.readlink("path",  dir_fd=0)
2355
2356    def test_symlink(self):
2357        self._verify_available("HAVE_SYMLINKAT")
2358        if self.mac_ver >= (10, 10):
2359            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)
2360
2361        else:
2362            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)
2363
2364            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2365                os.symlink("a", "b",  dir_fd=0)
2366
2367    def test_utime(self):
2368        self._verify_available("HAVE_FUTIMENS")
2369        self._verify_available("HAVE_UTIMENSAT")
2370        if self.mac_ver >= (10, 13):
2371            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
2372            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)
2373
2374        else:
2375            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
2376            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)
2377
2378            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2379                os.utime("path", dir_fd=0)
2380
2381
2382class NamespacesTests(unittest.TestCase):
2383    """Tests for os.unshare() and os.setns()."""
2384
2385    @unittest.skipUnless(hasattr(os, 'unshare'), 'needs os.unshare()')
2386    @unittest.skipUnless(hasattr(os, 'setns'), 'needs os.setns()')
2387    @unittest.skipUnless(os.path.exists('/proc/self/ns/uts'), 'need /proc/self/ns/uts')
2388    @support.requires_linux_version(3, 0, 0)
2389    def test_unshare_setns(self):
2390        code = """if 1:
2391            import errno
2392            import os
2393            import sys
2394            fd = os.open('/proc/self/ns/uts', os.O_RDONLY)
2395            try:
2396                original = os.readlink('/proc/self/ns/uts')
2397                try:
2398                    os.unshare(os.CLONE_NEWUTS)
2399                except OSError as e:
2400                    if e.errno == errno.ENOSPC:
2401                        # skip test if limit is exceeded
2402                        sys.exit()
2403                    raise
2404                new = os.readlink('/proc/self/ns/uts')
2405                if original == new:
2406                    raise Exception('os.unshare failed')
2407                os.setns(fd, os.CLONE_NEWUTS)
2408                restored = os.readlink('/proc/self/ns/uts')
2409                if original != restored:
2410                    raise Exception('os.setns failed')
2411            except PermissionError:
2412                # The calling process did not have the required privileges
2413                # for this operation
2414                pass
2415            except OSError as e:
2416                # Skip the test on these errors:
2417                # - ENOSYS: syscall not available
2418                # - EINVAL: kernel was not configured with the CONFIG_UTS_NS option
2419                # - ENOMEM: not enough memory
2420                if e.errno not in (errno.ENOSYS, errno.EINVAL, errno.ENOMEM):
2421                    raise
2422            finally:
2423                os.close(fd)
2424            """
2425
2426        assert_python_ok("-c", code)
2427
2428
2429def tearDownModule():
2430    support.reap_children()
2431
2432
2433if __name__ == '__main__':
2434    unittest.main()
2435