• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"Test posix functions"
2
3from test import support
4android_not_root = support.android_not_root
5
6# Skip these tests if there is no posix module.
7posix = support.import_module('posix')
8
9import errno
10import sys
11import time
12import os
13import platform
14import pwd
15import stat
16import tempfile
17import unittest
18import warnings
19
20_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
21                              support.TESTFN + '-dummy-symlink')
22
23class PosixTester(unittest.TestCase):
24
25    def setUp(self):
26        # create empty file
27        fp = open(support.TESTFN, 'w+')
28        fp.close()
29        self.teardown_files = [ support.TESTFN ]
30        self._warnings_manager = support.check_warnings()
31        self._warnings_manager.__enter__()
32        warnings.filterwarnings('ignore', '.* potential security risk .*',
33                                RuntimeWarning)
34
35    def tearDown(self):
36        for teardown_file in self.teardown_files:
37            support.unlink(teardown_file)
38        self._warnings_manager.__exit__(None, None, None)
39
40    def testNoArgFunctions(self):
41        # test posix functions which take no arguments and have
42        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
43        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
44                             "times", "getloadavg",
45                             "getegid", "geteuid", "getgid", "getgroups",
46                             "getpid", "getpgrp", "getppid", "getuid", "sync",
47                           ]
48
49        for name in NO_ARG_FUNCTIONS:
50            posix_func = getattr(posix, name, None)
51            if posix_func is not None:
52                posix_func()
53                self.assertRaises(TypeError, posix_func, 1)
54
55    @unittest.skipUnless(hasattr(posix, 'getresuid'),
56                         'test needs posix.getresuid()')
57    def test_getresuid(self):
58        user_ids = posix.getresuid()
59        self.assertEqual(len(user_ids), 3)
60        for val in user_ids:
61            self.assertGreaterEqual(val, 0)
62
63    @unittest.skipUnless(hasattr(posix, 'getresgid'),
64                         'test needs posix.getresgid()')
65    def test_getresgid(self):
66        group_ids = posix.getresgid()
67        self.assertEqual(len(group_ids), 3)
68        for val in group_ids:
69            self.assertGreaterEqual(val, 0)
70
71    @unittest.skipUnless(hasattr(posix, 'setresuid'),
72                         'test needs posix.setresuid()')
73    def test_setresuid(self):
74        current_user_ids = posix.getresuid()
75        self.assertIsNone(posix.setresuid(*current_user_ids))
76        # -1 means don't change that value.
77        self.assertIsNone(posix.setresuid(-1, -1, -1))
78
79    @unittest.skipUnless(hasattr(posix, 'setresuid'),
80                         'test needs posix.setresuid()')
81    def test_setresuid_exception(self):
82        # Don't do this test if someone is silly enough to run us as root.
83        current_user_ids = posix.getresuid()
84        if 0 not in current_user_ids:
85            new_user_ids = (current_user_ids[0]+1, -1, -1)
86            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
87
88    @unittest.skipUnless(hasattr(posix, 'setresgid'),
89                         'test needs posix.setresgid()')
90    def test_setresgid(self):
91        current_group_ids = posix.getresgid()
92        self.assertIsNone(posix.setresgid(*current_group_ids))
93        # -1 means don't change that value.
94        self.assertIsNone(posix.setresgid(-1, -1, -1))
95
96    @unittest.skipUnless(hasattr(posix, 'setresgid'),
97                         'test needs posix.setresgid()')
98    def test_setresgid_exception(self):
99        # Don't do this test if someone is silly enough to run us as root.
100        current_group_ids = posix.getresgid()
101        if 0 not in current_group_ids:
102            new_group_ids = (current_group_ids[0]+1, -1, -1)
103            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
104
105    @unittest.skipUnless(hasattr(posix, 'initgroups'),
106                         "test needs os.initgroups()")
107    def test_initgroups(self):
108        # It takes a string and an integer; check that it raises a TypeError
109        # for other argument lists.
110        self.assertRaises(TypeError, posix.initgroups)
111        self.assertRaises(TypeError, posix.initgroups, None)
112        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
113        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
114
115        # If a non-privileged user invokes it, it should fail with OSError
116        # EPERM.
117        if os.getuid() != 0:
118            try:
119                name = pwd.getpwuid(posix.getuid()).pw_name
120            except KeyError:
121                # the current UID may not have a pwd entry
122                raise unittest.SkipTest("need a pwd entry")
123            try:
124                posix.initgroups(name, 13)
125            except OSError as e:
126                self.assertEqual(e.errno, errno.EPERM)
127            else:
128                self.fail("Expected OSError to be raised by initgroups")
129
130    @unittest.skipUnless(hasattr(posix, 'statvfs'),
131                         'test needs posix.statvfs()')
132    def test_statvfs(self):
133        self.assertTrue(posix.statvfs(os.curdir))
134
135    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
136                         'test needs posix.fstatvfs()')
137    def test_fstatvfs(self):
138        fp = open(support.TESTFN)
139        try:
140            self.assertTrue(posix.fstatvfs(fp.fileno()))
141            self.assertTrue(posix.statvfs(fp.fileno()))
142        finally:
143            fp.close()
144
145    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
146                         'test needs posix.ftruncate()')
147    def test_ftruncate(self):
148        fp = open(support.TESTFN, 'w+')
149        try:
150            # we need to have some data to truncate
151            fp.write('test')
152            fp.flush()
153            posix.ftruncate(fp.fileno(), 0)
154        finally:
155            fp.close()
156
157    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
158    def test_truncate(self):
159        with open(support.TESTFN, 'w') as fp:
160            fp.write('test')
161            fp.flush()
162        posix.truncate(support.TESTFN, 0)
163
164    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
165    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
166    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
167    def test_fexecve(self):
168        fp = os.open(sys.executable, os.O_RDONLY)
169        try:
170            pid = os.fork()
171            if pid == 0:
172                os.chdir(os.path.split(sys.executable)[0])
173                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
174            else:
175                self.assertEqual(os.waitpid(pid, 0), (pid, 0))
176        finally:
177            os.close(fp)
178
179    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
180    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
181    def test_waitid(self):
182        pid = os.fork()
183        if pid == 0:
184            os.chdir(os.path.split(sys.executable)[0])
185            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
186        else:
187            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
188            self.assertEqual(pid, res.si_pid)
189
190    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
191    def test_lockf(self):
192        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
193        try:
194            os.write(fd, b'test')
195            os.lseek(fd, 0, os.SEEK_SET)
196            posix.lockf(fd, posix.F_LOCK, 4)
197            # section is locked
198            posix.lockf(fd, posix.F_ULOCK, 4)
199        finally:
200            os.close(fd)
201
202    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
203    def test_pread(self):
204        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
205        try:
206            os.write(fd, b'test')
207            os.lseek(fd, 0, os.SEEK_SET)
208            self.assertEqual(b'es', posix.pread(fd, 2, 1))
209            # the first pread() shouldn't disturb the file offset
210            self.assertEqual(b'te', posix.read(fd, 2))
211        finally:
212            os.close(fd)
213
214    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
215    def test_pwrite(self):
216        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
217        try:
218            os.write(fd, b'test')
219            os.lseek(fd, 0, os.SEEK_SET)
220            posix.pwrite(fd, b'xx', 1)
221            self.assertEqual(b'txxt', posix.read(fd, 4))
222        finally:
223            os.close(fd)
224
225    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
226        "test needs posix.posix_fallocate()")
227    def test_posix_fallocate(self):
228        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
229        try:
230            posix.posix_fallocate(fd, 0, 10)
231        except OSError as inst:
232            # issue10812, ZFS doesn't appear to support posix_fallocate,
233            # so skip Solaris-based since they are likely to have ZFS.
234            if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):
235                raise
236        finally:
237            os.close(fd)
238
239    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
240        "test needs posix.posix_fadvise()")
241    def test_posix_fadvise(self):
242        fd = os.open(support.TESTFN, os.O_RDONLY)
243        try:
244            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
245        finally:
246            os.close(fd)
247
248    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
249    def test_utime_with_fd(self):
250        now = time.time()
251        fd = os.open(support.TESTFN, os.O_RDONLY)
252        try:
253            posix.utime(fd)
254            posix.utime(fd, None)
255            self.assertRaises(TypeError, posix.utime, fd, (None, None))
256            self.assertRaises(TypeError, posix.utime, fd, (now, None))
257            self.assertRaises(TypeError, posix.utime, fd, (None, now))
258            posix.utime(fd, (int(now), int(now)))
259            posix.utime(fd, (now, now))
260            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
261            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
262            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
263            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
264            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
265
266        finally:
267            os.close(fd)
268
269    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
270    def test_utime_nofollow_symlinks(self):
271        now = time.time()
272        posix.utime(support.TESTFN, None, follow_symlinks=False)
273        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False)
274        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False)
275        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False)
276        posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False)
277        posix.utime(support.TESTFN, (now, now), follow_symlinks=False)
278        posix.utime(support.TESTFN, follow_symlinks=False)
279
280    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
281    def test_writev(self):
282        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
283        try:
284            n = os.writev(fd, (b'test1', b'tt2', b't3'))
285            self.assertEqual(n, 10)
286
287            os.lseek(fd, 0, os.SEEK_SET)
288            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
289
290            # Issue #20113: empty list of buffers should not crash
291            try:
292                size = posix.writev(fd, [])
293            except OSError:
294                # writev(fd, []) raises OSError(22, "Invalid argument")
295                # on OpenIndiana
296                pass
297            else:
298                self.assertEqual(size, 0)
299        finally:
300            os.close(fd)
301
302    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
303    def test_readv(self):
304        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
305        try:
306            os.write(fd, b'test1tt2t3')
307            os.lseek(fd, 0, os.SEEK_SET)
308            buf = [bytearray(i) for i in [5, 3, 2]]
309            self.assertEqual(posix.readv(fd, buf), 10)
310            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
311
312            # Issue #20113: empty list of buffers should not crash
313            try:
314                size = posix.readv(fd, [])
315            except OSError:
316                # readv(fd, []) raises OSError(22, "Invalid argument")
317                # on OpenIndiana
318                pass
319            else:
320                self.assertEqual(size, 0)
321        finally:
322            os.close(fd)
323
324    @unittest.skipUnless(hasattr(posix, 'dup'),
325                         'test needs posix.dup()')
326    def test_dup(self):
327        fp = open(support.TESTFN)
328        try:
329            fd = posix.dup(fp.fileno())
330            self.assertIsInstance(fd, int)
331            os.close(fd)
332        finally:
333            fp.close()
334
335    @unittest.skipUnless(hasattr(posix, 'confstr'),
336                         'test needs posix.confstr()')
337    def test_confstr(self):
338        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
339        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
340
341    @unittest.skipUnless(hasattr(posix, 'dup2'),
342                         'test needs posix.dup2()')
343    def test_dup2(self):
344        fp1 = open(support.TESTFN)
345        fp2 = open(support.TESTFN)
346        try:
347            posix.dup2(fp1.fileno(), fp2.fileno())
348        finally:
349            fp1.close()
350            fp2.close()
351
352    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
353    @support.requires_linux_version(2, 6, 23)
354    def test_oscloexec(self):
355        fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
356        self.addCleanup(os.close, fd)
357        self.assertFalse(os.get_inheritable(fd))
358
359    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
360                         'test needs posix.O_EXLOCK')
361    def test_osexlock(self):
362        fd = os.open(support.TESTFN,
363                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
364        self.assertRaises(OSError, os.open, support.TESTFN,
365                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
366        os.close(fd)
367
368        if hasattr(posix, "O_SHLOCK"):
369            fd = os.open(support.TESTFN,
370                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
371            self.assertRaises(OSError, os.open, support.TESTFN,
372                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
373            os.close(fd)
374
375    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
376                         'test needs posix.O_SHLOCK')
377    def test_osshlock(self):
378        fd1 = os.open(support.TESTFN,
379                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
380        fd2 = os.open(support.TESTFN,
381                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
382        os.close(fd2)
383        os.close(fd1)
384
385        if hasattr(posix, "O_EXLOCK"):
386            fd = os.open(support.TESTFN,
387                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
388            self.assertRaises(OSError, os.open, support.TESTFN,
389                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
390            os.close(fd)
391
392    @unittest.skipUnless(hasattr(posix, 'fstat'),
393                         'test needs posix.fstat()')
394    def test_fstat(self):
395        fp = open(support.TESTFN)
396        try:
397            self.assertTrue(posix.fstat(fp.fileno()))
398            self.assertTrue(posix.stat(fp.fileno()))
399
400            self.assertRaisesRegex(TypeError,
401                    'should be string, bytes, os.PathLike or integer, not',
402                    posix.stat, float(fp.fileno()))
403        finally:
404            fp.close()
405
406    @unittest.skipUnless(hasattr(posix, 'stat'),
407                         'test needs posix.stat()')
408    def test_stat(self):
409        self.assertTrue(posix.stat(support.TESTFN))
410        self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))
411
412        self.assertWarnsRegex(DeprecationWarning,
413                'should be string, bytes, os.PathLike or integer, not',
414                posix.stat, bytearray(os.fsencode(support.TESTFN)))
415        self.assertRaisesRegex(TypeError,
416                'should be string, bytes, os.PathLike or integer, not',
417                posix.stat, None)
418        self.assertRaisesRegex(TypeError,
419                'should be string, bytes, os.PathLike or integer, not',
420                posix.stat, list(support.TESTFN))
421        self.assertRaisesRegex(TypeError,
422                'should be string, bytes, os.PathLike or integer, not',
423                posix.stat, list(os.fsencode(support.TESTFN)))
424
425    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
426    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
427    def test_mkfifo(self):
428        support.unlink(support.TESTFN)
429        posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
430        self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
431
432    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
433                         "don't have mknod()/S_IFIFO")
434    @unittest.skipIf(android_not_root, "mknod not allowed, non root user")
435    def test_mknod(self):
436        # Test using mknod() to create a FIFO (the only use specified
437        # by POSIX).
438        support.unlink(support.TESTFN)
439        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
440        try:
441            posix.mknod(support.TESTFN, mode, 0)
442        except OSError as e:
443            # Some old systems don't allow unprivileged users to use
444            # mknod(), or only support creating device nodes.
445            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
446        else:
447            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
448
449        # Keyword arguments are also supported
450        support.unlink(support.TESTFN)
451        try:
452            posix.mknod(path=support.TESTFN, mode=mode, device=0,
453                dir_fd=None)
454        except OSError as e:
455            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
456
457    @unittest.skipUnless(hasattr(posix, 'stat'), 'test needs posix.stat()')
458    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
459    def test_makedev(self):
460        st = posix.stat(support.TESTFN)
461        dev = st.st_dev
462        self.assertIsInstance(dev, int)
463        self.assertGreaterEqual(dev, 0)
464
465        major = posix.major(dev)
466        self.assertIsInstance(major, int)
467        self.assertGreaterEqual(major, 0)
468        self.assertEqual(posix.major(dev), major)
469        self.assertRaises(TypeError, posix.major, float(dev))
470        self.assertRaises(TypeError, posix.major)
471        self.assertRaises((ValueError, OverflowError), posix.major, -1)
472
473        minor = posix.minor(dev)
474        self.assertIsInstance(minor, int)
475        self.assertGreaterEqual(minor, 0)
476        self.assertEqual(posix.minor(dev), minor)
477        self.assertRaises(TypeError, posix.minor, float(dev))
478        self.assertRaises(TypeError, posix.minor)
479        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
480
481        self.assertEqual(posix.makedev(major, minor), dev)
482        self.assertRaises(TypeError, posix.makedev, float(major), minor)
483        self.assertRaises(TypeError, posix.makedev, major, float(minor))
484        self.assertRaises(TypeError, posix.makedev, major)
485        self.assertRaises(TypeError, posix.makedev)
486
487    def _test_all_chown_common(self, chown_func, first_param, stat_func):
488        """Common code for chown, fchown and lchown tests."""
489        def check_stat(uid, gid):
490            if stat_func is not None:
491                stat = stat_func(first_param)
492                self.assertEqual(stat.st_uid, uid)
493                self.assertEqual(stat.st_gid, gid)
494        uid = os.getuid()
495        gid = os.getgid()
496        # test a successful chown call
497        chown_func(first_param, uid, gid)
498        check_stat(uid, gid)
499        chown_func(first_param, -1, gid)
500        check_stat(uid, gid)
501        chown_func(first_param, uid, -1)
502        check_stat(uid, gid)
503
504        if uid == 0:
505            # Try an amusingly large uid/gid to make sure we handle
506            # large unsigned values.  (chown lets you use any
507            # uid/gid you like, even if they aren't defined.)
508            #
509            # This problem keeps coming up:
510            #   http://bugs.python.org/issue1747858
511            #   http://bugs.python.org/issue4591
512            #   http://bugs.python.org/issue15301
513            # Hopefully the fix in 4591 fixes it for good!
514            #
515            # This part of the test only runs when run as root.
516            # Only scary people run their tests as root.
517
518            big_value = 2**31
519            chown_func(first_param, big_value, big_value)
520            check_stat(big_value, big_value)
521            chown_func(first_param, -1, -1)
522            check_stat(big_value, big_value)
523            chown_func(first_param, uid, gid)
524            check_stat(uid, gid)
525        elif platform.system() in ('HP-UX', 'SunOS'):
526            # HP-UX and Solaris can allow a non-root user to chown() to root
527            # (issue #5113)
528            raise unittest.SkipTest("Skipping because of non-standard chown() "
529                                    "behavior")
530        else:
531            # non-root cannot chown to root, raises OSError
532            self.assertRaises(OSError, chown_func, first_param, 0, 0)
533            check_stat(uid, gid)
534            self.assertRaises(OSError, chown_func, first_param, 0, -1)
535            check_stat(uid, gid)
536            if 0 not in os.getgroups():
537                self.assertRaises(OSError, chown_func, first_param, -1, 0)
538                check_stat(uid, gid)
539        # test illegal types
540        for t in str, float:
541            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
542            check_stat(uid, gid)
543            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
544            check_stat(uid, gid)
545
546    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
547    def test_chown(self):
548        # raise an OSError if the file does not exist
549        os.unlink(support.TESTFN)
550        self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)
551
552        # re-create the file
553        support.create_empty_file(support.TESTFN)
554        self._test_all_chown_common(posix.chown, support.TESTFN,
555                                    getattr(posix, 'stat', None))
556
557    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
558    def test_fchown(self):
559        os.unlink(support.TESTFN)
560
561        # re-create the file
562        test_file = open(support.TESTFN, 'w')
563        try:
564            fd = test_file.fileno()
565            self._test_all_chown_common(posix.fchown, fd,
566                                        getattr(posix, 'fstat', None))
567        finally:
568            test_file.close()
569
570    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
571    def test_lchown(self):
572        os.unlink(support.TESTFN)
573        # create a symlink
574        os.symlink(_DUMMY_SYMLINK, support.TESTFN)
575        self._test_all_chown_common(posix.lchown, support.TESTFN,
576                                    getattr(posix, 'lstat', None))
577
578    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
579    def test_chdir(self):
580        posix.chdir(os.curdir)
581        self.assertRaises(OSError, posix.chdir, support.TESTFN)
582
583    def test_listdir(self):
584        self.assertTrue(support.TESTFN in posix.listdir(os.curdir))
585
586    def test_listdir_default(self):
587        # When listdir is called without argument,
588        # it's the same as listdir(os.curdir).
589        self.assertTrue(support.TESTFN in posix.listdir())
590
591    def test_listdir_bytes(self):
592        # When listdir is called with a bytes object,
593        # the returned strings are of type bytes.
594        self.assertTrue(os.fsencode(support.TESTFN) in posix.listdir(b'.'))
595
596    @unittest.skipUnless(posix.listdir in os.supports_fd,
597                         "test needs fd support for posix.listdir()")
598    def test_listdir_fd(self):
599        f = posix.open(posix.getcwd(), posix.O_RDONLY)
600        self.addCleanup(posix.close, f)
601        self.assertEqual(
602            sorted(posix.listdir('.')),
603            sorted(posix.listdir(f))
604            )
605        # Check that the fd offset was reset (issue #13739)
606        self.assertEqual(
607            sorted(posix.listdir('.')),
608            sorted(posix.listdir(f))
609            )
610
611    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
612    def test_access(self):
613        self.assertTrue(posix.access(support.TESTFN, os.R_OK))
614
615    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
616    def test_umask(self):
617        old_mask = posix.umask(0)
618        self.assertIsInstance(old_mask, int)
619        posix.umask(old_mask)
620
621    @unittest.skipUnless(hasattr(posix, 'strerror'),
622                         'test needs posix.strerror()')
623    def test_strerror(self):
624        self.assertTrue(posix.strerror(0))
625
626    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
627    def test_pipe(self):
628        reader, writer = posix.pipe()
629        os.close(reader)
630        os.close(writer)
631
632    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
633    @support.requires_linux_version(2, 6, 27)
634    def test_pipe2(self):
635        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
636        self.assertRaises(TypeError, os.pipe2, 0, 0)
637
638        # try calling with flags = 0, like os.pipe()
639        r, w = os.pipe2(0)
640        os.close(r)
641        os.close(w)
642
643        # test flags
644        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
645        self.addCleanup(os.close, r)
646        self.addCleanup(os.close, w)
647        self.assertFalse(os.get_inheritable(r))
648        self.assertFalse(os.get_inheritable(w))
649        self.assertFalse(os.get_blocking(r))
650        self.assertFalse(os.get_blocking(w))
651        # try reading from an empty pipe: this should fail, not block
652        self.assertRaises(OSError, os.read, r, 1)
653        # try a write big enough to fill-up the pipe: this should either
654        # fail or perform a partial write, not block
655        try:
656            os.write(w, b'x' * support.PIPE_MAX_SIZE)
657        except OSError:
658            pass
659
660    @support.cpython_only
661    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
662    @support.requires_linux_version(2, 6, 27)
663    def test_pipe2_c_limits(self):
664        # Issue 15989
665        import _testcapi
666        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
667        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
668
669    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
670    def test_utime(self):
671        now = time.time()
672        posix.utime(support.TESTFN, None)
673        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
674        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
675        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
676        posix.utime(support.TESTFN, (int(now), int(now)))
677        posix.utime(support.TESTFN, (now, now))
678
679    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
680        st = os.stat(target_file)
681        self.assertTrue(hasattr(st, 'st_flags'))
682
683        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
684        flags = st.st_flags | stat.UF_IMMUTABLE
685        try:
686            chflags_func(target_file, flags, **kwargs)
687        except OSError as err:
688            if err.errno != errno.EOPNOTSUPP:
689                raise
690            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
691            self.skipTest(msg)
692
693        try:
694            new_st = os.stat(target_file)
695            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
696            try:
697                fd = open(target_file, 'w+')
698            except OSError as e:
699                self.assertEqual(e.errno, errno.EPERM)
700        finally:
701            posix.chflags(target_file, st.st_flags)
702
703    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
704    def test_chflags(self):
705        self._test_chflags_regular_file(posix.chflags, support.TESTFN)
706
707    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
708    def test_lchflags_regular_file(self):
709        self._test_chflags_regular_file(posix.lchflags, support.TESTFN)
710        self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False)
711
712    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
713    def test_lchflags_symlink(self):
714        testfn_st = os.stat(support.TESTFN)
715
716        self.assertTrue(hasattr(testfn_st, 'st_flags'))
717
718        os.symlink(support.TESTFN, _DUMMY_SYMLINK)
719        self.teardown_files.append(_DUMMY_SYMLINK)
720        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
721
722        def chflags_nofollow(path, flags):
723            return posix.chflags(path, flags, follow_symlinks=False)
724
725        for fn in (posix.lchflags, chflags_nofollow):
726            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
727            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
728            try:
729                fn(_DUMMY_SYMLINK, flags)
730            except OSError as err:
731                if err.errno != errno.EOPNOTSUPP:
732                    raise
733                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
734                self.skipTest(msg)
735            try:
736                new_testfn_st = os.stat(support.TESTFN)
737                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
738
739                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
740                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
741                                 new_dummy_symlink_st.st_flags)
742            finally:
743                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
744
745    def test_environ(self):
746        if os.name == "nt":
747            item_type = str
748        else:
749            item_type = bytes
750        for k, v in posix.environ.items():
751            self.assertEqual(type(k), item_type)
752            self.assertEqual(type(v), item_type)
753
754    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
755    def test_getcwd_long_pathnames(self):
756        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
757        curdir = os.getcwd()
758        base_path = os.path.abspath(support.TESTFN) + '.getcwd'
759
760        try:
761            os.mkdir(base_path)
762            os.chdir(base_path)
763        except:
764            #  Just returning nothing instead of the SkipTest exception, because
765            #  the test results in Error in that case.  Is that ok?
766            #  raise unittest.SkipTest("cannot create directory for testing")
767            return
768
769            def _create_and_do_getcwd(dirname, current_path_length = 0):
770                try:
771                    os.mkdir(dirname)
772                except:
773                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
774
775                os.chdir(dirname)
776                try:
777                    os.getcwd()
778                    if current_path_length < 1027:
779                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
780                finally:
781                    os.chdir('..')
782                    os.rmdir(dirname)
783
784            _create_and_do_getcwd(dirname)
785
786        finally:
787            os.chdir(curdir)
788            support.rmtree(base_path)
789
790    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
791    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
792    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
793    def test_getgrouplist(self):
794        user = pwd.getpwuid(os.getuid())[0]
795        group = pwd.getpwuid(os.getuid())[3]
796        self.assertIn(group, posix.getgrouplist(user, group))
797
798
799    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
800    def test_getgroups(self):
801        with os.popen('id -G 2>/dev/null') as idg:
802            groups = idg.read().strip()
803            ret = idg.close()
804
805        try:
806            idg_groups = set(int(g) for g in groups.split())
807        except ValueError:
808            idg_groups = set()
809        if ret is not None or not idg_groups:
810            raise unittest.SkipTest("need working 'id -G'")
811
812        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
813        if sys.platform == 'darwin':
814            import sysconfig
815            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
816            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
817                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
818
819        # 'id -G' and 'os.getgroups()' should return the same
820        # groups, ignoring order, duplicates, and the effective gid.
821        # #10822/#26944 - It is implementation defined whether
822        # posix.getgroups() includes the effective gid.
823        symdiff = idg_groups.symmetric_difference(posix.getgroups())
824        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
825
826    # tests for the posix *at functions follow
827
828    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
829    def test_access_dir_fd(self):
830        f = posix.open(posix.getcwd(), posix.O_RDONLY)
831        try:
832            self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))
833        finally:
834            posix.close(f)
835
836    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
837    def test_chmod_dir_fd(self):
838        os.chmod(support.TESTFN, stat.S_IRUSR)
839
840        f = posix.open(posix.getcwd(), posix.O_RDONLY)
841        try:
842            posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
843
844            s = posix.stat(support.TESTFN)
845            self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
846        finally:
847            posix.close(f)
848
849    @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
850    def test_chown_dir_fd(self):
851        support.unlink(support.TESTFN)
852        support.create_empty_file(support.TESTFN)
853
854        f = posix.open(posix.getcwd(), posix.O_RDONLY)
855        try:
856            posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
857        finally:
858            posix.close(f)
859
860    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
861    def test_stat_dir_fd(self):
862        support.unlink(support.TESTFN)
863        with open(support.TESTFN, 'w') as outfile:
864            outfile.write("testline\n")
865
866        f = posix.open(posix.getcwd(), posix.O_RDONLY)
867        try:
868            s1 = posix.stat(support.TESTFN)
869            s2 = posix.stat(support.TESTFN, dir_fd=f)
870            self.assertEqual(s1, s2)
871            s2 = posix.stat(support.TESTFN, dir_fd=None)
872            self.assertEqual(s1, s2)
873            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
874                    posix.stat, support.TESTFN, dir_fd=posix.getcwd())
875            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
876                    posix.stat, support.TESTFN, dir_fd=float(f))
877            self.assertRaises(OverflowError,
878                    posix.stat, support.TESTFN, dir_fd=10**20)
879        finally:
880            posix.close(f)
881
882    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
883    def test_utime_dir_fd(self):
884        f = posix.open(posix.getcwd(), posix.O_RDONLY)
885        try:
886            now = time.time()
887            posix.utime(support.TESTFN, None, dir_fd=f)
888            posix.utime(support.TESTFN, dir_fd=f)
889            self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)
890            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)
891            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)
892            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)
893            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)
894            posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)
895            posix.utime(support.TESTFN, (now, now), dir_fd=f)
896            posix.utime(support.TESTFN,
897                    (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
898            posix.utime(support.TESTFN, dir_fd=f,
899                            times=(int(now), int((now - int(now)) * 1e9)))
900
901            # try dir_fd and follow_symlinks together
902            if os.utime in os.supports_follow_symlinks:
903                try:
904                    posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)
905                except ValueError:
906                    # whoops!  using both together not supported on this platform.
907                    pass
908
909        finally:
910            posix.close(f)
911
912    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
913    @unittest.skipIf(android_not_root, "hard link not allowed, non root user")
914    def test_link_dir_fd(self):
915        f = posix.open(posix.getcwd(), posix.O_RDONLY)
916        try:
917            posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)
918            # should have same inodes
919            self.assertEqual(posix.stat(support.TESTFN)[1],
920                posix.stat(support.TESTFN + 'link')[1])
921        finally:
922            posix.close(f)
923            support.unlink(support.TESTFN + 'link')
924
925    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
926    def test_mkdir_dir_fd(self):
927        f = posix.open(posix.getcwd(), posix.O_RDONLY)
928        try:
929            posix.mkdir(support.TESTFN + 'dir', dir_fd=f)
930            posix.stat(support.TESTFN + 'dir') # should not raise exception
931        finally:
932            posix.close(f)
933            support.rmtree(support.TESTFN + 'dir')
934
935    @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
936                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
937    @unittest.skipIf(android_not_root, "mknod not allowed, non root user")
938    def test_mknod_dir_fd(self):
939        # Test using mknodat() to create a FIFO (the only use specified
940        # by POSIX).
941        support.unlink(support.TESTFN)
942        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
943        f = posix.open(posix.getcwd(), posix.O_RDONLY)
944        try:
945            posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
946        except OSError as e:
947            # Some old systems don't allow unprivileged users to use
948            # mknod(), or only support creating device nodes.
949            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
950        else:
951            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
952        finally:
953            posix.close(f)
954
955    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
956    def test_open_dir_fd(self):
957        support.unlink(support.TESTFN)
958        with open(support.TESTFN, 'w') as outfile:
959            outfile.write("testline\n")
960        a = posix.open(posix.getcwd(), posix.O_RDONLY)
961        b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
962        try:
963            res = posix.read(b, 9).decode(encoding="utf-8")
964            self.assertEqual("testline\n", res)
965        finally:
966            posix.close(a)
967            posix.close(b)
968
969    @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
970    def test_readlink_dir_fd(self):
971        os.symlink(support.TESTFN, support.TESTFN + 'link')
972        f = posix.open(posix.getcwd(), posix.O_RDONLY)
973        try:
974            self.assertEqual(posix.readlink(support.TESTFN + 'link'),
975                posix.readlink(support.TESTFN + 'link', dir_fd=f))
976        finally:
977            support.unlink(support.TESTFN + 'link')
978            posix.close(f)
979
980    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
981    def test_rename_dir_fd(self):
982        support.unlink(support.TESTFN)
983        support.create_empty_file(support.TESTFN + 'ren')
984        f = posix.open(posix.getcwd(), posix.O_RDONLY)
985        try:
986            posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)
987        except:
988            posix.rename(support.TESTFN + 'ren', support.TESTFN)
989            raise
990        else:
991            posix.stat(support.TESTFN) # should not raise exception
992        finally:
993            posix.close(f)
994
995    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
996    def test_symlink_dir_fd(self):
997        f = posix.open(posix.getcwd(), posix.O_RDONLY)
998        try:
999            posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)
1000            self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
1001        finally:
1002            posix.close(f)
1003            support.unlink(support.TESTFN + 'link')
1004
1005    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1006    def test_unlink_dir_fd(self):
1007        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1008        support.create_empty_file(support.TESTFN + 'del')
1009        posix.stat(support.TESTFN + 'del') # should not raise exception
1010        try:
1011            posix.unlink(support.TESTFN + 'del', dir_fd=f)
1012        except:
1013            support.unlink(support.TESTFN + 'del')
1014            raise
1015        else:
1016            self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
1017        finally:
1018            posix.close(f)
1019
1020    @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1021    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
1022    def test_mkfifo_dir_fd(self):
1023        support.unlink(support.TESTFN)
1024        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1025        try:
1026            posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1027            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
1028        finally:
1029            posix.close(f)
1030
1031    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1032                                           "don't have scheduling support")
1033    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1034                                                  "don't have sched affinity support")
1035
1036    @requires_sched_h
1037    def test_sched_yield(self):
1038        # This has no error conditions (at least on Linux).
1039        posix.sched_yield()
1040
1041    @requires_sched_h
1042    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1043                         "requires sched_get_priority_max()")
1044    def test_sched_priority(self):
1045        # Round-robin usually has interesting priorities.
1046        pol = posix.SCHED_RR
1047        lo = posix.sched_get_priority_min(pol)
1048        hi = posix.sched_get_priority_max(pol)
1049        self.assertIsInstance(lo, int)
1050        self.assertIsInstance(hi, int)
1051        self.assertGreaterEqual(hi, lo)
1052        # OSX evidently just returns 15 without checking the argument.
1053        if sys.platform != "darwin":
1054            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1055            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1056
1057    @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
1058    def test_get_and_set_scheduler_and_param(self):
1059        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1060                               if name.startswith("SCHED_")]
1061        mine = posix.sched_getscheduler(0)
1062        self.assertIn(mine, possible_schedulers)
1063        try:
1064            parent = posix.sched_getscheduler(os.getppid())
1065        except OSError as e:
1066            if e.errno != errno.EPERM:
1067                raise
1068        else:
1069            self.assertIn(parent, possible_schedulers)
1070        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1071        self.assertRaises(OSError, posix.sched_getparam, -1)
1072        param = posix.sched_getparam(0)
1073        self.assertIsInstance(param.sched_priority, int)
1074
1075        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1076        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1077        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1078        if not sys.platform.startswith(('freebsd', 'netbsd')):
1079            try:
1080                posix.sched_setscheduler(0, mine, param)
1081                posix.sched_setparam(0, param)
1082            except OSError as e:
1083                if e.errno != errno.EPERM:
1084                    raise
1085            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1086
1087        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1088        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1089        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1090        param = posix.sched_param(None)
1091        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1092        large = 214748364700
1093        param = posix.sched_param(large)
1094        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1095        param = posix.sched_param(sched_priority=-large)
1096        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1097
1098    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1099    def test_sched_rr_get_interval(self):
1100        try:
1101            interval = posix.sched_rr_get_interval(0)
1102        except OSError as e:
1103            # This likely means that sched_rr_get_interval is only valid for
1104            # processes with the SCHED_RR scheduler in effect.
1105            if e.errno != errno.EINVAL:
1106                raise
1107            self.skipTest("only works on SCHED_RR processes")
1108        self.assertIsInstance(interval, float)
1109        # Reasonable constraints, I think.
1110        self.assertGreaterEqual(interval, 0.)
1111        self.assertLess(interval, 1.)
1112
1113    @requires_sched_affinity
1114    def test_sched_getaffinity(self):
1115        mask = posix.sched_getaffinity(0)
1116        self.assertIsInstance(mask, set)
1117        self.assertGreaterEqual(len(mask), 1)
1118        self.assertRaises(OSError, posix.sched_getaffinity, -1)
1119        for cpu in mask:
1120            self.assertIsInstance(cpu, int)
1121            self.assertGreaterEqual(cpu, 0)
1122            self.assertLess(cpu, 1 << 32)
1123
1124    @requires_sched_affinity
1125    def test_sched_setaffinity(self):
1126        mask = posix.sched_getaffinity(0)
1127        if len(mask) > 1:
1128            # Empty masks are forbidden
1129            mask.pop()
1130        posix.sched_setaffinity(0, mask)
1131        self.assertEqual(posix.sched_getaffinity(0), mask)
1132        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1133        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1134        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1135        self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1136
1137    def test_rtld_constants(self):
1138        # check presence of major RTLD_* constants
1139        posix.RTLD_LAZY
1140        posix.RTLD_NOW
1141        posix.RTLD_GLOBAL
1142        posix.RTLD_LOCAL
1143
1144    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1145                         "test needs an OS that reports file holes")
1146    def test_fs_holes(self):
1147        # Even if the filesystem doesn't report holes,
1148        # if the OS supports it the SEEK_* constants
1149        # will be defined and will have a consistent
1150        # behaviour:
1151        # os.SEEK_DATA = current position
1152        # os.SEEK_HOLE = end of file position
1153        with open(support.TESTFN, 'r+b') as fp:
1154            fp.write(b"hello")
1155            fp.flush()
1156            size = fp.tell()
1157            fno = fp.fileno()
1158            try :
1159                for i in range(size):
1160                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1161                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1162                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1163                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1164            except OSError :
1165                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1166                # but it is not true.
1167                # For instance:
1168                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1169                raise unittest.SkipTest("OSError raised!")
1170
1171    def test_path_error2(self):
1172        """
1173        Test functions that call path_error2(), providing two filenames in their exceptions.
1174        """
1175        for name in ("rename", "replace", "link"):
1176            function = getattr(os, name, None)
1177            if function is None:
1178                continue
1179
1180            for dst in ("noodly2", support.TESTFN):
1181                try:
1182                    function('doesnotexistfilename', dst)
1183                except OSError as e:
1184                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1185                    break
1186            else:
1187                self.fail("No valid path_error2() test for os." + name)
1188
1189    def test_path_with_null_character(self):
1190        fn = support.TESTFN
1191        fn_with_NUL = fn + '\0'
1192        self.addCleanup(support.unlink, fn)
1193        support.unlink(fn)
1194        fd = None
1195        try:
1196            with self.assertRaises(ValueError):
1197                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1198        finally:
1199            if fd is not None:
1200                os.close(fd)
1201        self.assertFalse(os.path.exists(fn))
1202        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1203        self.assertFalse(os.path.exists(fn))
1204        open(fn, 'wb').close()
1205        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1206
1207    def test_path_with_null_byte(self):
1208        fn = os.fsencode(support.TESTFN)
1209        fn_with_NUL = fn + b'\0'
1210        self.addCleanup(support.unlink, fn)
1211        support.unlink(fn)
1212        fd = None
1213        try:
1214            with self.assertRaises(ValueError):
1215                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1216        finally:
1217            if fd is not None:
1218                os.close(fd)
1219        self.assertFalse(os.path.exists(fn))
1220        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1221        self.assertFalse(os.path.exists(fn))
1222        open(fn, 'wb').close()
1223        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1224
1225class PosixGroupsTester(unittest.TestCase):
1226
1227    def setUp(self):
1228        if posix.getuid() != 0:
1229            raise unittest.SkipTest("not enough privileges")
1230        if not hasattr(posix, 'getgroups'):
1231            raise unittest.SkipTest("need posix.getgroups")
1232        if sys.platform == 'darwin':
1233            raise unittest.SkipTest("getgroups(2) is broken on OSX")
1234        self.saved_groups = posix.getgroups()
1235
1236    def tearDown(self):
1237        if hasattr(posix, 'setgroups'):
1238            posix.setgroups(self.saved_groups)
1239        elif hasattr(posix, 'initgroups'):
1240            name = pwd.getpwuid(posix.getuid()).pw_name
1241            posix.initgroups(name, self.saved_groups[0])
1242
1243    @unittest.skipUnless(hasattr(posix, 'initgroups'),
1244                         "test needs posix.initgroups()")
1245    def test_initgroups(self):
1246        # find missing group
1247
1248        g = max(self.saved_groups or [0]) + 1
1249        name = pwd.getpwuid(posix.getuid()).pw_name
1250        posix.initgroups(name, g)
1251        self.assertIn(g, posix.getgroups())
1252
1253    @unittest.skipUnless(hasattr(posix, 'setgroups'),
1254                         "test needs posix.setgroups()")
1255    def test_setgroups(self):
1256        for groups in [[0], list(range(16))]:
1257            posix.setgroups(groups)
1258            self.assertListEqual(groups, posix.getgroups())
1259
1260def test_main():
1261    try:
1262        support.run_unittest(PosixTester, PosixGroupsTester)
1263    finally:
1264        support.reap_children()
1265
1266if __name__ == '__main__':
1267    test_main()
1268