• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import asynchat
6import asyncore
7import codecs
8import contextlib
9import decimal
10import errno
11import fnmatch
12import fractions
13import itertools
14import locale
15import mmap
16import os
17import pickle
18import shutil
19import signal
20import socket
21import stat
22import subprocess
23import sys
24import sysconfig
25import tempfile
26import threading
27import time
28import types
29import unittest
30import uuid
31import warnings
32from test import support
33from test.support import socket_helper
34from platform import win32_is_iot
35
36try:
37    import resource
38except ImportError:
39    resource = None
40try:
41    import fcntl
42except ImportError:
43    fcntl = None
44try:
45    import _winapi
46except ImportError:
47    _winapi = None
48try:
49    import pwd
50    all_users = [u.pw_uid for u in pwd.getpwall()]
51except (ImportError, AttributeError):
52    all_users = []
53try:
54    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
55except ImportError:
56    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
57
58from test.support.script_helper import assert_python_ok
59from test.support import unix_shell, FakePath
60
61
62root_in_posix = False
63if hasattr(os, 'geteuid'):
64    root_in_posix = (os.geteuid() == 0)
65
66# Detect whether we're on a Linux system that uses the (now outdated
67# and unmaintained) linuxthreads threading library.  There's an issue
68# when combining linuxthreads with a failed execv call: see
69# http://bugs.python.org/issue4970.
70if hasattr(sys, 'thread_info') and sys.thread_info.version:
71    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
72else:
73    USING_LINUXTHREADS = False
74
75# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
76HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
77
78
79def requires_os_func(name):
80    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
81
82
83def create_file(filename, content=b'content'):
84    with open(filename, "xb", 0) as fp:
85        fp.write(content)
86
87
88class MiscTests(unittest.TestCase):
89    def test_getcwd(self):
90        cwd = os.getcwd()
91        self.assertIsInstance(cwd, str)
92
93    def test_getcwd_long_path(self):
94        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
95        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
96        # longer path if longer paths support is enabled. Internally, the os
97        # module uses MAXPATHLEN which is at least 1024.
98        #
99        # Use a directory name of 200 characters to fit into Windows MAX_PATH
100        # limit.
101        #
102        # On Windows, the test can stop when trying to create a path longer
103        # than MAX_PATH if long paths support is disabled:
104        # see RtlAreLongPathsEnabled().
105        min_len = 2000   # characters
106        dirlen = 200     # characters
107        dirname = 'python_test_dir_'
108        dirname = dirname + ('a' * (dirlen - len(dirname)))
109
110        with tempfile.TemporaryDirectory() as tmpdir:
111            with support.change_cwd(tmpdir) as path:
112                expected = path
113
114                while True:
115                    cwd = os.getcwd()
116                    self.assertEqual(cwd, expected)
117
118                    need = min_len - (len(cwd) + len(os.path.sep))
119                    if need <= 0:
120                        break
121                    if len(dirname) > need and need > 0:
122                        dirname = dirname[:need]
123
124                    path = os.path.join(path, dirname)
125                    try:
126                        os.mkdir(path)
127                        # On Windows, chdir() can fail
128                        # even if mkdir() succeeded
129                        os.chdir(path)
130                    except FileNotFoundError:
131                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
132                        # ERROR_FILENAME_EXCED_RANGE (206) errors
133                        # ("The filename or extension is too long")
134                        break
135                    except OSError as exc:
136                        if exc.errno == errno.ENAMETOOLONG:
137                            break
138                        else:
139                            raise
140
141                    expected = path
142
143                if support.verbose:
144                    print(f"Tested current directory length: {len(cwd)}")
145
146    def test_getcwdb(self):
147        cwd = os.getcwdb()
148        self.assertIsInstance(cwd, bytes)
149        self.assertEqual(os.fsdecode(cwd), os.getcwd())
150
151
152# Tests creating TESTFN
153class FileTests(unittest.TestCase):
154    def setUp(self):
155        if os.path.lexists(support.TESTFN):
156            os.unlink(support.TESTFN)
157    tearDown = setUp
158
159    def test_access(self):
160        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
161        os.close(f)
162        self.assertTrue(os.access(support.TESTFN, os.W_OK))
163
164    def test_closerange(self):
165        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
166        # We must allocate two consecutive file descriptors, otherwise
167        # it will mess up other file descriptors (perhaps even the three
168        # standard ones).
169        second = os.dup(first)
170        try:
171            retries = 0
172            while second != first + 1:
173                os.close(first)
174                retries += 1
175                if retries > 10:
176                    # XXX test skipped
177                    self.skipTest("couldn't allocate two consecutive fds")
178                first, second = second, os.dup(second)
179        finally:
180            os.close(second)
181        # close a fd that is open, and one that isn't
182        os.closerange(first, first + 2)
183        self.assertRaises(OSError, os.write, first, b"a")
184
185    @support.cpython_only
186    def test_rename(self):
187        path = support.TESTFN
188        old = sys.getrefcount(path)
189        self.assertRaises(TypeError, os.rename, path, 0)
190        new = sys.getrefcount(path)
191        self.assertEqual(old, new)
192
193    def test_read(self):
194        with open(support.TESTFN, "w+b") as fobj:
195            fobj.write(b"spam")
196            fobj.flush()
197            fd = fobj.fileno()
198            os.lseek(fd, 0, 0)
199            s = os.read(fd, 4)
200            self.assertEqual(type(s), bytes)
201            self.assertEqual(s, b"spam")
202
203    @support.cpython_only
204    # Skip the test on 32-bit platforms: the number of bytes must fit in a
205    # Py_ssize_t type
206    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
207                         "needs INT_MAX < PY_SSIZE_T_MAX")
208    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
209    def test_large_read(self, size):
210        self.addCleanup(support.unlink, support.TESTFN)
211        create_file(support.TESTFN, b'test')
212
213        # Issue #21932: Make sure that os.read() does not raise an
214        # OverflowError for size larger than INT_MAX
215        with open(support.TESTFN, "rb") as fp:
216            data = os.read(fp.fileno(), size)
217
218        # The test does not try to read more than 2 GiB at once because the
219        # operating system is free to return less bytes than requested.
220        self.assertEqual(data, b'test')
221
222    def test_write(self):
223        # os.write() accepts bytes- and buffer-like objects but not strings
224        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
225        self.assertRaises(TypeError, os.write, fd, "beans")
226        os.write(fd, b"bacon\n")
227        os.write(fd, bytearray(b"eggs\n"))
228        os.write(fd, memoryview(b"spam\n"))
229        os.close(fd)
230        with open(support.TESTFN, "rb") as fobj:
231            self.assertEqual(fobj.read().splitlines(),
232                [b"bacon", b"eggs", b"spam"])
233
234    def write_windows_console(self, *args):
235        retcode = subprocess.call(args,
236            # use a new console to not flood the test output
237            creationflags=subprocess.CREATE_NEW_CONSOLE,
238            # use a shell to hide the console window (SW_HIDE)
239            shell=True)
240        self.assertEqual(retcode, 0)
241
242    @unittest.skipUnless(sys.platform == 'win32',
243                         'test specific to the Windows console')
244    def test_write_windows_console(self):
245        # Issue #11395: the Windows console returns an error (12: not enough
246        # space error) on writing into stdout if stdout mode is binary and the
247        # length is greater than 66,000 bytes (or less, depending on heap
248        # usage).
249        code = "print('x' * 100000)"
250        self.write_windows_console(sys.executable, "-c", code)
251        self.write_windows_console(sys.executable, "-u", "-c", code)
252
253    def fdopen_helper(self, *args):
254        fd = os.open(support.TESTFN, os.O_RDONLY)
255        f = os.fdopen(fd, *args)
256        f.close()
257
258    def test_fdopen(self):
259        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
260        os.close(fd)
261
262        self.fdopen_helper()
263        self.fdopen_helper('r')
264        self.fdopen_helper('r', 100)
265
266    def test_replace(self):
267        TESTFN2 = support.TESTFN + ".2"
268        self.addCleanup(support.unlink, support.TESTFN)
269        self.addCleanup(support.unlink, TESTFN2)
270
271        create_file(support.TESTFN, b"1")
272        create_file(TESTFN2, b"2")
273
274        os.replace(support.TESTFN, TESTFN2)
275        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
276        with open(TESTFN2, 'r') as f:
277            self.assertEqual(f.read(), "1")
278
279    def test_open_keywords(self):
280        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
281            dir_fd=None)
282        os.close(f)
283
284    def test_symlink_keywords(self):
285        symlink = support.get_attribute(os, "symlink")
286        try:
287            symlink(src='target', dst=support.TESTFN,
288                target_is_directory=False, dir_fd=None)
289        except (NotImplementedError, OSError):
290            pass  # No OS support or unprivileged user
291
292    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
293    def test_copy_file_range_invalid_values(self):
294        with self.assertRaises(ValueError):
295            os.copy_file_range(0, 1, -10)
296
297    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
298    def test_copy_file_range(self):
299        TESTFN2 = support.TESTFN + ".3"
300        data = b'0123456789'
301
302        create_file(support.TESTFN, data)
303        self.addCleanup(support.unlink, support.TESTFN)
304
305        in_file = open(support.TESTFN, 'rb')
306        self.addCleanup(in_file.close)
307        in_fd = in_file.fileno()
308
309        out_file = open(TESTFN2, 'w+b')
310        self.addCleanup(support.unlink, TESTFN2)
311        self.addCleanup(out_file.close)
312        out_fd = out_file.fileno()
313
314        try:
315            i = os.copy_file_range(in_fd, out_fd, 5)
316        except OSError as e:
317            # Handle the case in which Python was compiled
318            # in a system with the syscall but without support
319            # in the kernel.
320            if e.errno != errno.ENOSYS:
321                raise
322            self.skipTest(e)
323        else:
324            # The number of copied bytes can be less than
325            # the number of bytes originally requested.
326            self.assertIn(i, range(0, 6));
327
328            with open(TESTFN2, 'rb') as in_file:
329                self.assertEqual(in_file.read(), data[:i])
330
331    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
332    def test_copy_file_range_offset(self):
333        TESTFN4 = support.TESTFN + ".4"
334        data = b'0123456789'
335        bytes_to_copy = 6
336        in_skip = 3
337        out_seek = 5
338
339        create_file(support.TESTFN, data)
340        self.addCleanup(support.unlink, support.TESTFN)
341
342        in_file = open(support.TESTFN, 'rb')
343        self.addCleanup(in_file.close)
344        in_fd = in_file.fileno()
345
346        out_file = open(TESTFN4, 'w+b')
347        self.addCleanup(support.unlink, TESTFN4)
348        self.addCleanup(out_file.close)
349        out_fd = out_file.fileno()
350
351        try:
352            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
353                                   offset_src=in_skip,
354                                   offset_dst=out_seek)
355        except OSError as e:
356            # Handle the case in which Python was compiled
357            # in a system with the syscall but without support
358            # in the kernel.
359            if e.errno != errno.ENOSYS:
360                raise
361            self.skipTest(e)
362        else:
363            # The number of copied bytes can be less than
364            # the number of bytes originally requested.
365            self.assertIn(i, range(0, bytes_to_copy+1));
366
367            with open(TESTFN4, 'rb') as in_file:
368                read = in_file.read()
369            # seeked bytes (5) are zero'ed
370            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
371            # 012 are skipped (in_skip)
372            # 345678 are copied in the file (in_skip + bytes_to_copy)
373            self.assertEqual(read[out_seek:],
374                             data[in_skip:in_skip+i])
375
376# Test attributes on return values from os.*stat* family.
377class StatAttributeTests(unittest.TestCase):
378    def setUp(self):
379        self.fname = support.TESTFN
380        self.addCleanup(support.unlink, self.fname)
381        create_file(self.fname, b"ABC")
382
383    def check_stat_attributes(self, fname):
384        result = os.stat(fname)
385
386        # Make sure direct access works
387        self.assertEqual(result[stat.ST_SIZE], 3)
388        self.assertEqual(result.st_size, 3)
389
390        # Make sure all the attributes are there
391        members = dir(result)
392        for name in dir(stat):
393            if name[:3] == 'ST_':
394                attr = name.lower()
395                if name.endswith("TIME"):
396                    def trunc(x): return int(x)
397                else:
398                    def trunc(x): return x
399                self.assertEqual(trunc(getattr(result, attr)),
400                                  result[getattr(stat, name)])
401                self.assertIn(attr, members)
402
403        # Make sure that the st_?time and st_?time_ns fields roughly agree
404        # (they should always agree up to around tens-of-microseconds)
405        for name in 'st_atime st_mtime st_ctime'.split():
406            floaty = int(getattr(result, name) * 100000)
407            nanosecondy = getattr(result, name + "_ns") // 10000
408            self.assertAlmostEqual(floaty, nanosecondy, delta=2)
409
410        try:
411            result[200]
412            self.fail("No exception raised")
413        except IndexError:
414            pass
415
416        # Make sure that assignment fails
417        try:
418            result.st_mode = 1
419            self.fail("No exception raised")
420        except AttributeError:
421            pass
422
423        try:
424            result.st_rdev = 1
425            self.fail("No exception raised")
426        except (AttributeError, TypeError):
427            pass
428
429        try:
430            result.parrot = 1
431            self.fail("No exception raised")
432        except AttributeError:
433            pass
434
435        # Use the stat_result constructor with a too-short tuple.
436        try:
437            result2 = os.stat_result((10,))
438            self.fail("No exception raised")
439        except TypeError:
440            pass
441
442        # Use the constructor with a too-long tuple.
443        try:
444            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
445        except TypeError:
446            pass
447
448    def test_stat_attributes(self):
449        self.check_stat_attributes(self.fname)
450
451    def test_stat_attributes_bytes(self):
452        try:
453            fname = self.fname.encode(sys.getfilesystemencoding())
454        except UnicodeEncodeError:
455            self.skipTest("cannot encode %a for the filesystem" % self.fname)
456        self.check_stat_attributes(fname)
457
458    def test_stat_result_pickle(self):
459        result = os.stat(self.fname)
460        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
461            p = pickle.dumps(result, proto)
462            self.assertIn(b'stat_result', p)
463            if proto < 4:
464                self.assertIn(b'cos\nstat_result\n', p)
465            unpickled = pickle.loads(p)
466            self.assertEqual(result, unpickled)
467
468    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
469    def test_statvfs_attributes(self):
470        result = os.statvfs(self.fname)
471
472        # Make sure direct access works
473        self.assertEqual(result.f_bfree, result[3])
474
475        # Make sure all the attributes are there.
476        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
477                    'ffree', 'favail', 'flag', 'namemax')
478        for value, member in enumerate(members):
479            self.assertEqual(getattr(result, 'f_' + member), result[value])
480
481        self.assertTrue(isinstance(result.f_fsid, int))
482
483        # Test that the size of the tuple doesn't change
484        self.assertEqual(len(result), 10)
485
486        # Make sure that assignment really fails
487        try:
488            result.f_bfree = 1
489            self.fail("No exception raised")
490        except AttributeError:
491            pass
492
493        try:
494            result.parrot = 1
495            self.fail("No exception raised")
496        except AttributeError:
497            pass
498
499        # Use the constructor with a too-short tuple.
500        try:
501            result2 = os.statvfs_result((10,))
502            self.fail("No exception raised")
503        except TypeError:
504            pass
505
506        # Use the constructor with a too-long tuple.
507        try:
508            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
509        except TypeError:
510            pass
511
512    @unittest.skipUnless(hasattr(os, 'statvfs'),
513                         "need os.statvfs()")
514    def test_statvfs_result_pickle(self):
515        result = os.statvfs(self.fname)
516
517        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
518            p = pickle.dumps(result, proto)
519            self.assertIn(b'statvfs_result', p)
520            if proto < 4:
521                self.assertIn(b'cos\nstatvfs_result\n', p)
522            unpickled = pickle.loads(p)
523            self.assertEqual(result, unpickled)
524
525    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
526    def test_1686475(self):
527        # Verify that an open file can be stat'ed
528        try:
529            os.stat(r"c:\pagefile.sys")
530        except FileNotFoundError:
531            self.skipTest(r'c:\pagefile.sys does not exist')
532        except OSError as e:
533            self.fail("Could not stat pagefile.sys")
534
535    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
536    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
537    def test_15261(self):
538        # Verify that stat'ing a closed fd does not cause crash
539        r, w = os.pipe()
540        try:
541            os.stat(r)          # should not raise error
542        finally:
543            os.close(r)
544            os.close(w)
545        with self.assertRaises(OSError) as ctx:
546            os.stat(r)
547        self.assertEqual(ctx.exception.errno, errno.EBADF)
548
549    def check_file_attributes(self, result):
550        self.assertTrue(hasattr(result, 'st_file_attributes'))
551        self.assertTrue(isinstance(result.st_file_attributes, int))
552        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
553
554    @unittest.skipUnless(sys.platform == "win32",
555                         "st_file_attributes is Win32 specific")
556    def test_file_attributes(self):
557        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
558        result = os.stat(self.fname)
559        self.check_file_attributes(result)
560        self.assertEqual(
561            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
562            0)
563
564        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
565        dirname = support.TESTFN + "dir"
566        os.mkdir(dirname)
567        self.addCleanup(os.rmdir, dirname)
568
569        result = os.stat(dirname)
570        self.check_file_attributes(result)
571        self.assertEqual(
572            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
573            stat.FILE_ATTRIBUTE_DIRECTORY)
574
575    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
576    def test_access_denied(self):
577        # Default to FindFirstFile WIN32_FIND_DATA when access is
578        # denied. See issue 28075.
579        # os.environ['TEMP'] should be located on a volume that
580        # supports file ACLs.
581        fname = os.path.join(os.environ['TEMP'], self.fname)
582        self.addCleanup(support.unlink, fname)
583        create_file(fname, b'ABC')
584        # Deny the right to [S]YNCHRONIZE on the file to
585        # force CreateFile to fail with ERROR_ACCESS_DENIED.
586        DETACHED_PROCESS = 8
587        subprocess.check_call(
588            # bpo-30584: Use security identifier *S-1-5-32-545 instead
589            # of localized "Users" to not depend on the locale.
590            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
591            creationflags=DETACHED_PROCESS
592        )
593        result = os.stat(fname)
594        self.assertNotEqual(result.st_size, 0)
595
596    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
597    def test_stat_block_device(self):
598        # bpo-38030: os.stat fails for block devices
599        # Test a filename like "//./C:"
600        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
601        result = os.stat(fname)
602        self.assertEqual(result.st_mode, stat.S_IFBLK)
603
604
605class UtimeTests(unittest.TestCase):
606    def setUp(self):
607        self.dirname = support.TESTFN
608        self.fname = os.path.join(self.dirname, "f1")
609
610        self.addCleanup(support.rmtree, self.dirname)
611        os.mkdir(self.dirname)
612        create_file(self.fname)
613
614    def support_subsecond(self, filename):
615        # Heuristic to check if the filesystem supports timestamp with
616        # subsecond resolution: check if float and int timestamps are different
617        st = os.stat(filename)
618        return ((st.st_atime != st[7])
619                or (st.st_mtime != st[8])
620                or (st.st_ctime != st[9]))
621
622    def _test_utime(self, set_time, filename=None):
623        if not filename:
624            filename = self.fname
625
626        support_subsecond = self.support_subsecond(filename)
627        if support_subsecond:
628            # Timestamp with a resolution of 1 microsecond (10^-6).
629            #
630            # The resolution of the C internal function used by os.utime()
631            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
632            # test with a resolution of 1 ns requires more work:
633            # see the issue #15745.
634            atime_ns = 1002003000   # 1.002003 seconds
635            mtime_ns = 4005006000   # 4.005006 seconds
636        else:
637            # use a resolution of 1 second
638            atime_ns = 5 * 10**9
639            mtime_ns = 8 * 10**9
640
641        set_time(filename, (atime_ns, mtime_ns))
642        st = os.stat(filename)
643
644        if support_subsecond:
645            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
646            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
647        else:
648            self.assertEqual(st.st_atime, atime_ns * 1e-9)
649            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
650        self.assertEqual(st.st_atime_ns, atime_ns)
651        self.assertEqual(st.st_mtime_ns, mtime_ns)
652
653    def test_utime(self):
654        def set_time(filename, ns):
655            # test the ns keyword parameter
656            os.utime(filename, ns=ns)
657        self._test_utime(set_time)
658
659    @staticmethod
660    def ns_to_sec(ns):
661        # Convert a number of nanosecond (int) to a number of seconds (float).
662        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
663        # issue, os.utime() rounds towards minus infinity.
664        return (ns * 1e-9) + 0.5e-9
665
666    def test_utime_by_indexed(self):
667        # pass times as floating point seconds as the second indexed parameter
668        def set_time(filename, ns):
669            atime_ns, mtime_ns = ns
670            atime = self.ns_to_sec(atime_ns)
671            mtime = self.ns_to_sec(mtime_ns)
672            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
673            # or utime(time_t)
674            os.utime(filename, (atime, mtime))
675        self._test_utime(set_time)
676
677    def test_utime_by_times(self):
678        def set_time(filename, ns):
679            atime_ns, mtime_ns = ns
680            atime = self.ns_to_sec(atime_ns)
681            mtime = self.ns_to_sec(mtime_ns)
682            # test the times keyword parameter
683            os.utime(filename, times=(atime, mtime))
684        self._test_utime(set_time)
685
686    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
687                         "follow_symlinks support for utime required "
688                         "for this test.")
689    def test_utime_nofollow_symlinks(self):
690        def set_time(filename, ns):
691            # use follow_symlinks=False to test utimensat(timespec)
692            # or lutimes(timeval)
693            os.utime(filename, ns=ns, follow_symlinks=False)
694        self._test_utime(set_time)
695
696    @unittest.skipUnless(os.utime in os.supports_fd,
697                         "fd support for utime required for this test.")
698    def test_utime_fd(self):
699        def set_time(filename, ns):
700            with open(filename, 'wb', 0) as fp:
701                # use a file descriptor to test futimens(timespec)
702                # or futimes(timeval)
703                os.utime(fp.fileno(), ns=ns)
704        self._test_utime(set_time)
705
706    @unittest.skipUnless(os.utime in os.supports_dir_fd,
707                         "dir_fd support for utime required for this test.")
708    def test_utime_dir_fd(self):
709        def set_time(filename, ns):
710            dirname, name = os.path.split(filename)
711            dirfd = os.open(dirname, os.O_RDONLY)
712            try:
713                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
714                os.utime(name, dir_fd=dirfd, ns=ns)
715            finally:
716                os.close(dirfd)
717        self._test_utime(set_time)
718
719    def test_utime_directory(self):
720        def set_time(filename, ns):
721            # test calling os.utime() on a directory
722            os.utime(filename, ns=ns)
723        self._test_utime(set_time, filename=self.dirname)
724
725    def _test_utime_current(self, set_time):
726        # Get the system clock
727        current = time.time()
728
729        # Call os.utime() to set the timestamp to the current system clock
730        set_time(self.fname)
731
732        if not self.support_subsecond(self.fname):
733            delta = 1.0
734        else:
735            # On Windows, the usual resolution of time.time() is 15.6 ms.
736            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
737            #
738            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
739            # also 50 ms on other platforms.
740            delta = 0.050
741        st = os.stat(self.fname)
742        msg = ("st_time=%r, current=%r, dt=%r"
743               % (st.st_mtime, current, st.st_mtime - current))
744        self.assertAlmostEqual(st.st_mtime, current,
745                               delta=delta, msg=msg)
746
747    def test_utime_current(self):
748        def set_time(filename):
749            # Set to the current time in the new way
750            os.utime(self.fname)
751        self._test_utime_current(set_time)
752
753    def test_utime_current_old(self):
754        def set_time(filename):
755            # Set to the current time in the old explicit way.
756            os.utime(self.fname, None)
757        self._test_utime_current(set_time)
758
759    def get_file_system(self, path):
760        if sys.platform == 'win32':
761            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
762            import ctypes
763            kernel32 = ctypes.windll.kernel32
764            buf = ctypes.create_unicode_buffer("", 100)
765            ok = kernel32.GetVolumeInformationW(root, None, 0,
766                                                None, None, None,
767                                                buf, len(buf))
768            if ok:
769                return buf.value
770        # return None if the filesystem is unknown
771
772    def test_large_time(self):
773        # Many filesystems are limited to the year 2038. At least, the test
774        # pass with NTFS filesystem.
775        if self.get_file_system(self.dirname) != "NTFS":
776            self.skipTest("requires NTFS")
777
778        large = 5000000000   # some day in 2128
779        os.utime(self.fname, (large, large))
780        self.assertEqual(os.stat(self.fname).st_mtime, large)
781
782    def test_utime_invalid_arguments(self):
783        # seconds and nanoseconds parameters are mutually exclusive
784        with self.assertRaises(ValueError):
785            os.utime(self.fname, (5, 5), ns=(5, 5))
786        with self.assertRaises(TypeError):
787            os.utime(self.fname, [5, 5])
788        with self.assertRaises(TypeError):
789            os.utime(self.fname, (5,))
790        with self.assertRaises(TypeError):
791            os.utime(self.fname, (5, 5, 5))
792        with self.assertRaises(TypeError):
793            os.utime(self.fname, ns=[5, 5])
794        with self.assertRaises(TypeError):
795            os.utime(self.fname, ns=(5,))
796        with self.assertRaises(TypeError):
797            os.utime(self.fname, ns=(5, 5, 5))
798
799        if os.utime not in os.supports_follow_symlinks:
800            with self.assertRaises(NotImplementedError):
801                os.utime(self.fname, (5, 5), follow_symlinks=False)
802        if os.utime not in os.supports_fd:
803            with open(self.fname, 'wb', 0) as fp:
804                with self.assertRaises(TypeError):
805                    os.utime(fp.fileno(), (5, 5))
806        if os.utime not in os.supports_dir_fd:
807            with self.assertRaises(NotImplementedError):
808                os.utime(self.fname, (5, 5), dir_fd=0)
809
810    @support.cpython_only
811    def test_issue31577(self):
812        # The interpreter shouldn't crash in case utime() received a bad
813        # ns argument.
814        def get_bad_int(divmod_ret_val):
815            class BadInt:
816                def __divmod__(*args):
817                    return divmod_ret_val
818            return BadInt()
819        with self.assertRaises(TypeError):
820            os.utime(self.fname, ns=(get_bad_int(42), 1))
821        with self.assertRaises(TypeError):
822            os.utime(self.fname, ns=(get_bad_int(()), 1))
823        with self.assertRaises(TypeError):
824            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
825
826
827from test import mapping_tests
828
829class EnvironTests(mapping_tests.BasicTestMappingProtocol):
830    """check that os.environ object conform to mapping protocol"""
831    type2test = None
832
833    def setUp(self):
834        self.__save = dict(os.environ)
835        if os.supports_bytes_environ:
836            self.__saveb = dict(os.environb)
837        for key, value in self._reference().items():
838            os.environ[key] = value
839
840    def tearDown(self):
841        os.environ.clear()
842        os.environ.update(self.__save)
843        if os.supports_bytes_environ:
844            os.environb.clear()
845            os.environb.update(self.__saveb)
846
847    def _reference(self):
848        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
849
850    def _empty_mapping(self):
851        os.environ.clear()
852        return os.environ
853
854    # Bug 1110478
855    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
856                         'requires a shell')
857    def test_update2(self):
858        os.environ.clear()
859        os.environ.update(HELLO="World")
860        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
861            value = popen.read().strip()
862            self.assertEqual(value, "World")
863
864    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
865                         'requires a shell')
866    def test_os_popen_iter(self):
867        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
868                      % unix_shell) as popen:
869            it = iter(popen)
870            self.assertEqual(next(it), "line1\n")
871            self.assertEqual(next(it), "line2\n")
872            self.assertEqual(next(it), "line3\n")
873            self.assertRaises(StopIteration, next, it)
874
875    # Verify environ keys and values from the OS are of the
876    # correct str type.
877    def test_keyvalue_types(self):
878        for key, val in os.environ.items():
879            self.assertEqual(type(key), str)
880            self.assertEqual(type(val), str)
881
882    def test_items(self):
883        for key, value in self._reference().items():
884            self.assertEqual(os.environ.get(key), value)
885
886    # Issue 7310
887    def test___repr__(self):
888        """Check that the repr() of os.environ looks like environ({...})."""
889        env = os.environ
890        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
891            '{!r}: {!r}'.format(key, value)
892            for key, value in env.items())))
893
894    def test_get_exec_path(self):
895        defpath_list = os.defpath.split(os.pathsep)
896        test_path = ['/monty', '/python', '', '/flying/circus']
897        test_env = {'PATH': os.pathsep.join(test_path)}
898
899        saved_environ = os.environ
900        try:
901            os.environ = dict(test_env)
902            # Test that defaulting to os.environ works.
903            self.assertSequenceEqual(test_path, os.get_exec_path())
904            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
905        finally:
906            os.environ = saved_environ
907
908        # No PATH environment variable
909        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
910        # Empty PATH environment variable
911        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
912        # Supplied PATH environment variable
913        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
914
915        if os.supports_bytes_environ:
916            # env cannot contain 'PATH' and b'PATH' keys
917            try:
918                # ignore BytesWarning warning
919                with warnings.catch_warnings(record=True):
920                    mixed_env = {'PATH': '1', b'PATH': b'2'}
921            except BytesWarning:
922                # mixed_env cannot be created with python -bb
923                pass
924            else:
925                self.assertRaises(ValueError, os.get_exec_path, mixed_env)
926
927            # bytes key and/or value
928            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
929                ['abc'])
930            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
931                ['abc'])
932            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
933                ['abc'])
934
935    @unittest.skipUnless(os.supports_bytes_environ,
936                         "os.environb required for this test.")
937    def test_environb(self):
938        # os.environ -> os.environb
939        value = 'euro\u20ac'
940        try:
941            value_bytes = value.encode(sys.getfilesystemencoding(),
942                                       'surrogateescape')
943        except UnicodeEncodeError:
944            msg = "U+20AC character is not encodable to %s" % (
945                sys.getfilesystemencoding(),)
946            self.skipTest(msg)
947        os.environ['unicode'] = value
948        self.assertEqual(os.environ['unicode'], value)
949        self.assertEqual(os.environb[b'unicode'], value_bytes)
950
951        # os.environb -> os.environ
952        value = b'\xff'
953        os.environb[b'bytes'] = value
954        self.assertEqual(os.environb[b'bytes'], value)
955        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
956        self.assertEqual(os.environ['bytes'], value_str)
957
958    def test_putenv_unsetenv(self):
959        name = "PYTHONTESTVAR"
960        value = "testvalue"
961        code = f'import os; print(repr(os.environ.get({name!r})))'
962
963        with support.EnvironmentVarGuard() as env:
964            env.pop(name, None)
965
966            os.putenv(name, value)
967            proc = subprocess.run([sys.executable, '-c', code], check=True,
968                                  stdout=subprocess.PIPE, text=True)
969            self.assertEqual(proc.stdout.rstrip(), repr(value))
970
971            os.unsetenv(name)
972            proc = subprocess.run([sys.executable, '-c', code], check=True,
973                                  stdout=subprocess.PIPE, text=True)
974            self.assertEqual(proc.stdout.rstrip(), repr(None))
975
976    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
977    @support.requires_mac_ver(10, 6)
978    def test_putenv_unsetenv_error(self):
979        # Empty variable name is invalid.
980        # "=" and null character are not allowed in a variable name.
981        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
982            self.assertRaises((OSError, ValueError), os.putenv, name, "value")
983            self.assertRaises((OSError, ValueError), os.unsetenv, name)
984
985        if sys.platform == "win32":
986            # On Windows, an environment variable string ("name=value" string)
987            # is limited to 32,767 characters
988            longstr = 'x' * 32_768
989            self.assertRaises(ValueError, os.putenv, longstr, "1")
990            self.assertRaises(ValueError, os.putenv, "X", longstr)
991            self.assertRaises(ValueError, os.unsetenv, longstr)
992
993    def test_key_type(self):
994        missing = 'missingkey'
995        self.assertNotIn(missing, os.environ)
996
997        with self.assertRaises(KeyError) as cm:
998            os.environ[missing]
999        self.assertIs(cm.exception.args[0], missing)
1000        self.assertTrue(cm.exception.__suppress_context__)
1001
1002        with self.assertRaises(KeyError) as cm:
1003            del os.environ[missing]
1004        self.assertIs(cm.exception.args[0], missing)
1005        self.assertTrue(cm.exception.__suppress_context__)
1006
1007    def _test_environ_iteration(self, collection):
1008        iterator = iter(collection)
1009        new_key = "__new_key__"
1010
1011        next(iterator)  # start iteration over os.environ.items
1012
1013        # add a new key in os.environ mapping
1014        os.environ[new_key] = "test_environ_iteration"
1015
1016        try:
1017            next(iterator)  # force iteration over modified mapping
1018            self.assertEqual(os.environ[new_key], "test_environ_iteration")
1019        finally:
1020            del os.environ[new_key]
1021
1022    def test_iter_error_when_changing_os_environ(self):
1023        self._test_environ_iteration(os.environ)
1024
1025    def test_iter_error_when_changing_os_environ_items(self):
1026        self._test_environ_iteration(os.environ.items())
1027
1028    def test_iter_error_when_changing_os_environ_values(self):
1029        self._test_environ_iteration(os.environ.values())
1030
1031    def _test_underlying_process_env(self, var, expected):
1032        if not (unix_shell and os.path.exists(unix_shell)):
1033            return
1034
1035        with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1036            value = popen.read().strip()
1037
1038        self.assertEqual(expected, value)
1039
1040    def test_or_operator(self):
1041        overridden_key = '_TEST_VAR_'
1042        original_value = 'original_value'
1043        os.environ[overridden_key] = original_value
1044
1045        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1046        expected = dict(os.environ)
1047        expected.update(new_vars_dict)
1048
1049        actual = os.environ | new_vars_dict
1050        self.assertDictEqual(expected, actual)
1051        self.assertEqual('3', actual[overridden_key])
1052
1053        new_vars_items = new_vars_dict.items()
1054        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1055
1056        self._test_underlying_process_env('_A_', '')
1057        self._test_underlying_process_env(overridden_key, original_value)
1058
1059    def test_ior_operator(self):
1060        overridden_key = '_TEST_VAR_'
1061        os.environ[overridden_key] = 'original_value'
1062
1063        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1064        expected = dict(os.environ)
1065        expected.update(new_vars_dict)
1066
1067        os.environ |= new_vars_dict
1068        self.assertEqual(expected, os.environ)
1069        self.assertEqual('3', os.environ[overridden_key])
1070
1071        self._test_underlying_process_env('_A_', '1')
1072        self._test_underlying_process_env(overridden_key, '3')
1073
1074    def test_ior_operator_invalid_dicts(self):
1075        os_environ_copy = os.environ.copy()
1076        with self.assertRaises(TypeError):
1077            dict_with_bad_key = {1: '_A_'}
1078            os.environ |= dict_with_bad_key
1079
1080        with self.assertRaises(TypeError):
1081            dict_with_bad_val = {'_A_': 1}
1082            os.environ |= dict_with_bad_val
1083
1084        # Check nothing was added.
1085        self.assertEqual(os_environ_copy, os.environ)
1086
1087    def test_ior_operator_key_value_iterable(self):
1088        overridden_key = '_TEST_VAR_'
1089        os.environ[overridden_key] = 'original_value'
1090
1091        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1092        expected = dict(os.environ)
1093        expected.update(new_vars_items)
1094
1095        os.environ |= new_vars_items
1096        self.assertEqual(expected, os.environ)
1097        self.assertEqual('3', os.environ[overridden_key])
1098
1099        self._test_underlying_process_env('_A_', '1')
1100        self._test_underlying_process_env(overridden_key, '3')
1101
1102    def test_ror_operator(self):
1103        overridden_key = '_TEST_VAR_'
1104        original_value = 'original_value'
1105        os.environ[overridden_key] = original_value
1106
1107        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1108        expected = dict(new_vars_dict)
1109        expected.update(os.environ)
1110
1111        actual = new_vars_dict | os.environ
1112        self.assertDictEqual(expected, actual)
1113        self.assertEqual(original_value, actual[overridden_key])
1114
1115        new_vars_items = new_vars_dict.items()
1116        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1117
1118        self._test_underlying_process_env('_A_', '')
1119        self._test_underlying_process_env(overridden_key, original_value)
1120
1121
1122class WalkTests(unittest.TestCase):
1123    """Tests for os.walk()."""
1124
1125    # Wrapper to hide minor differences between os.walk and os.fwalk
1126    # to tests both functions with the same code base
1127    def walk(self, top, **kwargs):
1128        if 'follow_symlinks' in kwargs:
1129            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1130        return os.walk(top, **kwargs)
1131
1132    def setUp(self):
1133        join = os.path.join
1134        self.addCleanup(support.rmtree, support.TESTFN)
1135
1136        # Build:
1137        #     TESTFN/
1138        #       TEST1/              a file kid and two directory kids
1139        #         tmp1
1140        #         SUB1/             a file kid and a directory kid
1141        #           tmp2
1142        #           SUB11/          no kids
1143        #         SUB2/             a file kid and a dirsymlink kid
1144        #           tmp3
1145        #           SUB21/          not readable
1146        #             tmp5
1147        #           link/           a symlink to TESTFN.2
1148        #           broken_link
1149        #           broken_link2
1150        #           broken_link3
1151        #       TEST2/
1152        #         tmp4              a lone file
1153        self.walk_path = join(support.TESTFN, "TEST1")
1154        self.sub1_path = join(self.walk_path, "SUB1")
1155        self.sub11_path = join(self.sub1_path, "SUB11")
1156        sub2_path = join(self.walk_path, "SUB2")
1157        sub21_path = join(sub2_path, "SUB21")
1158        tmp1_path = join(self.walk_path, "tmp1")
1159        tmp2_path = join(self.sub1_path, "tmp2")
1160        tmp3_path = join(sub2_path, "tmp3")
1161        tmp5_path = join(sub21_path, "tmp3")
1162        self.link_path = join(sub2_path, "link")
1163        t2_path = join(support.TESTFN, "TEST2")
1164        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
1165        broken_link_path = join(sub2_path, "broken_link")
1166        broken_link2_path = join(sub2_path, "broken_link2")
1167        broken_link3_path = join(sub2_path, "broken_link3")
1168
1169        # Create stuff.
1170        os.makedirs(self.sub11_path)
1171        os.makedirs(sub2_path)
1172        os.makedirs(sub21_path)
1173        os.makedirs(t2_path)
1174
1175        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
1176            with open(path, "x", encoding='utf-8') as f:
1177                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
1178
1179        if support.can_symlink():
1180            os.symlink(os.path.abspath(t2_path), self.link_path)
1181            os.symlink('broken', broken_link_path, True)
1182            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1183            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
1184            self.sub2_tree = (sub2_path, ["SUB21", "link"],
1185                              ["broken_link", "broken_link2", "broken_link3",
1186                               "tmp3"])
1187        else:
1188            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
1189
1190        os.chmod(sub21_path, 0)
1191        try:
1192            os.listdir(sub21_path)
1193        except PermissionError:
1194            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1195        else:
1196            os.chmod(sub21_path, stat.S_IRWXU)
1197            os.unlink(tmp5_path)
1198            os.rmdir(sub21_path)
1199            del self.sub2_tree[1][:1]
1200
1201    def test_walk_topdown(self):
1202        # Walk top-down.
1203        all = list(self.walk(self.walk_path))
1204
1205        self.assertEqual(len(all), 4)
1206        # We can't know which order SUB1 and SUB2 will appear in.
1207        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
1208        #     flipped:  TESTFN, SUB2, SUB1, SUB11
1209        flipped = all[0][1][0] != "SUB1"
1210        all[0][1].sort()
1211        all[3 - 2 * flipped][-1].sort()
1212        all[3 - 2 * flipped][1].sort()
1213        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1214        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1215        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1216        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
1217
1218    def test_walk_prune(self, walk_path=None):
1219        if walk_path is None:
1220            walk_path = self.walk_path
1221        # Prune the search.
1222        all = []
1223        for root, dirs, files in self.walk(walk_path):
1224            all.append((root, dirs, files))
1225            # Don't descend into SUB1.
1226            if 'SUB1' in dirs:
1227                # Note that this also mutates the dirs we appended to all!
1228                dirs.remove('SUB1')
1229
1230        self.assertEqual(len(all), 2)
1231        self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
1232
1233        all[1][-1].sort()
1234        all[1][1].sort()
1235        self.assertEqual(all[1], self.sub2_tree)
1236
1237    def test_file_like_path(self):
1238        self.test_walk_prune(FakePath(self.walk_path))
1239
1240    def test_walk_bottom_up(self):
1241        # Walk bottom-up.
1242        all = list(self.walk(self.walk_path, topdown=False))
1243
1244        self.assertEqual(len(all), 4, all)
1245        # We can't know which order SUB1 and SUB2 will appear in.
1246        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
1247        #     flipped:  SUB2, SUB11, SUB1, TESTFN
1248        flipped = all[3][1][0] != "SUB1"
1249        all[3][1].sort()
1250        all[2 - 2 * flipped][-1].sort()
1251        all[2 - 2 * flipped][1].sort()
1252        self.assertEqual(all[3],
1253                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1254        self.assertEqual(all[flipped],
1255                         (self.sub11_path, [], []))
1256        self.assertEqual(all[flipped + 1],
1257                         (self.sub1_path, ["SUB11"], ["tmp2"]))
1258        self.assertEqual(all[2 - 2 * flipped],
1259                         self.sub2_tree)
1260
1261    def test_walk_symlink(self):
1262        if not support.can_symlink():
1263            self.skipTest("need symlink support")
1264
1265        # Walk, following symlinks.
1266        walk_it = self.walk(self.walk_path, follow_symlinks=True)
1267        for root, dirs, files in walk_it:
1268            if root == self.link_path:
1269                self.assertEqual(dirs, [])
1270                self.assertEqual(files, ["tmp4"])
1271                break
1272        else:
1273            self.fail("Didn't follow symlink with followlinks=True")
1274
1275    def test_walk_bad_dir(self):
1276        # Walk top-down.
1277        errors = []
1278        walk_it = self.walk(self.walk_path, onerror=errors.append)
1279        root, dirs, files = next(walk_it)
1280        self.assertEqual(errors, [])
1281        dir1 = 'SUB1'
1282        path1 = os.path.join(root, dir1)
1283        path1new = os.path.join(root, dir1 + '.new')
1284        os.rename(path1, path1new)
1285        try:
1286            roots = [r for r, d, f in walk_it]
1287            self.assertTrue(errors)
1288            self.assertNotIn(path1, roots)
1289            self.assertNotIn(path1new, roots)
1290            for dir2 in dirs:
1291                if dir2 != dir1:
1292                    self.assertIn(os.path.join(root, dir2), roots)
1293        finally:
1294            os.rename(path1new, path1)
1295
1296    def test_walk_many_open_files(self):
1297        depth = 30
1298        base = os.path.join(support.TESTFN, 'deep')
1299        p = os.path.join(base, *(['d']*depth))
1300        os.makedirs(p)
1301
1302        iters = [self.walk(base, topdown=False) for j in range(100)]
1303        for i in range(depth + 1):
1304            expected = (p, ['d'] if i else [], [])
1305            for it in iters:
1306                self.assertEqual(next(it), expected)
1307            p = os.path.dirname(p)
1308
1309        iters = [self.walk(base, topdown=True) for j in range(100)]
1310        p = base
1311        for i in range(depth + 1):
1312            expected = (p, ['d'] if i < depth else [], [])
1313            for it in iters:
1314                self.assertEqual(next(it), expected)
1315            p = os.path.join(p, 'd')
1316
1317
1318@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1319class FwalkTests(WalkTests):
1320    """Tests for os.fwalk()."""
1321
1322    def walk(self, top, **kwargs):
1323        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
1324            yield (root, dirs, files)
1325
1326    def fwalk(self, *args, **kwargs):
1327        return os.fwalk(*args, **kwargs)
1328
1329    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1330        """
1331        compare with walk() results.
1332        """
1333        walk_kwargs = walk_kwargs.copy()
1334        fwalk_kwargs = fwalk_kwargs.copy()
1335        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1336            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1337            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
1338
1339            expected = {}
1340            for root, dirs, files in os.walk(**walk_kwargs):
1341                expected[root] = (set(dirs), set(files))
1342
1343            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
1344                self.assertIn(root, expected)
1345                self.assertEqual(expected[root], (set(dirs), set(files)))
1346
1347    def test_compare_to_walk(self):
1348        kwargs = {'top': support.TESTFN}
1349        self._compare_to_walk(kwargs, kwargs)
1350
1351    def test_dir_fd(self):
1352        try:
1353            fd = os.open(".", os.O_RDONLY)
1354            walk_kwargs = {'top': support.TESTFN}
1355            fwalk_kwargs = walk_kwargs.copy()
1356            fwalk_kwargs['dir_fd'] = fd
1357            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1358        finally:
1359            os.close(fd)
1360
1361    def test_yields_correct_dir_fd(self):
1362        # check returned file descriptors
1363        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1364            args = support.TESTFN, topdown, None
1365            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
1366                # check that the FD is valid
1367                os.fstat(rootfd)
1368                # redundant check
1369                os.stat(rootfd)
1370                # check that listdir() returns consistent information
1371                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
1372
1373    def test_fd_leak(self):
1374        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1375        # we both check that calling fwalk() a large number of times doesn't
1376        # yield EMFILE, and that the minimum allocated FD hasn't changed.
1377        minfd = os.dup(1)
1378        os.close(minfd)
1379        for i in range(256):
1380            for x in self.fwalk(support.TESTFN):
1381                pass
1382        newfd = os.dup(1)
1383        self.addCleanup(os.close, newfd)
1384        self.assertEqual(newfd, minfd)
1385
1386    # fwalk() keeps file descriptors open
1387    test_walk_many_open_files = None
1388
1389
1390class BytesWalkTests(WalkTests):
1391    """Tests for os.walk() with bytes."""
1392    def walk(self, top, **kwargs):
1393        if 'follow_symlinks' in kwargs:
1394            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1395        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1396            root = os.fsdecode(broot)
1397            dirs = list(map(os.fsdecode, bdirs))
1398            files = list(map(os.fsdecode, bfiles))
1399            yield (root, dirs, files)
1400            bdirs[:] = list(map(os.fsencode, dirs))
1401            bfiles[:] = list(map(os.fsencode, files))
1402
1403@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1404class BytesFwalkTests(FwalkTests):
1405    """Tests for os.walk() with bytes."""
1406    def fwalk(self, top='.', *args, **kwargs):
1407        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1408            root = os.fsdecode(broot)
1409            dirs = list(map(os.fsdecode, bdirs))
1410            files = list(map(os.fsdecode, bfiles))
1411            yield (root, dirs, files, topfd)
1412            bdirs[:] = list(map(os.fsencode, dirs))
1413            bfiles[:] = list(map(os.fsencode, files))
1414
1415
1416class MakedirTests(unittest.TestCase):
1417    def setUp(self):
1418        os.mkdir(support.TESTFN)
1419
1420    def test_makedir(self):
1421        base = support.TESTFN
1422        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1423        os.makedirs(path)             # Should work
1424        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1425        os.makedirs(path)
1426
1427        # Try paths with a '.' in them
1428        self.assertRaises(OSError, os.makedirs, os.curdir)
1429        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1430        os.makedirs(path)
1431        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1432                            'dir5', 'dir6')
1433        os.makedirs(path)
1434
1435    def test_mode(self):
1436        with support.temp_umask(0o002):
1437            base = support.TESTFN
1438            parent = os.path.join(base, 'dir1')
1439            path = os.path.join(parent, 'dir2')
1440            os.makedirs(path, 0o555)
1441            self.assertTrue(os.path.exists(path))
1442            self.assertTrue(os.path.isdir(path))
1443            if os.name != 'nt':
1444                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1445                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
1446
1447    def test_exist_ok_existing_directory(self):
1448        path = os.path.join(support.TESTFN, 'dir1')
1449        mode = 0o777
1450        old_mask = os.umask(0o022)
1451        os.makedirs(path, mode)
1452        self.assertRaises(OSError, os.makedirs, path, mode)
1453        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1454        os.makedirs(path, 0o776, exist_ok=True)
1455        os.makedirs(path, mode=mode, exist_ok=True)
1456        os.umask(old_mask)
1457
1458        # Issue #25583: A drive root could raise PermissionError on Windows
1459        os.makedirs(os.path.abspath('/'), exist_ok=True)
1460
1461    def test_exist_ok_s_isgid_directory(self):
1462        path = os.path.join(support.TESTFN, 'dir1')
1463        S_ISGID = stat.S_ISGID
1464        mode = 0o777
1465        old_mask = os.umask(0o022)
1466        try:
1467            existing_testfn_mode = stat.S_IMODE(
1468                    os.lstat(support.TESTFN).st_mode)
1469            try:
1470                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
1471            except PermissionError:
1472                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1473            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1474                raise unittest.SkipTest('No support for S_ISGID dir mode.')
1475            # The os should apply S_ISGID from the parent dir for us, but
1476            # this test need not depend on that behavior.  Be explicit.
1477            os.makedirs(path, mode | S_ISGID)
1478            # http://bugs.python.org/issue14992
1479            # Should not fail when the bit is already set.
1480            os.makedirs(path, mode, exist_ok=True)
1481            # remove the bit.
1482            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1483            # May work even when the bit is not already set when demanded.
1484            os.makedirs(path, mode | S_ISGID, exist_ok=True)
1485        finally:
1486            os.umask(old_mask)
1487
1488    def test_exist_ok_existing_regular_file(self):
1489        base = support.TESTFN
1490        path = os.path.join(support.TESTFN, 'dir1')
1491        with open(path, 'w') as f:
1492            f.write('abc')
1493        self.assertRaises(OSError, os.makedirs, path)
1494        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1495        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1496        os.remove(path)
1497
1498    def tearDown(self):
1499        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
1500                            'dir4', 'dir5', 'dir6')
1501        # If the tests failed, the bottom-most directory ('../dir6')
1502        # may not have been created, so we look for the outermost directory
1503        # that exists.
1504        while not os.path.exists(path) and path != support.TESTFN:
1505            path = os.path.dirname(path)
1506
1507        os.removedirs(path)
1508
1509
1510@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1511class ChownFileTests(unittest.TestCase):
1512
1513    @classmethod
1514    def setUpClass(cls):
1515        os.mkdir(support.TESTFN)
1516
1517    def test_chown_uid_gid_arguments_must_be_index(self):
1518        stat = os.stat(support.TESTFN)
1519        uid = stat.st_uid
1520        gid = stat.st_gid
1521        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1522            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1523            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1524        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1525        self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1526
1527    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1528    def test_chown_gid(self):
1529        groups = os.getgroups()
1530        if len(groups) < 2:
1531            self.skipTest("test needs at least 2 groups")
1532
1533        gid_1, gid_2 = groups[:2]
1534        uid = os.stat(support.TESTFN).st_uid
1535
1536        os.chown(support.TESTFN, uid, gid_1)
1537        gid = os.stat(support.TESTFN).st_gid
1538        self.assertEqual(gid, gid_1)
1539
1540        os.chown(support.TESTFN, uid, gid_2)
1541        gid = os.stat(support.TESTFN).st_gid
1542        self.assertEqual(gid, gid_2)
1543
1544    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1545                         "test needs root privilege and more than one user")
1546    def test_chown_with_root(self):
1547        uid_1, uid_2 = all_users[:2]
1548        gid = os.stat(support.TESTFN).st_gid
1549        os.chown(support.TESTFN, uid_1, gid)
1550        uid = os.stat(support.TESTFN).st_uid
1551        self.assertEqual(uid, uid_1)
1552        os.chown(support.TESTFN, uid_2, gid)
1553        uid = os.stat(support.TESTFN).st_uid
1554        self.assertEqual(uid, uid_2)
1555
1556    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1557                         "test needs non-root account and more than one user")
1558    def test_chown_without_permission(self):
1559        uid_1, uid_2 = all_users[:2]
1560        gid = os.stat(support.TESTFN).st_gid
1561        with self.assertRaises(PermissionError):
1562            os.chown(support.TESTFN, uid_1, gid)
1563            os.chown(support.TESTFN, uid_2, gid)
1564
1565    @classmethod
1566    def tearDownClass(cls):
1567        os.rmdir(support.TESTFN)
1568
1569
1570class RemoveDirsTests(unittest.TestCase):
1571    def setUp(self):
1572        os.makedirs(support.TESTFN)
1573
1574    def tearDown(self):
1575        support.rmtree(support.TESTFN)
1576
1577    def test_remove_all(self):
1578        dira = os.path.join(support.TESTFN, 'dira')
1579        os.mkdir(dira)
1580        dirb = os.path.join(dira, 'dirb')
1581        os.mkdir(dirb)
1582        os.removedirs(dirb)
1583        self.assertFalse(os.path.exists(dirb))
1584        self.assertFalse(os.path.exists(dira))
1585        self.assertFalse(os.path.exists(support.TESTFN))
1586
1587    def test_remove_partial(self):
1588        dira = os.path.join(support.TESTFN, 'dira')
1589        os.mkdir(dira)
1590        dirb = os.path.join(dira, 'dirb')
1591        os.mkdir(dirb)
1592        create_file(os.path.join(dira, 'file.txt'))
1593        os.removedirs(dirb)
1594        self.assertFalse(os.path.exists(dirb))
1595        self.assertTrue(os.path.exists(dira))
1596        self.assertTrue(os.path.exists(support.TESTFN))
1597
1598    def test_remove_nothing(self):
1599        dira = os.path.join(support.TESTFN, 'dira')
1600        os.mkdir(dira)
1601        dirb = os.path.join(dira, 'dirb')
1602        os.mkdir(dirb)
1603        create_file(os.path.join(dirb, 'file.txt'))
1604        with self.assertRaises(OSError):
1605            os.removedirs(dirb)
1606        self.assertTrue(os.path.exists(dirb))
1607        self.assertTrue(os.path.exists(dira))
1608        self.assertTrue(os.path.exists(support.TESTFN))
1609
1610
1611class DevNullTests(unittest.TestCase):
1612    def test_devnull(self):
1613        with open(os.devnull, 'wb', 0) as f:
1614            f.write(b'hello')
1615            f.close()
1616        with open(os.devnull, 'rb') as f:
1617            self.assertEqual(f.read(), b'')
1618
1619
1620class URandomTests(unittest.TestCase):
1621    def test_urandom_length(self):
1622        self.assertEqual(len(os.urandom(0)), 0)
1623        self.assertEqual(len(os.urandom(1)), 1)
1624        self.assertEqual(len(os.urandom(10)), 10)
1625        self.assertEqual(len(os.urandom(100)), 100)
1626        self.assertEqual(len(os.urandom(1000)), 1000)
1627
1628    def test_urandom_value(self):
1629        data1 = os.urandom(16)
1630        self.assertIsInstance(data1, bytes)
1631        data2 = os.urandom(16)
1632        self.assertNotEqual(data1, data2)
1633
1634    def get_urandom_subprocess(self, count):
1635        code = '\n'.join((
1636            'import os, sys',
1637            'data = os.urandom(%s)' % count,
1638            'sys.stdout.buffer.write(data)',
1639            'sys.stdout.buffer.flush()'))
1640        out = assert_python_ok('-c', code)
1641        stdout = out[1]
1642        self.assertEqual(len(stdout), count)
1643        return stdout
1644
1645    def test_urandom_subprocess(self):
1646        data1 = self.get_urandom_subprocess(16)
1647        data2 = self.get_urandom_subprocess(16)
1648        self.assertNotEqual(data1, data2)
1649
1650
1651@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1652class GetRandomTests(unittest.TestCase):
1653    @classmethod
1654    def setUpClass(cls):
1655        try:
1656            os.getrandom(1)
1657        except OSError as exc:
1658            if exc.errno == errno.ENOSYS:
1659                # Python compiled on a more recent Linux version
1660                # than the current Linux kernel
1661                raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1662            else:
1663                raise
1664
1665    def test_getrandom_type(self):
1666        data = os.getrandom(16)
1667        self.assertIsInstance(data, bytes)
1668        self.assertEqual(len(data), 16)
1669
1670    def test_getrandom0(self):
1671        empty = os.getrandom(0)
1672        self.assertEqual(empty, b'')
1673
1674    def test_getrandom_random(self):
1675        self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1676
1677        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1678        # resource /dev/random
1679
1680    def test_getrandom_nonblock(self):
1681        # The call must not fail. Check also that the flag exists
1682        try:
1683            os.getrandom(1, os.GRND_NONBLOCK)
1684        except BlockingIOError:
1685            # System urandom is not initialized yet
1686            pass
1687
1688    def test_getrandom_value(self):
1689        data1 = os.getrandom(16)
1690        data2 = os.getrandom(16)
1691        self.assertNotEqual(data1, data2)
1692
1693
1694# os.urandom() doesn't use a file descriptor when it is implemented with the
1695# getentropy() function, the getrandom() function or the getrandom() syscall
1696OS_URANDOM_DONT_USE_FD = (
1697    sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1698    or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1699    or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1700
1701@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1702                 "os.random() does not use a file descriptor")
1703@unittest.skipIf(sys.platform == "vxworks",
1704                 "VxWorks can't set RLIMIT_NOFILE to 1")
1705class URandomFDTests(unittest.TestCase):
1706    @unittest.skipUnless(resource, "test requires the resource module")
1707    def test_urandom_failure(self):
1708        # Check urandom() failing when it is not able to open /dev/random.
1709        # We spawn a new process to make the test more robust (if getrlimit()
1710        # failed to restore the file descriptor limit after this, the whole
1711        # test suite would crash; this actually happened on the OS X Tiger
1712        # buildbot).
1713        code = """if 1:
1714            import errno
1715            import os
1716            import resource
1717
1718            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1719            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1720            try:
1721                os.urandom(16)
1722            except OSError as e:
1723                assert e.errno == errno.EMFILE, e.errno
1724            else:
1725                raise AssertionError("OSError not raised")
1726            """
1727        assert_python_ok('-c', code)
1728
1729    def test_urandom_fd_closed(self):
1730        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1731        # closed.
1732        code = """if 1:
1733            import os
1734            import sys
1735            import test.support
1736            os.urandom(4)
1737            with test.support.SuppressCrashReport():
1738                os.closerange(3, 256)
1739            sys.stdout.buffer.write(os.urandom(4))
1740            """
1741        rc, out, err = assert_python_ok('-Sc', code)
1742
1743    def test_urandom_fd_reopened(self):
1744        # Issue #21207: urandom() should detect its fd to /dev/urandom
1745        # changed to something else, and reopen it.
1746        self.addCleanup(support.unlink, support.TESTFN)
1747        create_file(support.TESTFN, b"x" * 256)
1748
1749        code = """if 1:
1750            import os
1751            import sys
1752            import test.support
1753            os.urandom(4)
1754            with test.support.SuppressCrashReport():
1755                for fd in range(3, 256):
1756                    try:
1757                        os.close(fd)
1758                    except OSError:
1759                        pass
1760                    else:
1761                        # Found the urandom fd (XXX hopefully)
1762                        break
1763                os.closerange(3, 256)
1764            with open({TESTFN!r}, 'rb') as f:
1765                new_fd = f.fileno()
1766                # Issue #26935: posix allows new_fd and fd to be equal but
1767                # some libc implementations have dup2 return an error in this
1768                # case.
1769                if new_fd != fd:
1770                    os.dup2(new_fd, fd)
1771                sys.stdout.buffer.write(os.urandom(4))
1772                sys.stdout.buffer.write(os.urandom(4))
1773            """.format(TESTFN=support.TESTFN)
1774        rc, out, err = assert_python_ok('-Sc', code)
1775        self.assertEqual(len(out), 8)
1776        self.assertNotEqual(out[0:4], out[4:8])
1777        rc, out2, err2 = assert_python_ok('-Sc', code)
1778        self.assertEqual(len(out2), 8)
1779        self.assertNotEqual(out2, out)
1780
1781
1782@contextlib.contextmanager
1783def _execvpe_mockup(defpath=None):
1784    """
1785    Stubs out execv and execve functions when used as context manager.
1786    Records exec calls. The mock execv and execve functions always raise an
1787    exception as they would normally never return.
1788    """
1789    # A list of tuples containing (function name, first arg, args)
1790    # of calls to execv or execve that have been made.
1791    calls = []
1792
1793    def mock_execv(name, *args):
1794        calls.append(('execv', name, args))
1795        raise RuntimeError("execv called")
1796
1797    def mock_execve(name, *args):
1798        calls.append(('execve', name, args))
1799        raise OSError(errno.ENOTDIR, "execve called")
1800
1801    try:
1802        orig_execv = os.execv
1803        orig_execve = os.execve
1804        orig_defpath = os.defpath
1805        os.execv = mock_execv
1806        os.execve = mock_execve
1807        if defpath is not None:
1808            os.defpath = defpath
1809        yield calls
1810    finally:
1811        os.execv = orig_execv
1812        os.execve = orig_execve
1813        os.defpath = orig_defpath
1814
1815@unittest.skipUnless(hasattr(os, 'execv'),
1816                     "need os.execv()")
1817class ExecTests(unittest.TestCase):
1818    @unittest.skipIf(USING_LINUXTHREADS,
1819                     "avoid triggering a linuxthreads bug: see issue #4970")
1820    def test_execvpe_with_bad_program(self):
1821        self.assertRaises(OSError, os.execvpe, 'no such app-',
1822                          ['no such app-'], None)
1823
1824    def test_execv_with_bad_arglist(self):
1825        self.assertRaises(ValueError, os.execv, 'notepad', ())
1826        self.assertRaises(ValueError, os.execv, 'notepad', [])
1827        self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1828        self.assertRaises(ValueError, os.execv, 'notepad', [''])
1829
1830    def test_execvpe_with_bad_arglist(self):
1831        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1832        self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1833        self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
1834
1835    @unittest.skipUnless(hasattr(os, '_execvpe'),
1836                         "No internal os._execvpe function to test.")
1837    def _test_internal_execvpe(self, test_type):
1838        program_path = os.sep + 'absolutepath'
1839        if test_type is bytes:
1840            program = b'executable'
1841            fullpath = os.path.join(os.fsencode(program_path), program)
1842            native_fullpath = fullpath
1843            arguments = [b'progname', 'arg1', 'arg2']
1844        else:
1845            program = 'executable'
1846            arguments = ['progname', 'arg1', 'arg2']
1847            fullpath = os.path.join(program_path, program)
1848            if os.name != "nt":
1849                native_fullpath = os.fsencode(fullpath)
1850            else:
1851                native_fullpath = fullpath
1852        env = {'spam': 'beans'}
1853
1854        # test os._execvpe() with an absolute path
1855        with _execvpe_mockup() as calls:
1856            self.assertRaises(RuntimeError,
1857                os._execvpe, fullpath, arguments)
1858            self.assertEqual(len(calls), 1)
1859            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1860
1861        # test os._execvpe() with a relative path:
1862        # os.get_exec_path() returns defpath
1863        with _execvpe_mockup(defpath=program_path) as calls:
1864            self.assertRaises(OSError,
1865                os._execvpe, program, arguments, env=env)
1866            self.assertEqual(len(calls), 1)
1867            self.assertSequenceEqual(calls[0],
1868                ('execve', native_fullpath, (arguments, env)))
1869
1870        # test os._execvpe() with a relative path:
1871        # os.get_exec_path() reads the 'PATH' variable
1872        with _execvpe_mockup() as calls:
1873            env_path = env.copy()
1874            if test_type is bytes:
1875                env_path[b'PATH'] = program_path
1876            else:
1877                env_path['PATH'] = program_path
1878            self.assertRaises(OSError,
1879                os._execvpe, program, arguments, env=env_path)
1880            self.assertEqual(len(calls), 1)
1881            self.assertSequenceEqual(calls[0],
1882                ('execve', native_fullpath, (arguments, env_path)))
1883
1884    def test_internal_execvpe_str(self):
1885        self._test_internal_execvpe(str)
1886        if os.name != "nt":
1887            self._test_internal_execvpe(bytes)
1888
1889    def test_execve_invalid_env(self):
1890        args = [sys.executable, '-c', 'pass']
1891
1892        # null character in the environment variable name
1893        newenv = os.environ.copy()
1894        newenv["FRUIT\0VEGETABLE"] = "cabbage"
1895        with self.assertRaises(ValueError):
1896            os.execve(args[0], args, newenv)
1897
1898        # null character in the environment variable value
1899        newenv = os.environ.copy()
1900        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1901        with self.assertRaises(ValueError):
1902            os.execve(args[0], args, newenv)
1903
1904        # equal character in the environment variable name
1905        newenv = os.environ.copy()
1906        newenv["FRUIT=ORANGE"] = "lemon"
1907        with self.assertRaises(ValueError):
1908            os.execve(args[0], args, newenv)
1909
1910    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
1911    def test_execve_with_empty_path(self):
1912        # bpo-32890: Check GetLastError() misuse
1913        try:
1914            os.execve('', ['arg'], {})
1915        except OSError as e:
1916            self.assertTrue(e.winerror is None or e.winerror != 0)
1917        else:
1918            self.fail('No OSError raised')
1919
1920
1921@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1922class Win32ErrorTests(unittest.TestCase):
1923    def setUp(self):
1924        try:
1925            os.stat(support.TESTFN)
1926        except FileNotFoundError:
1927            exists = False
1928        except OSError as exc:
1929            exists = True
1930            self.fail("file %s must not exist; os.stat failed with %s"
1931                      % (support.TESTFN, exc))
1932        else:
1933            self.fail("file %s must not exist" % support.TESTFN)
1934
1935    def test_rename(self):
1936        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
1937
1938    def test_remove(self):
1939        self.assertRaises(OSError, os.remove, support.TESTFN)
1940
1941    def test_chdir(self):
1942        self.assertRaises(OSError, os.chdir, support.TESTFN)
1943
1944    def test_mkdir(self):
1945        self.addCleanup(support.unlink, support.TESTFN)
1946
1947        with open(support.TESTFN, "x") as f:
1948            self.assertRaises(OSError, os.mkdir, support.TESTFN)
1949
1950    def test_utime(self):
1951        self.assertRaises(OSError, os.utime, support.TESTFN, None)
1952
1953    def test_chmod(self):
1954        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
1955
1956
1957class TestInvalidFD(unittest.TestCase):
1958    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
1959               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1960    #singles.append("close")
1961    #We omit close because it doesn't raise an exception on some platforms
1962    def get_single(f):
1963        def helper(self):
1964            if  hasattr(os, f):
1965                self.check(getattr(os, f))
1966        return helper
1967    for f in singles:
1968        locals()["test_"+f] = get_single(f)
1969
1970    def check(self, f, *args):
1971        try:
1972            f(support.make_bad_fd(), *args)
1973        except OSError as e:
1974            self.assertEqual(e.errno, errno.EBADF)
1975        else:
1976            self.fail("%r didn't raise an OSError with a bad file descriptor"
1977                      % f)
1978
1979    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
1980    def test_isatty(self):
1981        self.assertEqual(os.isatty(support.make_bad_fd()), False)
1982
1983    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
1984    def test_closerange(self):
1985        fd = support.make_bad_fd()
1986        # Make sure none of the descriptors we are about to close are
1987        # currently valid (issue 6542).
1988        for i in range(10):
1989            try: os.fstat(fd+i)
1990            except OSError:
1991                pass
1992            else:
1993                break
1994        if i < 2:
1995            raise unittest.SkipTest(
1996                "Unable to acquire a range of invalid file descriptors")
1997        self.assertEqual(os.closerange(fd, fd + i-1), None)
1998
1999    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2000    def test_dup2(self):
2001        self.check(os.dup2, 20)
2002
2003    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
2004    def test_fchmod(self):
2005        self.check(os.fchmod, 0)
2006
2007    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
2008    def test_fchown(self):
2009        self.check(os.fchown, -1, -1)
2010
2011    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
2012    def test_fpathconf(self):
2013        self.check(os.pathconf, "PC_NAME_MAX")
2014        self.check(os.fpathconf, "PC_NAME_MAX")
2015
2016    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
2017    def test_ftruncate(self):
2018        self.check(os.truncate, 0)
2019        self.check(os.ftruncate, 0)
2020
2021    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
2022    def test_lseek(self):
2023        self.check(os.lseek, 0, 0)
2024
2025    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
2026    def test_read(self):
2027        self.check(os.read, 1)
2028
2029    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2030    def test_readv(self):
2031        buf = bytearray(10)
2032        self.check(os.readv, [buf])
2033
2034    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
2035    def test_tcsetpgrpt(self):
2036        self.check(os.tcsetpgrp, 0)
2037
2038    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
2039    def test_write(self):
2040        self.check(os.write, b" ")
2041
2042    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2043    def test_writev(self):
2044        self.check(os.writev, [b'abc'])
2045
2046    def test_inheritable(self):
2047        self.check(os.get_inheritable)
2048        self.check(os.set_inheritable, True)
2049
2050    @unittest.skipUnless(hasattr(os, 'get_blocking'),
2051                         'needs os.get_blocking() and os.set_blocking()')
2052    def test_blocking(self):
2053        self.check(os.get_blocking)
2054        self.check(os.set_blocking, True)
2055
2056
2057class LinkTests(unittest.TestCase):
2058    def setUp(self):
2059        self.file1 = support.TESTFN
2060        self.file2 = os.path.join(support.TESTFN + "2")
2061
2062    def tearDown(self):
2063        for file in (self.file1, self.file2):
2064            if os.path.exists(file):
2065                os.unlink(file)
2066
2067    def _test_link(self, file1, file2):
2068        create_file(file1)
2069
2070        try:
2071            os.link(file1, file2)
2072        except PermissionError as e:
2073            self.skipTest('os.link(): %s' % e)
2074        with open(file1, "r") as f1, open(file2, "r") as f2:
2075            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2076
2077    def test_link(self):
2078        self._test_link(self.file1, self.file2)
2079
2080    def test_link_bytes(self):
2081        self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2082                        bytes(self.file2, sys.getfilesystemencoding()))
2083
2084    def test_unicode_name(self):
2085        try:
2086            os.fsencode("\xf1")
2087        except UnicodeError:
2088            raise unittest.SkipTest("Unable to encode for this platform.")
2089
2090        self.file1 += "\xf1"
2091        self.file2 = self.file1 + "2"
2092        self._test_link(self.file1, self.file2)
2093
2094@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2095class PosixUidGidTests(unittest.TestCase):
2096    # uid_t and gid_t are 32-bit unsigned integers on Linux
2097    UID_OVERFLOW = (1 << 32)
2098    GID_OVERFLOW = (1 << 32)
2099
2100    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2101    def test_setuid(self):
2102        if os.getuid() != 0:
2103            self.assertRaises(OSError, os.setuid, 0)
2104        self.assertRaises(TypeError, os.setuid, 'not an int')
2105        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
2106
2107    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2108    def test_setgid(self):
2109        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2110            self.assertRaises(OSError, os.setgid, 0)
2111        self.assertRaises(TypeError, os.setgid, 'not an int')
2112        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
2113
2114    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2115    def test_seteuid(self):
2116        if os.getuid() != 0:
2117            self.assertRaises(OSError, os.seteuid, 0)
2118        self.assertRaises(TypeError, os.setegid, 'not an int')
2119        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
2120
2121    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2122    def test_setegid(self):
2123        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2124            self.assertRaises(OSError, os.setegid, 0)
2125        self.assertRaises(TypeError, os.setegid, 'not an int')
2126        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
2127
2128    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2129    def test_setreuid(self):
2130        if os.getuid() != 0:
2131            self.assertRaises(OSError, os.setreuid, 0, 0)
2132        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2133        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2134        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2135        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
2136
2137    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2138    def test_setreuid_neg1(self):
2139        # Needs to accept -1.  We run this in a subprocess to avoid
2140        # altering the test runner's process state (issue8045).
2141        subprocess.check_call([
2142                sys.executable, '-c',
2143                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
2144
2145    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2146    def test_setregid(self):
2147        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2148            self.assertRaises(OSError, os.setregid, 0, 0)
2149        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2150        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2151        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2152        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
2153
2154    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2155    def test_setregid_neg1(self):
2156        # Needs to accept -1.  We run this in a subprocess to avoid
2157        # altering the test runner's process state (issue8045).
2158        subprocess.check_call([
2159                sys.executable, '-c',
2160                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
2161
2162@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2163class Pep383Tests(unittest.TestCase):
2164    def setUp(self):
2165        if support.TESTFN_UNENCODABLE:
2166            self.dir = support.TESTFN_UNENCODABLE
2167        elif support.TESTFN_NONASCII:
2168            self.dir = support.TESTFN_NONASCII
2169        else:
2170            self.dir = support.TESTFN
2171        self.bdir = os.fsencode(self.dir)
2172
2173        bytesfn = []
2174        def add_filename(fn):
2175            try:
2176                fn = os.fsencode(fn)
2177            except UnicodeEncodeError:
2178                return
2179            bytesfn.append(fn)
2180        add_filename(support.TESTFN_UNICODE)
2181        if support.TESTFN_UNENCODABLE:
2182            add_filename(support.TESTFN_UNENCODABLE)
2183        if support.TESTFN_NONASCII:
2184            add_filename(support.TESTFN_NONASCII)
2185        if not bytesfn:
2186            self.skipTest("couldn't create any non-ascii filename")
2187
2188        self.unicodefn = set()
2189        os.mkdir(self.dir)
2190        try:
2191            for fn in bytesfn:
2192                support.create_empty_file(os.path.join(self.bdir, fn))
2193                fn = os.fsdecode(fn)
2194                if fn in self.unicodefn:
2195                    raise ValueError("duplicate filename")
2196                self.unicodefn.add(fn)
2197        except:
2198            shutil.rmtree(self.dir)
2199            raise
2200
2201    def tearDown(self):
2202        shutil.rmtree(self.dir)
2203
2204    def test_listdir(self):
2205        expected = self.unicodefn
2206        found = set(os.listdir(self.dir))
2207        self.assertEqual(found, expected)
2208        # test listdir without arguments
2209        current_directory = os.getcwd()
2210        try:
2211            os.chdir(os.sep)
2212            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2213        finally:
2214            os.chdir(current_directory)
2215
2216    def test_open(self):
2217        for fn in self.unicodefn:
2218            f = open(os.path.join(self.dir, fn), 'rb')
2219            f.close()
2220
2221    @unittest.skipUnless(hasattr(os, 'statvfs'),
2222                            "need os.statvfs()")
2223    def test_statvfs(self):
2224        # issue #9645
2225        for fn in self.unicodefn:
2226            # should not fail with file not found error
2227            fullname = os.path.join(self.dir, fn)
2228            os.statvfs(fullname)
2229
2230    def test_stat(self):
2231        for fn in self.unicodefn:
2232            os.stat(os.path.join(self.dir, fn))
2233
2234@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2235class Win32KillTests(unittest.TestCase):
2236    def _kill(self, sig):
2237        # Start sys.executable as a subprocess and communicate from the
2238        # subprocess to the parent that the interpreter is ready. When it
2239        # becomes ready, send *sig* via os.kill to the subprocess and check
2240        # that the return code is equal to *sig*.
2241        import ctypes
2242        from ctypes import wintypes
2243        import msvcrt
2244
2245        # Since we can't access the contents of the process' stdout until the
2246        # process has exited, use PeekNamedPipe to see what's inside stdout
2247        # without waiting. This is done so we can tell that the interpreter
2248        # is started and running at a point where it could handle a signal.
2249        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2250        PeekNamedPipe.restype = wintypes.BOOL
2251        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2252                                  ctypes.POINTER(ctypes.c_char), # stdout buf
2253                                  wintypes.DWORD, # Buffer size
2254                                  ctypes.POINTER(wintypes.DWORD), # bytes read
2255                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
2256                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
2257        msg = "running"
2258        proc = subprocess.Popen([sys.executable, "-c",
2259                                 "import sys;"
2260                                 "sys.stdout.write('{}');"
2261                                 "sys.stdout.flush();"
2262                                 "input()".format(msg)],
2263                                stdout=subprocess.PIPE,
2264                                stderr=subprocess.PIPE,
2265                                stdin=subprocess.PIPE)
2266        self.addCleanup(proc.stdout.close)
2267        self.addCleanup(proc.stderr.close)
2268        self.addCleanup(proc.stdin.close)
2269
2270        count, max = 0, 100
2271        while count < max and proc.poll() is None:
2272            # Create a string buffer to store the result of stdout from the pipe
2273            buf = ctypes.create_string_buffer(len(msg))
2274            # Obtain the text currently in proc.stdout
2275            # Bytes read/avail/left are left as NULL and unused
2276            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2277                                 buf, ctypes.sizeof(buf), None, None, None)
2278            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2279            if buf.value:
2280                self.assertEqual(msg, buf.value.decode())
2281                break
2282            time.sleep(0.1)
2283            count += 1
2284        else:
2285            self.fail("Did not receive communication from the subprocess")
2286
2287        os.kill(proc.pid, sig)
2288        self.assertEqual(proc.wait(), sig)
2289
2290    def test_kill_sigterm(self):
2291        # SIGTERM doesn't mean anything special, but make sure it works
2292        self._kill(signal.SIGTERM)
2293
2294    def test_kill_int(self):
2295        # os.kill on Windows can take an int which gets set as the exit code
2296        self._kill(100)
2297
2298    def _kill_with_event(self, event, name):
2299        tagname = "test_os_%s" % uuid.uuid1()
2300        m = mmap.mmap(-1, 1, tagname)
2301        m[0] = 0
2302        # Run a script which has console control handling enabled.
2303        proc = subprocess.Popen([sys.executable,
2304                   os.path.join(os.path.dirname(__file__),
2305                                "win_console_handler.py"), tagname],
2306                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2307        # Let the interpreter startup before we send signals. See #3137.
2308        count, max = 0, 100
2309        while count < max and proc.poll() is None:
2310            if m[0] == 1:
2311                break
2312            time.sleep(0.1)
2313            count += 1
2314        else:
2315            # Forcefully kill the process if we weren't able to signal it.
2316            os.kill(proc.pid, signal.SIGINT)
2317            self.fail("Subprocess didn't finish initialization")
2318        os.kill(proc.pid, event)
2319        # proc.send_signal(event) could also be done here.
2320        # Allow time for the signal to be passed and the process to exit.
2321        time.sleep(0.5)
2322        if not proc.poll():
2323            # Forcefully kill the process if we weren't able to signal it.
2324            os.kill(proc.pid, signal.SIGINT)
2325            self.fail("subprocess did not stop on {}".format(name))
2326
2327    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
2328    def test_CTRL_C_EVENT(self):
2329        from ctypes import wintypes
2330        import ctypes
2331
2332        # Make a NULL value by creating a pointer with no argument.
2333        NULL = ctypes.POINTER(ctypes.c_int)()
2334        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2335        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2336                                          wintypes.BOOL)
2337        SetConsoleCtrlHandler.restype = wintypes.BOOL
2338
2339        # Calling this with NULL and FALSE causes the calling process to
2340        # handle Ctrl+C, rather than ignore it. This property is inherited
2341        # by subprocesses.
2342        SetConsoleCtrlHandler(NULL, 0)
2343
2344        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2345
2346    def test_CTRL_BREAK_EVENT(self):
2347        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2348
2349
2350@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2351class Win32ListdirTests(unittest.TestCase):
2352    """Test listdir on Windows."""
2353
2354    def setUp(self):
2355        self.created_paths = []
2356        for i in range(2):
2357            dir_name = 'SUB%d' % i
2358            dir_path = os.path.join(support.TESTFN, dir_name)
2359            file_name = 'FILE%d' % i
2360            file_path = os.path.join(support.TESTFN, file_name)
2361            os.makedirs(dir_path)
2362            with open(file_path, 'w', encoding='utf-8') as f:
2363                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2364            self.created_paths.extend([dir_name, file_name])
2365        self.created_paths.sort()
2366
2367    def tearDown(self):
2368        shutil.rmtree(support.TESTFN)
2369
2370    def test_listdir_no_extended_path(self):
2371        """Test when the path is not an "extended" path."""
2372        # unicode
2373        self.assertEqual(
2374                sorted(os.listdir(support.TESTFN)),
2375                self.created_paths)
2376
2377        # bytes
2378        self.assertEqual(
2379                sorted(os.listdir(os.fsencode(support.TESTFN))),
2380                [os.fsencode(path) for path in self.created_paths])
2381
2382    def test_listdir_extended_path(self):
2383        """Test when the path starts with '\\\\?\\'."""
2384        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
2385        # unicode
2386        path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2387        self.assertEqual(
2388                sorted(os.listdir(path)),
2389                self.created_paths)
2390
2391        # bytes
2392        path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2393        self.assertEqual(
2394                sorted(os.listdir(path)),
2395                [os.fsencode(path) for path in self.created_paths])
2396
2397
2398@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2399class ReadlinkTests(unittest.TestCase):
2400    filelink = 'readlinktest'
2401    filelink_target = os.path.abspath(__file__)
2402    filelinkb = os.fsencode(filelink)
2403    filelinkb_target = os.fsencode(filelink_target)
2404
2405    def assertPathEqual(self, left, right):
2406        left = os.path.normcase(left)
2407        right = os.path.normcase(right)
2408        if sys.platform == 'win32':
2409            # Bad practice to blindly strip the prefix as it may be required to
2410            # correctly refer to the file, but we're only comparing paths here.
2411            has_prefix = lambda p: p.startswith(
2412                b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2413            if has_prefix(left):
2414                left = left[4:]
2415            if has_prefix(right):
2416                right = right[4:]
2417        self.assertEqual(left, right)
2418
2419    def setUp(self):
2420        self.assertTrue(os.path.exists(self.filelink_target))
2421        self.assertTrue(os.path.exists(self.filelinkb_target))
2422        self.assertFalse(os.path.exists(self.filelink))
2423        self.assertFalse(os.path.exists(self.filelinkb))
2424
2425    def test_not_symlink(self):
2426        filelink_target = FakePath(self.filelink_target)
2427        self.assertRaises(OSError, os.readlink, self.filelink_target)
2428        self.assertRaises(OSError, os.readlink, filelink_target)
2429
2430    def test_missing_link(self):
2431        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2432        self.assertRaises(FileNotFoundError, os.readlink,
2433                          FakePath('missing-link'))
2434
2435    @support.skip_unless_symlink
2436    def test_pathlike(self):
2437        os.symlink(self.filelink_target, self.filelink)
2438        self.addCleanup(support.unlink, self.filelink)
2439        filelink = FakePath(self.filelink)
2440        self.assertPathEqual(os.readlink(filelink), self.filelink_target)
2441
2442    @support.skip_unless_symlink
2443    def test_pathlike_bytes(self):
2444        os.symlink(self.filelinkb_target, self.filelinkb)
2445        self.addCleanup(support.unlink, self.filelinkb)
2446        path = os.readlink(FakePath(self.filelinkb))
2447        self.assertPathEqual(path, self.filelinkb_target)
2448        self.assertIsInstance(path, bytes)
2449
2450    @support.skip_unless_symlink
2451    def test_bytes(self):
2452        os.symlink(self.filelinkb_target, self.filelinkb)
2453        self.addCleanup(support.unlink, self.filelinkb)
2454        path = os.readlink(self.filelinkb)
2455        self.assertPathEqual(path, self.filelinkb_target)
2456        self.assertIsInstance(path, bytes)
2457
2458
2459@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2460@support.skip_unless_symlink
2461class Win32SymlinkTests(unittest.TestCase):
2462    filelink = 'filelinktest'
2463    filelink_target = os.path.abspath(__file__)
2464    dirlink = 'dirlinktest'
2465    dirlink_target = os.path.dirname(filelink_target)
2466    missing_link = 'missing link'
2467
2468    def setUp(self):
2469        assert os.path.exists(self.dirlink_target)
2470        assert os.path.exists(self.filelink_target)
2471        assert not os.path.exists(self.dirlink)
2472        assert not os.path.exists(self.filelink)
2473        assert not os.path.exists(self.missing_link)
2474
2475    def tearDown(self):
2476        if os.path.exists(self.filelink):
2477            os.remove(self.filelink)
2478        if os.path.exists(self.dirlink):
2479            os.rmdir(self.dirlink)
2480        if os.path.lexists(self.missing_link):
2481            os.remove(self.missing_link)
2482
2483    def test_directory_link(self):
2484        os.symlink(self.dirlink_target, self.dirlink)
2485        self.assertTrue(os.path.exists(self.dirlink))
2486        self.assertTrue(os.path.isdir(self.dirlink))
2487        self.assertTrue(os.path.islink(self.dirlink))
2488        self.check_stat(self.dirlink, self.dirlink_target)
2489
2490    def test_file_link(self):
2491        os.symlink(self.filelink_target, self.filelink)
2492        self.assertTrue(os.path.exists(self.filelink))
2493        self.assertTrue(os.path.isfile(self.filelink))
2494        self.assertTrue(os.path.islink(self.filelink))
2495        self.check_stat(self.filelink, self.filelink_target)
2496
2497    def _create_missing_dir_link(self):
2498        'Create a "directory" link to a non-existent target'
2499        linkname = self.missing_link
2500        if os.path.lexists(linkname):
2501            os.remove(linkname)
2502        target = r'c:\\target does not exist.29r3c740'
2503        assert not os.path.exists(target)
2504        target_is_dir = True
2505        os.symlink(target, linkname, target_is_dir)
2506
2507    def test_remove_directory_link_to_missing_target(self):
2508        self._create_missing_dir_link()
2509        # For compatibility with Unix, os.remove will check the
2510        #  directory status and call RemoveDirectory if the symlink
2511        #  was created with target_is_dir==True.
2512        os.remove(self.missing_link)
2513
2514    def test_isdir_on_directory_link_to_missing_target(self):
2515        self._create_missing_dir_link()
2516        self.assertFalse(os.path.isdir(self.missing_link))
2517
2518    def test_rmdir_on_directory_link_to_missing_target(self):
2519        self._create_missing_dir_link()
2520        os.rmdir(self.missing_link)
2521
2522    def check_stat(self, link, target):
2523        self.assertEqual(os.stat(link), os.stat(target))
2524        self.assertNotEqual(os.lstat(link), os.stat(link))
2525
2526        bytes_link = os.fsencode(link)
2527        self.assertEqual(os.stat(bytes_link), os.stat(target))
2528        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
2529
2530    def test_12084(self):
2531        level1 = os.path.abspath(support.TESTFN)
2532        level2 = os.path.join(level1, "level2")
2533        level3 = os.path.join(level2, "level3")
2534        self.addCleanup(support.rmtree, level1)
2535
2536        os.mkdir(level1)
2537        os.mkdir(level2)
2538        os.mkdir(level3)
2539
2540        file1 = os.path.abspath(os.path.join(level1, "file1"))
2541        create_file(file1)
2542
2543        orig_dir = os.getcwd()
2544        try:
2545            os.chdir(level2)
2546            link = os.path.join(level2, "link")
2547            os.symlink(os.path.relpath(file1), "link")
2548            self.assertIn("link", os.listdir(os.getcwd()))
2549
2550            # Check os.stat calls from the same dir as the link
2551            self.assertEqual(os.stat(file1), os.stat("link"))
2552
2553            # Check os.stat calls from a dir below the link
2554            os.chdir(level1)
2555            self.assertEqual(os.stat(file1),
2556                             os.stat(os.path.relpath(link)))
2557
2558            # Check os.stat calls from a dir above the link
2559            os.chdir(level3)
2560            self.assertEqual(os.stat(file1),
2561                             os.stat(os.path.relpath(link)))
2562        finally:
2563            os.chdir(orig_dir)
2564
2565    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2566                            and os.path.exists(r'C:\ProgramData'),
2567                            'Test directories not found')
2568    def test_29248(self):
2569        # os.symlink() calls CreateSymbolicLink, which creates
2570        # the reparse data buffer with the print name stored
2571        # first, so the offset is always 0. CreateSymbolicLink
2572        # stores the "PrintName" DOS path (e.g. "C:\") first,
2573        # with an offset of 0, followed by the "SubstituteName"
2574        # NT path (e.g. "\??\C:\"). The "All Users" link, on
2575        # the other hand, seems to have been created manually
2576        # with an inverted order.
2577        target = os.readlink(r'C:\Users\All Users')
2578        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2579
2580    def test_buffer_overflow(self):
2581        # Older versions would have a buffer overflow when detecting
2582        # whether a link source was a directory. This test ensures we
2583        # no longer crash, but does not otherwise validate the behavior
2584        segment = 'X' * 27
2585        path = os.path.join(*[segment] * 10)
2586        test_cases = [
2587            # overflow with absolute src
2588            ('\\' + path, segment),
2589            # overflow dest with relative src
2590            (segment, path),
2591            # overflow when joining src
2592            (path[:180], path[:180]),
2593        ]
2594        for src, dest in test_cases:
2595            try:
2596                os.symlink(src, dest)
2597            except FileNotFoundError:
2598                pass
2599            else:
2600                try:
2601                    os.remove(dest)
2602                except OSError:
2603                    pass
2604            # Also test with bytes, since that is a separate code path.
2605            try:
2606                os.symlink(os.fsencode(src), os.fsencode(dest))
2607            except FileNotFoundError:
2608                pass
2609            else:
2610                try:
2611                    os.remove(dest)
2612                except OSError:
2613                    pass
2614
2615    def test_appexeclink(self):
2616        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
2617        if not os.path.isdir(root):
2618            self.skipTest("test requires a WindowsApps directory")
2619
2620        aliases = [os.path.join(root, a)
2621                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
2622
2623        for alias in aliases:
2624            if support.verbose:
2625                print()
2626                print("Testing with", alias)
2627            st = os.lstat(alias)
2628            self.assertEqual(st, os.stat(alias))
2629            self.assertFalse(stat.S_ISLNK(st.st_mode))
2630            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2631            # testing the first one we see is sufficient
2632            break
2633        else:
2634            self.skipTest("test requires an app execution alias")
2635
2636@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2637class Win32JunctionTests(unittest.TestCase):
2638    junction = 'junctiontest'
2639    junction_target = os.path.dirname(os.path.abspath(__file__))
2640
2641    def setUp(self):
2642        assert os.path.exists(self.junction_target)
2643        assert not os.path.lexists(self.junction)
2644
2645    def tearDown(self):
2646        if os.path.lexists(self.junction):
2647            os.unlink(self.junction)
2648
2649    def test_create_junction(self):
2650        _winapi.CreateJunction(self.junction_target, self.junction)
2651        self.assertTrue(os.path.lexists(self.junction))
2652        self.assertTrue(os.path.exists(self.junction))
2653        self.assertTrue(os.path.isdir(self.junction))
2654        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2655        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
2656
2657        # bpo-37834: Junctions are not recognized as links.
2658        self.assertFalse(os.path.islink(self.junction))
2659        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2660                         os.path.normcase(os.readlink(self.junction)))
2661
2662    def test_unlink_removes_junction(self):
2663        _winapi.CreateJunction(self.junction_target, self.junction)
2664        self.assertTrue(os.path.exists(self.junction))
2665        self.assertTrue(os.path.lexists(self.junction))
2666
2667        os.unlink(self.junction)
2668        self.assertFalse(os.path.exists(self.junction))
2669
2670@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2671class Win32NtTests(unittest.TestCase):
2672    def test_getfinalpathname_handles(self):
2673        nt = support.import_module('nt')
2674        ctypes = support.import_module('ctypes')
2675        import ctypes.wintypes
2676
2677        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2678        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2679
2680        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2681        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2682                                                 ctypes.wintypes.LPDWORD)
2683
2684        # This is a pseudo-handle that doesn't need to be closed
2685        hproc = kernel.GetCurrentProcess()
2686
2687        handle_count = ctypes.wintypes.DWORD()
2688        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2689        self.assertEqual(1, ok)
2690
2691        before_count = handle_count.value
2692
2693        # The first two test the error path, __file__ tests the success path
2694        filenames = [
2695            r'\\?\C:',
2696            r'\\?\NUL',
2697            r'\\?\CONIN',
2698            __file__,
2699        ]
2700
2701        for _ in range(10):
2702            for name in filenames:
2703                try:
2704                    nt._getfinalpathname(name)
2705                except Exception:
2706                    # Failure is expected
2707                    pass
2708                try:
2709                    os.stat(name)
2710                except Exception:
2711                    pass
2712
2713        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2714        self.assertEqual(1, ok)
2715
2716        handle_delta = handle_count.value - before_count
2717
2718        self.assertEqual(0, handle_delta)
2719
2720@support.skip_unless_symlink
2721class NonLocalSymlinkTests(unittest.TestCase):
2722
2723    def setUp(self):
2724        r"""
2725        Create this structure:
2726
2727        base
2728         \___ some_dir
2729        """
2730        os.makedirs('base/some_dir')
2731
2732    def tearDown(self):
2733        shutil.rmtree('base')
2734
2735    def test_directory_link_nonlocal(self):
2736        """
2737        The symlink target should resolve relative to the link, not relative
2738        to the current directory.
2739
2740        Then, link base/some_link -> base/some_dir and ensure that some_link
2741        is resolved as a directory.
2742
2743        In issue13772, it was discovered that directory detection failed if
2744        the symlink target was not specified relative to the current
2745        directory, which was a defect in the implementation.
2746        """
2747        src = os.path.join('base', 'some_link')
2748        os.symlink('some_dir', src)
2749        assert os.path.isdir(src)
2750
2751
2752class FSEncodingTests(unittest.TestCase):
2753    def test_nop(self):
2754        self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2755        self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
2756
2757    def test_identity(self):
2758        # assert fsdecode(fsencode(x)) == x
2759        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2760            try:
2761                bytesfn = os.fsencode(fn)
2762            except UnicodeEncodeError:
2763                continue
2764            self.assertEqual(os.fsdecode(bytesfn), fn)
2765
2766
2767
2768class DeviceEncodingTests(unittest.TestCase):
2769
2770    def test_bad_fd(self):
2771        # Return None when an fd doesn't actually exist.
2772        self.assertIsNone(os.device_encoding(123456))
2773
2774    @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
2775            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
2776            'test requires a tty and either Windows or nl_langinfo(CODESET)')
2777    def test_device_encoding(self):
2778        encoding = os.device_encoding(0)
2779        self.assertIsNotNone(encoding)
2780        self.assertTrue(codecs.lookup(encoding))
2781
2782
2783class PidTests(unittest.TestCase):
2784    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2785    def test_getppid(self):
2786        p = subprocess.Popen([sys.executable, '-c',
2787                              'import os; print(os.getppid())'],
2788                             stdout=subprocess.PIPE)
2789        stdout, _ = p.communicate()
2790        # We are the parent of our subprocess
2791        self.assertEqual(int(stdout), os.getpid())
2792
2793    def check_waitpid(self, code, exitcode, callback=None):
2794        if sys.platform == 'win32':
2795            # On Windows, os.spawnv() simply joins arguments with spaces:
2796            # arguments need to be quoted
2797            args = [f'"{sys.executable}"', '-c', f'"{code}"']
2798        else:
2799            args = [sys.executable, '-c', code]
2800        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
2801
2802        if callback is not None:
2803            callback(pid)
2804
2805        # don't use support.wait_process() to test directly os.waitpid()
2806        # and os.waitstatus_to_exitcode()
2807        pid2, status = os.waitpid(pid, 0)
2808        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2809        self.assertEqual(pid2, pid)
2810
2811    def test_waitpid(self):
2812        self.check_waitpid(code='pass', exitcode=0)
2813
2814    def test_waitstatus_to_exitcode(self):
2815        exitcode = 23
2816        code = f'import sys; sys.exit({exitcode})'
2817        self.check_waitpid(code, exitcode=exitcode)
2818
2819        with self.assertRaises(TypeError):
2820            os.waitstatus_to_exitcode(0.0)
2821
2822    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2823    def test_waitpid_windows(self):
2824        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2825        # with exit code larger than INT_MAX.
2826        STATUS_CONTROL_C_EXIT = 0xC000013A
2827        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2828        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2829
2830    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2831    def test_waitstatus_to_exitcode_windows(self):
2832        max_exitcode = 2 ** 32 - 1
2833        for exitcode in (0, 1, 5, max_exitcode):
2834            self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2835                             exitcode)
2836
2837        # invalid values
2838        with self.assertRaises(ValueError):
2839            os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2840        with self.assertRaises(OverflowError):
2841            os.waitstatus_to_exitcode(-1)
2842
2843    # Skip the test on Windows
2844    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2845    def test_waitstatus_to_exitcode_kill(self):
2846        code = f'import time; time.sleep({support.LONG_TIMEOUT})'
2847        signum = signal.SIGKILL
2848
2849        def kill_process(pid):
2850            os.kill(pid, signum)
2851
2852        self.check_waitpid(code, exitcode=-signum, callback=kill_process)
2853
2854
2855class SpawnTests(unittest.TestCase):
2856    def create_args(self, *, with_env=False, use_bytes=False):
2857        self.exitcode = 17
2858
2859        filename = support.TESTFN
2860        self.addCleanup(support.unlink, filename)
2861
2862        if not with_env:
2863            code = 'import sys; sys.exit(%s)' % self.exitcode
2864        else:
2865            self.env = dict(os.environ)
2866            # create an unique key
2867            self.key = str(uuid.uuid4())
2868            self.env[self.key] = self.key
2869            # read the variable from os.environ to check that it exists
2870            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2871                    % (self.key, self.exitcode))
2872
2873        with open(filename, "w") as fp:
2874            fp.write(code)
2875
2876        args = [sys.executable, filename]
2877        if use_bytes:
2878            args = [os.fsencode(a) for a in args]
2879            self.env = {os.fsencode(k): os.fsencode(v)
2880                        for k, v in self.env.items()}
2881
2882        return args
2883
2884    @requires_os_func('spawnl')
2885    def test_spawnl(self):
2886        args = self.create_args()
2887        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2888        self.assertEqual(exitcode, self.exitcode)
2889
2890    @requires_os_func('spawnle')
2891    def test_spawnle(self):
2892        args = self.create_args(with_env=True)
2893        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2894        self.assertEqual(exitcode, self.exitcode)
2895
2896    @requires_os_func('spawnlp')
2897    def test_spawnlp(self):
2898        args = self.create_args()
2899        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2900        self.assertEqual(exitcode, self.exitcode)
2901
2902    @requires_os_func('spawnlpe')
2903    def test_spawnlpe(self):
2904        args = self.create_args(with_env=True)
2905        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2906        self.assertEqual(exitcode, self.exitcode)
2907
2908    @requires_os_func('spawnv')
2909    def test_spawnv(self):
2910        args = self.create_args()
2911        exitcode = os.spawnv(os.P_WAIT, args[0], args)
2912        self.assertEqual(exitcode, self.exitcode)
2913
2914        # Test for PyUnicode_FSConverter()
2915        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
2916        self.assertEqual(exitcode, self.exitcode)
2917
2918    @requires_os_func('spawnve')
2919    def test_spawnve(self):
2920        args = self.create_args(with_env=True)
2921        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2922        self.assertEqual(exitcode, self.exitcode)
2923
2924    @requires_os_func('spawnvp')
2925    def test_spawnvp(self):
2926        args = self.create_args()
2927        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2928        self.assertEqual(exitcode, self.exitcode)
2929
2930    @requires_os_func('spawnvpe')
2931    def test_spawnvpe(self):
2932        args = self.create_args(with_env=True)
2933        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2934        self.assertEqual(exitcode, self.exitcode)
2935
2936    @requires_os_func('spawnv')
2937    def test_nowait(self):
2938        args = self.create_args()
2939        pid = os.spawnv(os.P_NOWAIT, args[0], args)
2940        support.wait_process(pid, exitcode=self.exitcode)
2941
2942    @requires_os_func('spawnve')
2943    def test_spawnve_bytes(self):
2944        # Test bytes handling in parse_arglist and parse_envlist (#28114)
2945        args = self.create_args(with_env=True, use_bytes=True)
2946        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2947        self.assertEqual(exitcode, self.exitcode)
2948
2949    @requires_os_func('spawnl')
2950    def test_spawnl_noargs(self):
2951        args = self.create_args()
2952        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
2953        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
2954
2955    @requires_os_func('spawnle')
2956    def test_spawnle_noargs(self):
2957        args = self.create_args()
2958        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
2959        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
2960
2961    @requires_os_func('spawnv')
2962    def test_spawnv_noargs(self):
2963        args = self.create_args()
2964        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2965        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
2966        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2967        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
2968
2969    @requires_os_func('spawnve')
2970    def test_spawnve_noargs(self):
2971        args = self.create_args()
2972        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2973        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
2974        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2975        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
2976
2977    def _test_invalid_env(self, spawn):
2978        args = [sys.executable, '-c', 'pass']
2979
2980        # null character in the environment variable name
2981        newenv = os.environ.copy()
2982        newenv["FRUIT\0VEGETABLE"] = "cabbage"
2983        try:
2984            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2985        except ValueError:
2986            pass
2987        else:
2988            self.assertEqual(exitcode, 127)
2989
2990        # null character in the environment variable value
2991        newenv = os.environ.copy()
2992        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2993        try:
2994            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
2995        except ValueError:
2996            pass
2997        else:
2998            self.assertEqual(exitcode, 127)
2999
3000        # equal character in the environment variable name
3001        newenv = os.environ.copy()
3002        newenv["FRUIT=ORANGE"] = "lemon"
3003        try:
3004            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3005        except ValueError:
3006            pass
3007        else:
3008            self.assertEqual(exitcode, 127)
3009
3010        # equal character in the environment variable value
3011        filename = support.TESTFN
3012        self.addCleanup(support.unlink, filename)
3013        with open(filename, "w") as fp:
3014            fp.write('import sys, os\n'
3015                     'if os.getenv("FRUIT") != "orange=lemon":\n'
3016                     '    raise AssertionError')
3017        args = [sys.executable, filename]
3018        newenv = os.environ.copy()
3019        newenv["FRUIT"] = "orange=lemon"
3020        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3021        self.assertEqual(exitcode, 0)
3022
3023    @requires_os_func('spawnve')
3024    def test_spawnve_invalid_env(self):
3025        self._test_invalid_env(os.spawnve)
3026
3027    @requires_os_func('spawnvpe')
3028    def test_spawnvpe_invalid_env(self):
3029        self._test_invalid_env(os.spawnvpe)
3030
3031
3032# The introduction of this TestCase caused at least two different errors on
3033# *nix buildbots. Temporarily skip this to let the buildbots move along.
3034@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
3035@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3036class LoginTests(unittest.TestCase):
3037    def test_getlogin(self):
3038        user_name = os.getlogin()
3039        self.assertNotEqual(len(user_name), 0)
3040
3041
3042@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3043                     "needs os.getpriority and os.setpriority")
3044class ProgramPriorityTests(unittest.TestCase):
3045    """Tests for os.getpriority() and os.setpriority()."""
3046
3047    def test_set_get_priority(self):
3048
3049        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3050        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3051        try:
3052            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3053            if base >= 19 and new_prio <= 19:
3054                raise unittest.SkipTest("unable to reliably test setpriority "
3055                                        "at current nice level of %s" % base)
3056            else:
3057                self.assertEqual(new_prio, base + 1)
3058        finally:
3059            try:
3060                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3061            except OSError as err:
3062                if err.errno != errno.EACCES:
3063                    raise
3064
3065
3066class SendfileTestServer(asyncore.dispatcher, threading.Thread):
3067
3068    class Handler(asynchat.async_chat):
3069
3070        def __init__(self, conn):
3071            asynchat.async_chat.__init__(self, conn)
3072            self.in_buffer = []
3073            self.accumulate = True
3074            self.closed = False
3075            self.push(b"220 ready\r\n")
3076
3077        def handle_read(self):
3078            data = self.recv(4096)
3079            if self.accumulate:
3080                self.in_buffer.append(data)
3081
3082        def get_data(self):
3083            return b''.join(self.in_buffer)
3084
3085        def handle_close(self):
3086            self.close()
3087            self.closed = True
3088
3089        def handle_error(self):
3090            raise
3091
3092    def __init__(self, address):
3093        threading.Thread.__init__(self)
3094        asyncore.dispatcher.__init__(self)
3095        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3096        self.bind(address)
3097        self.listen(5)
3098        self.host, self.port = self.socket.getsockname()[:2]
3099        self.handler_instance = None
3100        self._active = False
3101        self._active_lock = threading.Lock()
3102
3103    # --- public API
3104
3105    @property
3106    def running(self):
3107        return self._active
3108
3109    def start(self):
3110        assert not self.running
3111        self.__flag = threading.Event()
3112        threading.Thread.start(self)
3113        self.__flag.wait()
3114
3115    def stop(self):
3116        assert self.running
3117        self._active = False
3118        self.join()
3119
3120    def wait(self):
3121        # wait for handler connection to be closed, then stop the server
3122        while not getattr(self.handler_instance, "closed", False):
3123            time.sleep(0.001)
3124        self.stop()
3125
3126    # --- internals
3127
3128    def run(self):
3129        self._active = True
3130        self.__flag.set()
3131        while self._active and asyncore.socket_map:
3132            self._active_lock.acquire()
3133            asyncore.loop(timeout=0.001, count=1)
3134            self._active_lock.release()
3135        asyncore.close_all()
3136
3137    def handle_accept(self):
3138        conn, addr = self.accept()
3139        self.handler_instance = self.Handler(conn)
3140
3141    def handle_connect(self):
3142        self.close()
3143    handle_read = handle_connect
3144
3145    def writable(self):
3146        return 0
3147
3148    def handle_error(self):
3149        raise
3150
3151
3152@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3153class TestSendfile(unittest.TestCase):
3154
3155    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
3156    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
3157                               not sys.platform.startswith("solaris") and \
3158                               not sys.platform.startswith("sunos")
3159    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3160            'requires headers and trailers support')
3161    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3162            'test is only meaningful on 32-bit builds')
3163
3164    @classmethod
3165    def setUpClass(cls):
3166        cls.key = support.threading_setup()
3167        create_file(support.TESTFN, cls.DATA)
3168
3169    @classmethod
3170    def tearDownClass(cls):
3171        support.threading_cleanup(*cls.key)
3172        support.unlink(support.TESTFN)
3173
3174    def setUp(self):
3175        self.server = SendfileTestServer((socket_helper.HOST, 0))
3176        self.server.start()
3177        self.client = socket.socket()
3178        self.client.connect((self.server.host, self.server.port))
3179        self.client.settimeout(1)
3180        # synchronize by waiting for "220 ready" response
3181        self.client.recv(1024)
3182        self.sockno = self.client.fileno()
3183        self.file = open(support.TESTFN, 'rb')
3184        self.fileno = self.file.fileno()
3185
3186    def tearDown(self):
3187        self.file.close()
3188        self.client.close()
3189        if self.server.running:
3190            self.server.stop()
3191        self.server = None
3192
3193    def sendfile_wrapper(self, *args, **kwargs):
3194        """A higher level wrapper representing how an application is
3195        supposed to use sendfile().
3196        """
3197        while True:
3198            try:
3199                return os.sendfile(*args, **kwargs)
3200            except OSError as err:
3201                if err.errno == errno.ECONNRESET:
3202                    # disconnected
3203                    raise
3204                elif err.errno in (errno.EAGAIN, errno.EBUSY):
3205                    # we have to retry send data
3206                    continue
3207                else:
3208                    raise
3209
3210    def test_send_whole_file(self):
3211        # normal send
3212        total_sent = 0
3213        offset = 0
3214        nbytes = 4096
3215        while total_sent < len(self.DATA):
3216            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3217            if sent == 0:
3218                break
3219            offset += sent
3220            total_sent += sent
3221            self.assertTrue(sent <= nbytes)
3222            self.assertEqual(offset, total_sent)
3223
3224        self.assertEqual(total_sent, len(self.DATA))
3225        self.client.shutdown(socket.SHUT_RDWR)
3226        self.client.close()
3227        self.server.wait()
3228        data = self.server.handler_instance.get_data()
3229        self.assertEqual(len(data), len(self.DATA))
3230        self.assertEqual(data, self.DATA)
3231
3232    def test_send_at_certain_offset(self):
3233        # start sending a file at a certain offset
3234        total_sent = 0
3235        offset = len(self.DATA) // 2
3236        must_send = len(self.DATA) - offset
3237        nbytes = 4096
3238        while total_sent < must_send:
3239            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3240            if sent == 0:
3241                break
3242            offset += sent
3243            total_sent += sent
3244            self.assertTrue(sent <= nbytes)
3245
3246        self.client.shutdown(socket.SHUT_RDWR)
3247        self.client.close()
3248        self.server.wait()
3249        data = self.server.handler_instance.get_data()
3250        expected = self.DATA[len(self.DATA) // 2:]
3251        self.assertEqual(total_sent, len(expected))
3252        self.assertEqual(len(data), len(expected))
3253        self.assertEqual(data, expected)
3254
3255    def test_offset_overflow(self):
3256        # specify an offset > file size
3257        offset = len(self.DATA) + 4096
3258        try:
3259            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3260        except OSError as e:
3261            # Solaris can raise EINVAL if offset >= file length, ignore.
3262            if e.errno != errno.EINVAL:
3263                raise
3264        else:
3265            self.assertEqual(sent, 0)
3266        self.client.shutdown(socket.SHUT_RDWR)
3267        self.client.close()
3268        self.server.wait()
3269        data = self.server.handler_instance.get_data()
3270        self.assertEqual(data, b'')
3271
3272    def test_invalid_offset(self):
3273        with self.assertRaises(OSError) as cm:
3274            os.sendfile(self.sockno, self.fileno, -1, 4096)
3275        self.assertEqual(cm.exception.errno, errno.EINVAL)
3276
3277    def test_keywords(self):
3278        # Keyword arguments should be supported
3279        os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3280                    offset=0, count=4096)
3281        if self.SUPPORT_HEADERS_TRAILERS:
3282            os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3283                        offset=0, count=4096,
3284                        headers=(), trailers=(), flags=0)
3285
3286    # --- headers / trailers tests
3287
3288    @requires_headers_trailers
3289    def test_headers(self):
3290        total_sent = 0
3291        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
3292        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
3293                            headers=[b"x" * 512, b"y" * 256])
3294        self.assertLessEqual(sent, 512 + 256 + 4096)
3295        total_sent += sent
3296        offset = 4096
3297        while total_sent < len(expected_data):
3298            nbytes = min(len(expected_data) - total_sent, 4096)
3299            sent = self.sendfile_wrapper(self.sockno, self.fileno,
3300                                                    offset, nbytes)
3301            if sent == 0:
3302                break
3303            self.assertLessEqual(sent, nbytes)
3304            total_sent += sent
3305            offset += sent
3306
3307        self.assertEqual(total_sent, len(expected_data))
3308        self.client.close()
3309        self.server.wait()
3310        data = self.server.handler_instance.get_data()
3311        self.assertEqual(hash(data), hash(expected_data))
3312
3313    @requires_headers_trailers
3314    def test_trailers(self):
3315        TESTFN2 = support.TESTFN + "2"
3316        file_data = b"abcdef"
3317
3318        self.addCleanup(support.unlink, TESTFN2)
3319        create_file(TESTFN2, file_data)
3320
3321        with open(TESTFN2, 'rb') as f:
3322            os.sendfile(self.sockno, f.fileno(), 0, 5,
3323                        trailers=[b"123456", b"789"])
3324            self.client.close()
3325            self.server.wait()
3326            data = self.server.handler_instance.get_data()
3327            self.assertEqual(data, b"abcde123456789")
3328
3329    @requires_headers_trailers
3330    @requires_32b
3331    def test_headers_overflow_32bits(self):
3332        self.server.handler_instance.accumulate = False
3333        with self.assertRaises(OSError) as cm:
3334            os.sendfile(self.sockno, self.fileno, 0, 0,
3335                        headers=[b"x" * 2**16] * 2**15)
3336        self.assertEqual(cm.exception.errno, errno.EINVAL)
3337
3338    @requires_headers_trailers
3339    @requires_32b
3340    def test_trailers_overflow_32bits(self):
3341        self.server.handler_instance.accumulate = False
3342        with self.assertRaises(OSError) as cm:
3343            os.sendfile(self.sockno, self.fileno, 0, 0,
3344                        trailers=[b"x" * 2**16] * 2**15)
3345        self.assertEqual(cm.exception.errno, errno.EINVAL)
3346
3347    @requires_headers_trailers
3348    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3349                         'test needs os.SF_NODISKIO')
3350    def test_flags(self):
3351        try:
3352            os.sendfile(self.sockno, self.fileno, 0, 4096,
3353                        flags=os.SF_NODISKIO)
3354        except OSError as err:
3355            if err.errno not in (errno.EBUSY, errno.EAGAIN):
3356                raise
3357
3358
3359def supports_extended_attributes():
3360    if not hasattr(os, "setxattr"):
3361        return False
3362
3363    try:
3364        with open(support.TESTFN, "xb", 0) as fp:
3365            try:
3366                os.setxattr(fp.fileno(), b"user.test", b"")
3367            except OSError:
3368                return False
3369    finally:
3370        support.unlink(support.TESTFN)
3371
3372    return True
3373
3374
3375@unittest.skipUnless(supports_extended_attributes(),
3376                     "no non-broken extended attribute support")
3377# Kernels < 2.6.39 don't respect setxattr flags.
3378@support.requires_linux_version(2, 6, 39)
3379class ExtendedAttributeTests(unittest.TestCase):
3380
3381    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
3382        fn = support.TESTFN
3383        self.addCleanup(support.unlink, fn)
3384        create_file(fn)
3385
3386        with self.assertRaises(OSError) as cm:
3387            getxattr(fn, s("user.test"), **kwargs)
3388        self.assertEqual(cm.exception.errno, errno.ENODATA)
3389
3390        init_xattr = listxattr(fn)
3391        self.assertIsInstance(init_xattr, list)
3392
3393        setxattr(fn, s("user.test"), b"", **kwargs)
3394        xattr = set(init_xattr)
3395        xattr.add("user.test")
3396        self.assertEqual(set(listxattr(fn)), xattr)
3397        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3398        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3399        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
3400
3401        with self.assertRaises(OSError) as cm:
3402            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
3403        self.assertEqual(cm.exception.errno, errno.EEXIST)
3404
3405        with self.assertRaises(OSError) as cm:
3406            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
3407        self.assertEqual(cm.exception.errno, errno.ENODATA)
3408
3409        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
3410        xattr.add("user.test2")
3411        self.assertEqual(set(listxattr(fn)), xattr)
3412        removexattr(fn, s("user.test"), **kwargs)
3413
3414        with self.assertRaises(OSError) as cm:
3415            getxattr(fn, s("user.test"), **kwargs)
3416        self.assertEqual(cm.exception.errno, errno.ENODATA)
3417
3418        xattr.remove("user.test")
3419        self.assertEqual(set(listxattr(fn)), xattr)
3420        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3421        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3422        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3423        removexattr(fn, s("user.test"), **kwargs)
3424        many = sorted("user.test{}".format(i) for i in range(100))
3425        for thing in many:
3426            setxattr(fn, thing, b"x", **kwargs)
3427        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
3428
3429    def _check_xattrs(self, *args, **kwargs):
3430        self._check_xattrs_str(str, *args, **kwargs)
3431        support.unlink(support.TESTFN)
3432
3433        self._check_xattrs_str(os.fsencode, *args, **kwargs)
3434        support.unlink(support.TESTFN)
3435
3436    def test_simple(self):
3437        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3438                           os.listxattr)
3439
3440    def test_lpath(self):
3441        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3442                           os.listxattr, follow_symlinks=False)
3443
3444    def test_fds(self):
3445        def getxattr(path, *args):
3446            with open(path, "rb") as fp:
3447                return os.getxattr(fp.fileno(), *args)
3448        def setxattr(path, *args):
3449            with open(path, "wb", 0) as fp:
3450                os.setxattr(fp.fileno(), *args)
3451        def removexattr(path, *args):
3452            with open(path, "wb", 0) as fp:
3453                os.removexattr(fp.fileno(), *args)
3454        def listxattr(path, *args):
3455            with open(path, "rb") as fp:
3456                return os.listxattr(fp.fileno(), *args)
3457        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3458
3459
3460@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3461class TermsizeTests(unittest.TestCase):
3462    def test_does_not_crash(self):
3463        """Check if get_terminal_size() returns a meaningful value.
3464
3465        There's no easy portable way to actually check the size of the
3466        terminal, so let's check if it returns something sensible instead.
3467        """
3468        try:
3469            size = os.get_terminal_size()
3470        except OSError as e:
3471            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3472                # Under win32 a generic OSError can be thrown if the
3473                # handle cannot be retrieved
3474                self.skipTest("failed to query terminal size")
3475            raise
3476
3477        self.assertGreaterEqual(size.columns, 0)
3478        self.assertGreaterEqual(size.lines, 0)
3479
3480    def test_stty_match(self):
3481        """Check if stty returns the same results
3482
3483        stty actually tests stdin, so get_terminal_size is invoked on
3484        stdin explicitly. If stty succeeded, then get_terminal_size()
3485        should work too.
3486        """
3487        try:
3488            size = (
3489                subprocess.check_output(
3490                    ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3491                ).split()
3492            )
3493        except (FileNotFoundError, subprocess.CalledProcessError,
3494                PermissionError):
3495            self.skipTest("stty invocation failed")
3496        expected = (int(size[1]), int(size[0])) # reversed order
3497
3498        try:
3499            actual = os.get_terminal_size(sys.__stdin__.fileno())
3500        except OSError as e:
3501            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3502                # Under win32 a generic OSError can be thrown if the
3503                # handle cannot be retrieved
3504                self.skipTest("failed to query terminal size")
3505            raise
3506        self.assertEqual(expected, actual)
3507
3508
3509@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
3510@support.requires_linux_version(3, 17)
3511class MemfdCreateTests(unittest.TestCase):
3512    def test_memfd_create(self):
3513        fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3514        self.assertNotEqual(fd, -1)
3515        self.addCleanup(os.close, fd)
3516        self.assertFalse(os.get_inheritable(fd))
3517        with open(fd, "wb", closefd=False) as f:
3518            f.write(b'memfd_create')
3519            self.assertEqual(f.tell(), 12)
3520
3521        fd2 = os.memfd_create("Hi")
3522        self.addCleanup(os.close, fd2)
3523        self.assertFalse(os.get_inheritable(fd2))
3524
3525
3526class OSErrorTests(unittest.TestCase):
3527    def setUp(self):
3528        class Str(str):
3529            pass
3530
3531        self.bytes_filenames = []
3532        self.unicode_filenames = []
3533        if support.TESTFN_UNENCODABLE is not None:
3534            decoded = support.TESTFN_UNENCODABLE
3535        else:
3536            decoded = support.TESTFN
3537        self.unicode_filenames.append(decoded)
3538        self.unicode_filenames.append(Str(decoded))
3539        if support.TESTFN_UNDECODABLE is not None:
3540            encoded = support.TESTFN_UNDECODABLE
3541        else:
3542            encoded = os.fsencode(support.TESTFN)
3543        self.bytes_filenames.append(encoded)
3544        self.bytes_filenames.append(bytearray(encoded))
3545        self.bytes_filenames.append(memoryview(encoded))
3546
3547        self.filenames = self.bytes_filenames + self.unicode_filenames
3548
3549    def test_oserror_filename(self):
3550        funcs = [
3551            (self.filenames, os.chdir,),
3552            (self.filenames, os.chmod, 0o777),
3553            (self.filenames, os.lstat,),
3554            (self.filenames, os.open, os.O_RDONLY),
3555            (self.filenames, os.rmdir,),
3556            (self.filenames, os.stat,),
3557            (self.filenames, os.unlink,),
3558        ]
3559        if sys.platform == "win32":
3560            funcs.extend((
3561                (self.bytes_filenames, os.rename, b"dst"),
3562                (self.bytes_filenames, os.replace, b"dst"),
3563                (self.unicode_filenames, os.rename, "dst"),
3564                (self.unicode_filenames, os.replace, "dst"),
3565                (self.unicode_filenames, os.listdir, ),
3566            ))
3567        else:
3568            funcs.extend((
3569                (self.filenames, os.listdir,),
3570                (self.filenames, os.rename, "dst"),
3571                (self.filenames, os.replace, "dst"),
3572            ))
3573        if hasattr(os, "chown"):
3574            funcs.append((self.filenames, os.chown, 0, 0))
3575        if hasattr(os, "lchown"):
3576            funcs.append((self.filenames, os.lchown, 0, 0))
3577        if hasattr(os, "truncate"):
3578            funcs.append((self.filenames, os.truncate, 0))
3579        if hasattr(os, "chflags"):
3580            funcs.append((self.filenames, os.chflags, 0))
3581        if hasattr(os, "lchflags"):
3582            funcs.append((self.filenames, os.lchflags, 0))
3583        if hasattr(os, "chroot"):
3584            funcs.append((self.filenames, os.chroot,))
3585        if hasattr(os, "link"):
3586            if sys.platform == "win32":
3587                funcs.append((self.bytes_filenames, os.link, b"dst"))
3588                funcs.append((self.unicode_filenames, os.link, "dst"))
3589            else:
3590                funcs.append((self.filenames, os.link, "dst"))
3591        if hasattr(os, "listxattr"):
3592            funcs.extend((
3593                (self.filenames, os.listxattr,),
3594                (self.filenames, os.getxattr, "user.test"),
3595                (self.filenames, os.setxattr, "user.test", b'user'),
3596                (self.filenames, os.removexattr, "user.test"),
3597            ))
3598        if hasattr(os, "lchmod"):
3599            funcs.append((self.filenames, os.lchmod, 0o777))
3600        if hasattr(os, "readlink"):
3601            funcs.append((self.filenames, os.readlink,))
3602
3603
3604        for filenames, func, *func_args in funcs:
3605            for name in filenames:
3606                try:
3607                    if isinstance(name, (str, bytes)):
3608                        func(name, *func_args)
3609                    else:
3610                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3611                            func(name, *func_args)
3612                except OSError as err:
3613                    self.assertIs(err.filename, name, str(func))
3614                except UnicodeDecodeError:
3615                    pass
3616                else:
3617                    self.fail("No exception thrown by {}".format(func))
3618
3619class CPUCountTests(unittest.TestCase):
3620    def test_cpu_count(self):
3621        cpus = os.cpu_count()
3622        if cpus is not None:
3623            self.assertIsInstance(cpus, int)
3624            self.assertGreater(cpus, 0)
3625        else:
3626            self.skipTest("Could not determine the number of CPUs")
3627
3628
3629class FDInheritanceTests(unittest.TestCase):
3630    def test_get_set_inheritable(self):
3631        fd = os.open(__file__, os.O_RDONLY)
3632        self.addCleanup(os.close, fd)
3633        self.assertEqual(os.get_inheritable(fd), False)
3634
3635        os.set_inheritable(fd, True)
3636        self.assertEqual(os.get_inheritable(fd), True)
3637
3638    @unittest.skipIf(fcntl is None, "need fcntl")
3639    def test_get_inheritable_cloexec(self):
3640        fd = os.open(__file__, os.O_RDONLY)
3641        self.addCleanup(os.close, fd)
3642        self.assertEqual(os.get_inheritable(fd), False)
3643
3644        # clear FD_CLOEXEC flag
3645        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3646        flags &= ~fcntl.FD_CLOEXEC
3647        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
3648
3649        self.assertEqual(os.get_inheritable(fd), True)
3650
3651    @unittest.skipIf(fcntl is None, "need fcntl")
3652    def test_set_inheritable_cloexec(self):
3653        fd = os.open(__file__, os.O_RDONLY)
3654        self.addCleanup(os.close, fd)
3655        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3656                         fcntl.FD_CLOEXEC)
3657
3658        os.set_inheritable(fd, True)
3659        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3660                         0)
3661
3662    @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3663    def test_get_set_inheritable_o_path(self):
3664        fd = os.open(__file__, os.O_PATH)
3665        self.addCleanup(os.close, fd)
3666        self.assertEqual(os.get_inheritable(fd), False)
3667
3668        os.set_inheritable(fd, True)
3669        self.assertEqual(os.get_inheritable(fd), True)
3670
3671        os.set_inheritable(fd, False)
3672        self.assertEqual(os.get_inheritable(fd), False)
3673
3674    def test_get_set_inheritable_badf(self):
3675        fd = support.make_bad_fd()
3676
3677        with self.assertRaises(OSError) as ctx:
3678            os.get_inheritable(fd)
3679        self.assertEqual(ctx.exception.errno, errno.EBADF)
3680
3681        with self.assertRaises(OSError) as ctx:
3682            os.set_inheritable(fd, True)
3683        self.assertEqual(ctx.exception.errno, errno.EBADF)
3684
3685        with self.assertRaises(OSError) as ctx:
3686            os.set_inheritable(fd, False)
3687        self.assertEqual(ctx.exception.errno, errno.EBADF)
3688
3689    def test_open(self):
3690        fd = os.open(__file__, os.O_RDONLY)
3691        self.addCleanup(os.close, fd)
3692        self.assertEqual(os.get_inheritable(fd), False)
3693
3694    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3695    def test_pipe(self):
3696        rfd, wfd = os.pipe()
3697        self.addCleanup(os.close, rfd)
3698        self.addCleanup(os.close, wfd)
3699        self.assertEqual(os.get_inheritable(rfd), False)
3700        self.assertEqual(os.get_inheritable(wfd), False)
3701
3702    def test_dup(self):
3703        fd1 = os.open(__file__, os.O_RDONLY)
3704        self.addCleanup(os.close, fd1)
3705
3706        fd2 = os.dup(fd1)
3707        self.addCleanup(os.close, fd2)
3708        self.assertEqual(os.get_inheritable(fd2), False)
3709
3710    def test_dup_standard_stream(self):
3711        fd = os.dup(1)
3712        self.addCleanup(os.close, fd)
3713        self.assertGreater(fd, 0)
3714
3715    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3716    def test_dup_nul(self):
3717        # os.dup() was creating inheritable fds for character files.
3718        fd1 = os.open('NUL', os.O_RDONLY)
3719        self.addCleanup(os.close, fd1)
3720        fd2 = os.dup(fd1)
3721        self.addCleanup(os.close, fd2)
3722        self.assertFalse(os.get_inheritable(fd2))
3723
3724    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3725    def test_dup2(self):
3726        fd = os.open(__file__, os.O_RDONLY)
3727        self.addCleanup(os.close, fd)
3728
3729        # inheritable by default
3730        fd2 = os.open(__file__, os.O_RDONLY)
3731        self.addCleanup(os.close, fd2)
3732        self.assertEqual(os.dup2(fd, fd2), fd2)
3733        self.assertTrue(os.get_inheritable(fd2))
3734
3735        # force non-inheritable
3736        fd3 = os.open(__file__, os.O_RDONLY)
3737        self.addCleanup(os.close, fd3)
3738        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3739        self.assertFalse(os.get_inheritable(fd3))
3740
3741    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3742    def test_openpty(self):
3743        master_fd, slave_fd = os.openpty()
3744        self.addCleanup(os.close, master_fd)
3745        self.addCleanup(os.close, slave_fd)
3746        self.assertEqual(os.get_inheritable(master_fd), False)
3747        self.assertEqual(os.get_inheritable(slave_fd), False)
3748
3749
3750class PathTConverterTests(unittest.TestCase):
3751    # tuples of (function name, allows fd arguments, additional arguments to
3752    # function, cleanup function)
3753    functions = [
3754        ('stat', True, (), None),
3755        ('lstat', False, (), None),
3756        ('access', False, (os.F_OK,), None),
3757        ('chflags', False, (0,), None),
3758        ('lchflags', False, (0,), None),
3759        ('open', False, (0,), getattr(os, 'close', None)),
3760    ]
3761
3762    def test_path_t_converter(self):
3763        str_filename = support.TESTFN
3764        if os.name == 'nt':
3765            bytes_fspath = bytes_filename = None
3766        else:
3767            bytes_filename = os.fsencode(support.TESTFN)
3768            bytes_fspath = FakePath(bytes_filename)
3769        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
3770        self.addCleanup(support.unlink, support.TESTFN)
3771        self.addCleanup(os.close, fd)
3772
3773        int_fspath = FakePath(fd)
3774        str_fspath = FakePath(str_filename)
3775
3776        for name, allow_fd, extra_args, cleanup_fn in self.functions:
3777            with self.subTest(name=name):
3778                try:
3779                    fn = getattr(os, name)
3780                except AttributeError:
3781                    continue
3782
3783                for path in (str_filename, bytes_filename, str_fspath,
3784                             bytes_fspath):
3785                    if path is None:
3786                        continue
3787                    with self.subTest(name=name, path=path):
3788                        result = fn(path, *extra_args)
3789                        if cleanup_fn is not None:
3790                            cleanup_fn(result)
3791
3792                with self.assertRaisesRegex(
3793                        TypeError, 'to return str or bytes'):
3794                    fn(int_fspath, *extra_args)
3795
3796                if allow_fd:
3797                    result = fn(fd, *extra_args)  # should not fail
3798                    if cleanup_fn is not None:
3799                        cleanup_fn(result)
3800                else:
3801                    with self.assertRaisesRegex(
3802                            TypeError,
3803                            'os.PathLike'):
3804                        fn(fd, *extra_args)
3805
3806    def test_path_t_converter_and_custom_class(self):
3807        msg = r'__fspath__\(\) to return str or bytes, not %s'
3808        with self.assertRaisesRegex(TypeError, msg % r'int'):
3809            os.stat(FakePath(2))
3810        with self.assertRaisesRegex(TypeError, msg % r'float'):
3811            os.stat(FakePath(2.34))
3812        with self.assertRaisesRegex(TypeError, msg % r'object'):
3813            os.stat(FakePath(object()))
3814
3815
3816@unittest.skipUnless(hasattr(os, 'get_blocking'),
3817                     'needs os.get_blocking() and os.set_blocking()')
3818class BlockingTests(unittest.TestCase):
3819    def test_blocking(self):
3820        fd = os.open(__file__, os.O_RDONLY)
3821        self.addCleanup(os.close, fd)
3822        self.assertEqual(os.get_blocking(fd), True)
3823
3824        os.set_blocking(fd, False)
3825        self.assertEqual(os.get_blocking(fd), False)
3826
3827        os.set_blocking(fd, True)
3828        self.assertEqual(os.get_blocking(fd), True)
3829
3830
3831
3832class ExportsTests(unittest.TestCase):
3833    def test_os_all(self):
3834        self.assertIn('open', os.__all__)
3835        self.assertIn('walk', os.__all__)
3836
3837
3838class TestDirEntry(unittest.TestCase):
3839    def setUp(self):
3840        self.path = os.path.realpath(support.TESTFN)
3841        self.addCleanup(support.rmtree, self.path)
3842        os.mkdir(self.path)
3843
3844    def test_uninstantiable(self):
3845        self.assertRaises(TypeError, os.DirEntry)
3846
3847    def test_unpickable(self):
3848        filename = create_file(os.path.join(self.path, "file.txt"), b'python')
3849        entry = [entry for entry in os.scandir(self.path)].pop()
3850        self.assertIsInstance(entry, os.DirEntry)
3851        self.assertEqual(entry.name, "file.txt")
3852        import pickle
3853        self.assertRaises(TypeError, pickle.dumps, entry, filename)
3854
3855
3856class TestScandir(unittest.TestCase):
3857    check_no_resource_warning = support.check_no_resource_warning
3858
3859    def setUp(self):
3860        self.path = os.path.realpath(support.TESTFN)
3861        self.bytes_path = os.fsencode(self.path)
3862        self.addCleanup(support.rmtree, self.path)
3863        os.mkdir(self.path)
3864
3865    def create_file(self, name="file.txt"):
3866        path = self.bytes_path if isinstance(name, bytes) else self.path
3867        filename = os.path.join(path, name)
3868        create_file(filename, b'python')
3869        return filename
3870
3871    def get_entries(self, names):
3872        entries = dict((entry.name, entry)
3873                       for entry in os.scandir(self.path))
3874        self.assertEqual(sorted(entries.keys()), names)
3875        return entries
3876
3877    def assert_stat_equal(self, stat1, stat2, skip_fields):
3878        if skip_fields:
3879            for attr in dir(stat1):
3880                if not attr.startswith("st_"):
3881                    continue
3882                if attr in ("st_dev", "st_ino", "st_nlink"):
3883                    continue
3884                self.assertEqual(getattr(stat1, attr),
3885                                 getattr(stat2, attr),
3886                                 (stat1, stat2, attr))
3887        else:
3888            self.assertEqual(stat1, stat2)
3889
3890    def test_uninstantiable(self):
3891        scandir_iter = os.scandir(self.path)
3892        self.assertRaises(TypeError, type(scandir_iter))
3893        scandir_iter.close()
3894
3895    def test_unpickable(self):
3896        filename = self.create_file("file.txt")
3897        scandir_iter = os.scandir(self.path)
3898        import pickle
3899        self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
3900        scandir_iter.close()
3901
3902    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
3903        self.assertIsInstance(entry, os.DirEntry)
3904        self.assertEqual(entry.name, name)
3905        self.assertEqual(entry.path, os.path.join(self.path, name))
3906        self.assertEqual(entry.inode(),
3907                         os.stat(entry.path, follow_symlinks=False).st_ino)
3908
3909        entry_stat = os.stat(entry.path)
3910        self.assertEqual(entry.is_dir(),
3911                         stat.S_ISDIR(entry_stat.st_mode))
3912        self.assertEqual(entry.is_file(),
3913                         stat.S_ISREG(entry_stat.st_mode))
3914        self.assertEqual(entry.is_symlink(),
3915                         os.path.islink(entry.path))
3916
3917        entry_lstat = os.stat(entry.path, follow_symlinks=False)
3918        self.assertEqual(entry.is_dir(follow_symlinks=False),
3919                         stat.S_ISDIR(entry_lstat.st_mode))
3920        self.assertEqual(entry.is_file(follow_symlinks=False),
3921                         stat.S_ISREG(entry_lstat.st_mode))
3922
3923        self.assert_stat_equal(entry.stat(),
3924                               entry_stat,
3925                               os.name == 'nt' and not is_symlink)
3926        self.assert_stat_equal(entry.stat(follow_symlinks=False),
3927                               entry_lstat,
3928                               os.name == 'nt')
3929
3930    def test_attributes(self):
3931        link = hasattr(os, 'link')
3932        symlink = support.can_symlink()
3933
3934        dirname = os.path.join(self.path, "dir")
3935        os.mkdir(dirname)
3936        filename = self.create_file("file.txt")
3937        if link:
3938            try:
3939                os.link(filename, os.path.join(self.path, "link_file.txt"))
3940            except PermissionError as e:
3941                self.skipTest('os.link(): %s' % e)
3942        if symlink:
3943            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3944                       target_is_directory=True)
3945            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3946
3947        names = ['dir', 'file.txt']
3948        if link:
3949            names.append('link_file.txt')
3950        if symlink:
3951            names.extend(('symlink_dir', 'symlink_file.txt'))
3952        entries = self.get_entries(names)
3953
3954        entry = entries['dir']
3955        self.check_entry(entry, 'dir', True, False, False)
3956
3957        entry = entries['file.txt']
3958        self.check_entry(entry, 'file.txt', False, True, False)
3959
3960        if link:
3961            entry = entries['link_file.txt']
3962            self.check_entry(entry, 'link_file.txt', False, True, False)
3963
3964        if symlink:
3965            entry = entries['symlink_dir']
3966            self.check_entry(entry, 'symlink_dir', True, False, True)
3967
3968            entry = entries['symlink_file.txt']
3969            self.check_entry(entry, 'symlink_file.txt', False, True, True)
3970
3971    def get_entry(self, name):
3972        path = self.bytes_path if isinstance(name, bytes) else self.path
3973        entries = list(os.scandir(path))
3974        self.assertEqual(len(entries), 1)
3975
3976        entry = entries[0]
3977        self.assertEqual(entry.name, name)
3978        return entry
3979
3980    def create_file_entry(self, name='file.txt'):
3981        filename = self.create_file(name=name)
3982        return self.get_entry(os.path.basename(filename))
3983
3984    def test_current_directory(self):
3985        filename = self.create_file()
3986        old_dir = os.getcwd()
3987        try:
3988            os.chdir(self.path)
3989
3990            # call scandir() without parameter: it must list the content
3991            # of the current directory
3992            entries = dict((entry.name, entry) for entry in os.scandir())
3993            self.assertEqual(sorted(entries.keys()),
3994                             [os.path.basename(filename)])
3995        finally:
3996            os.chdir(old_dir)
3997
3998    def test_repr(self):
3999        entry = self.create_file_entry()
4000        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
4001
4002    def test_fspath_protocol(self):
4003        entry = self.create_file_entry()
4004        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
4005
4006    def test_fspath_protocol_bytes(self):
4007        bytes_filename = os.fsencode('bytesfile.txt')
4008        bytes_entry = self.create_file_entry(name=bytes_filename)
4009        fspath = os.fspath(bytes_entry)
4010        self.assertIsInstance(fspath, bytes)
4011        self.assertEqual(fspath,
4012                         os.path.join(os.fsencode(self.path),bytes_filename))
4013
4014    def test_removed_dir(self):
4015        path = os.path.join(self.path, 'dir')
4016
4017        os.mkdir(path)
4018        entry = self.get_entry('dir')
4019        os.rmdir(path)
4020
4021        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4022        if os.name == 'nt':
4023            self.assertTrue(entry.is_dir())
4024        self.assertFalse(entry.is_file())
4025        self.assertFalse(entry.is_symlink())
4026        if os.name == 'nt':
4027            self.assertRaises(FileNotFoundError, entry.inode)
4028            # don't fail
4029            entry.stat()
4030            entry.stat(follow_symlinks=False)
4031        else:
4032            self.assertGreater(entry.inode(), 0)
4033            self.assertRaises(FileNotFoundError, entry.stat)
4034            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4035
4036    def test_removed_file(self):
4037        entry = self.create_file_entry()
4038        os.unlink(entry.path)
4039
4040        self.assertFalse(entry.is_dir())
4041        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4042        if os.name == 'nt':
4043            self.assertTrue(entry.is_file())
4044        self.assertFalse(entry.is_symlink())
4045        if os.name == 'nt':
4046            self.assertRaises(FileNotFoundError, entry.inode)
4047            # don't fail
4048            entry.stat()
4049            entry.stat(follow_symlinks=False)
4050        else:
4051            self.assertGreater(entry.inode(), 0)
4052            self.assertRaises(FileNotFoundError, entry.stat)
4053            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4054
4055    def test_broken_symlink(self):
4056        if not support.can_symlink():
4057            return self.skipTest('cannot create symbolic link')
4058
4059        filename = self.create_file("file.txt")
4060        os.symlink(filename,
4061                   os.path.join(self.path, "symlink.txt"))
4062        entries = self.get_entries(['file.txt', 'symlink.txt'])
4063        entry = entries['symlink.txt']
4064        os.unlink(filename)
4065
4066        self.assertGreater(entry.inode(), 0)
4067        self.assertFalse(entry.is_dir())
4068        self.assertFalse(entry.is_file())  # broken symlink returns False
4069        self.assertFalse(entry.is_dir(follow_symlinks=False))
4070        self.assertFalse(entry.is_file(follow_symlinks=False))
4071        self.assertTrue(entry.is_symlink())
4072        self.assertRaises(FileNotFoundError, entry.stat)
4073        # don't fail
4074        entry.stat(follow_symlinks=False)
4075
4076    def test_bytes(self):
4077        self.create_file("file.txt")
4078
4079        path_bytes = os.fsencode(self.path)
4080        entries = list(os.scandir(path_bytes))
4081        self.assertEqual(len(entries), 1, entries)
4082        entry = entries[0]
4083
4084        self.assertEqual(entry.name, b'file.txt')
4085        self.assertEqual(entry.path,
4086                         os.fsencode(os.path.join(self.path, 'file.txt')))
4087
4088    def test_bytes_like(self):
4089        self.create_file("file.txt")
4090
4091        for cls in bytearray, memoryview:
4092            path_bytes = cls(os.fsencode(self.path))
4093            with self.assertWarns(DeprecationWarning):
4094                entries = list(os.scandir(path_bytes))
4095            self.assertEqual(len(entries), 1, entries)
4096            entry = entries[0]
4097
4098            self.assertEqual(entry.name, b'file.txt')
4099            self.assertEqual(entry.path,
4100                             os.fsencode(os.path.join(self.path, 'file.txt')))
4101            self.assertIs(type(entry.name), bytes)
4102            self.assertIs(type(entry.path), bytes)
4103
4104    @unittest.skipUnless(os.listdir in os.supports_fd,
4105                         'fd support for listdir required for this test.')
4106    def test_fd(self):
4107        self.assertIn(os.scandir, os.supports_fd)
4108        self.create_file('file.txt')
4109        expected_names = ['file.txt']
4110        if support.can_symlink():
4111            os.symlink('file.txt', os.path.join(self.path, 'link'))
4112            expected_names.append('link')
4113
4114        fd = os.open(self.path, os.O_RDONLY)
4115        try:
4116            with os.scandir(fd) as it:
4117                entries = list(it)
4118            names = [entry.name for entry in entries]
4119            self.assertEqual(sorted(names), expected_names)
4120            self.assertEqual(names, os.listdir(fd))
4121            for entry in entries:
4122                self.assertEqual(entry.path, entry.name)
4123                self.assertEqual(os.fspath(entry), entry.name)
4124                self.assertEqual(entry.is_symlink(), entry.name == 'link')
4125                if os.stat in os.supports_dir_fd:
4126                    st = os.stat(entry.name, dir_fd=fd)
4127                    self.assertEqual(entry.stat(), st)
4128                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4129                    self.assertEqual(entry.stat(follow_symlinks=False), st)
4130        finally:
4131            os.close(fd)
4132
4133    def test_empty_path(self):
4134        self.assertRaises(FileNotFoundError, os.scandir, '')
4135
4136    def test_consume_iterator_twice(self):
4137        self.create_file("file.txt")
4138        iterator = os.scandir(self.path)
4139
4140        entries = list(iterator)
4141        self.assertEqual(len(entries), 1, entries)
4142
4143        # check than consuming the iterator twice doesn't raise exception
4144        entries2 = list(iterator)
4145        self.assertEqual(len(entries2), 0, entries2)
4146
4147    def test_bad_path_type(self):
4148        for obj in [1.234, {}, []]:
4149            self.assertRaises(TypeError, os.scandir, obj)
4150
4151    def test_close(self):
4152        self.create_file("file.txt")
4153        self.create_file("file2.txt")
4154        iterator = os.scandir(self.path)
4155        next(iterator)
4156        iterator.close()
4157        # multiple closes
4158        iterator.close()
4159        with self.check_no_resource_warning():
4160            del iterator
4161
4162    def test_context_manager(self):
4163        self.create_file("file.txt")
4164        self.create_file("file2.txt")
4165        with os.scandir(self.path) as iterator:
4166            next(iterator)
4167        with self.check_no_resource_warning():
4168            del iterator
4169
4170    def test_context_manager_close(self):
4171        self.create_file("file.txt")
4172        self.create_file("file2.txt")
4173        with os.scandir(self.path) as iterator:
4174            next(iterator)
4175            iterator.close()
4176
4177    def test_context_manager_exception(self):
4178        self.create_file("file.txt")
4179        self.create_file("file2.txt")
4180        with self.assertRaises(ZeroDivisionError):
4181            with os.scandir(self.path) as iterator:
4182                next(iterator)
4183                1/0
4184        with self.check_no_resource_warning():
4185            del iterator
4186
4187    def test_resource_warning(self):
4188        self.create_file("file.txt")
4189        self.create_file("file2.txt")
4190        iterator = os.scandir(self.path)
4191        next(iterator)
4192        with self.assertWarns(ResourceWarning):
4193            del iterator
4194            support.gc_collect()
4195        # exhausted iterator
4196        iterator = os.scandir(self.path)
4197        list(iterator)
4198        with self.check_no_resource_warning():
4199            del iterator
4200
4201
4202class TestPEP519(unittest.TestCase):
4203
4204    # Abstracted so it can be overridden to test pure Python implementation
4205    # if a C version is provided.
4206    fspath = staticmethod(os.fspath)
4207
4208    def test_return_bytes(self):
4209        for b in b'hello', b'goodbye', b'some/path/and/file':
4210            self.assertEqual(b, self.fspath(b))
4211
4212    def test_return_string(self):
4213        for s in 'hello', 'goodbye', 'some/path/and/file':
4214            self.assertEqual(s, self.fspath(s))
4215
4216    def test_fsencode_fsdecode(self):
4217        for p in "path/like/object", b"path/like/object":
4218            pathlike = FakePath(p)
4219
4220            self.assertEqual(p, self.fspath(pathlike))
4221            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4222            self.assertEqual("path/like/object", os.fsdecode(pathlike))
4223
4224    def test_pathlike(self):
4225        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4226        self.assertTrue(issubclass(FakePath, os.PathLike))
4227        self.assertTrue(isinstance(FakePath('x'), os.PathLike))
4228
4229    def test_garbage_in_exception_out(self):
4230        vapor = type('blah', (), {})
4231        for o in int, type, os, vapor():
4232            self.assertRaises(TypeError, self.fspath, o)
4233
4234    def test_argument_required(self):
4235        self.assertRaises(TypeError, self.fspath)
4236
4237    def test_bad_pathlike(self):
4238        # __fspath__ returns a value other than str or bytes.
4239        self.assertRaises(TypeError, self.fspath, FakePath(42))
4240        # __fspath__ attribute that is not callable.
4241        c = type('foo', (), {})
4242        c.__fspath__ = 1
4243        self.assertRaises(TypeError, self.fspath, c())
4244        # __fspath__ raises an exception.
4245        self.assertRaises(ZeroDivisionError, self.fspath,
4246                          FakePath(ZeroDivisionError()))
4247
4248    def test_pathlike_subclasshook(self):
4249        # bpo-38878: subclasshook causes subclass checks
4250        # true on abstract implementation.
4251        class A(os.PathLike):
4252            pass
4253        self.assertFalse(issubclass(FakePath, A))
4254        self.assertTrue(issubclass(FakePath, os.PathLike))
4255
4256    def test_pathlike_class_getitem(self):
4257        self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
4258
4259
4260class TimesTests(unittest.TestCase):
4261    def test_times(self):
4262        times = os.times()
4263        self.assertIsInstance(times, os.times_result)
4264
4265        for field in ('user', 'system', 'children_user', 'children_system',
4266                      'elapsed'):
4267            value = getattr(times, field)
4268            self.assertIsInstance(value, float)
4269
4270        if os.name == 'nt':
4271            self.assertEqual(times.children_user, 0)
4272            self.assertEqual(times.children_system, 0)
4273            self.assertEqual(times.elapsed, 0)
4274
4275
4276# Only test if the C version is provided, otherwise TestPEP519 already tested
4277# the pure Python implementation.
4278if hasattr(os, "_fspath"):
4279    class TestPEP519PurePython(TestPEP519):
4280
4281        """Explicitly test the pure Python implementation of os.fspath()."""
4282
4283        fspath = staticmethod(os._fspath)
4284
4285
4286if __name__ == "__main__":
4287    unittest.main()
4288