• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import codecs
6import contextlib
7import decimal
8import errno
9import fnmatch
10import fractions
11import itertools
12import locale
13import mmap
14import os
15import pickle
16import select
17import shutil
18import signal
19import socket
20import stat
21import struct
22import subprocess
23import sys
24import sysconfig
25import tempfile
26import threading
27import time
28import types
29import unittest
30import uuid
31import warnings
32from test import support
33from test.support import import_helper
34from test.support import os_helper
35from test.support import socket_helper
36from test.support import threading_helper
37from test.support import warnings_helper
38from platform import win32_is_iot
39
40with warnings.catch_warnings():
41    warnings.simplefilter('ignore', DeprecationWarning)
42    import asynchat
43    import asyncore
44
45try:
46    import resource
47except ImportError:
48    resource = None
49try:
50    import fcntl
51except ImportError:
52    fcntl = None
53try:
54    import _winapi
55except ImportError:
56    _winapi = None
57try:
58    import pwd
59    all_users = [u.pw_uid for u in pwd.getpwall()]
60except (ImportError, AttributeError):
61    all_users = []
62try:
63    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
64except ImportError:
65    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
66
67
68from test.support.script_helper import assert_python_ok
69from test.support import unix_shell
70from test.support.os_helper import FakePath
71
72
73root_in_posix = False
74if hasattr(os, 'geteuid'):
75    root_in_posix = (os.geteuid() == 0)
76
77# Detect whether we're on a Linux system that uses the (now outdated
78# and unmaintained) linuxthreads threading library.  There's an issue
79# when combining linuxthreads with a failed execv call: see
80# http://bugs.python.org/issue4970.
81if hasattr(sys, 'thread_info') and sys.thread_info.version:
82    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
83else:
84    USING_LINUXTHREADS = False
85
86# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
87HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
88
89
90def requires_os_func(name):
91    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
92
93
94def create_file(filename, content=b'content'):
95    with open(filename, "xb", 0) as fp:
96        fp.write(content)
97
98
99# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
100requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
101                                       'on AIX, splice() only accepts sockets')
102
103
104class MiscTests(unittest.TestCase):
105    def test_getcwd(self):
106        cwd = os.getcwd()
107        self.assertIsInstance(cwd, str)
108
109    def test_getcwd_long_path(self):
110        # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On
111        # Windows, MAX_PATH is defined as 260 characters, but Windows supports
112        # longer path if longer paths support is enabled. Internally, the os
113        # module uses MAXPATHLEN which is at least 1024.
114        #
115        # Use a directory name of 200 characters to fit into Windows MAX_PATH
116        # limit.
117        #
118        # On Windows, the test can stop when trying to create a path longer
119        # than MAX_PATH if long paths support is disabled:
120        # see RtlAreLongPathsEnabled().
121        min_len = 2000   # characters
122        # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path
123        # longer than PATH_MAX will fail.
124        if sys.platform == 'vxworks':
125            min_len = 1000
126        dirlen = 200     # characters
127        dirname = 'python_test_dir_'
128        dirname = dirname + ('a' * (dirlen - len(dirname)))
129
130        with tempfile.TemporaryDirectory() as tmpdir:
131            with os_helper.change_cwd(tmpdir) as path:
132                expected = path
133
134                while True:
135                    cwd = os.getcwd()
136                    self.assertEqual(cwd, expected)
137
138                    need = min_len - (len(cwd) + len(os.path.sep))
139                    if need <= 0:
140                        break
141                    if len(dirname) > need and need > 0:
142                        dirname = dirname[:need]
143
144                    path = os.path.join(path, dirname)
145                    try:
146                        os.mkdir(path)
147                        # On Windows, chdir() can fail
148                        # even if mkdir() succeeded
149                        os.chdir(path)
150                    except FileNotFoundError:
151                        # On Windows, catch ERROR_PATH_NOT_FOUND (3) and
152                        # ERROR_FILENAME_EXCED_RANGE (206) errors
153                        # ("The filename or extension is too long")
154                        break
155                    except OSError as exc:
156                        if exc.errno == errno.ENAMETOOLONG:
157                            break
158                        else:
159                            raise
160
161                    expected = path
162
163                if support.verbose:
164                    print(f"Tested current directory length: {len(cwd)}")
165
166    def test_getcwdb(self):
167        cwd = os.getcwdb()
168        self.assertIsInstance(cwd, bytes)
169        self.assertEqual(os.fsdecode(cwd), os.getcwd())
170
171
172# Tests creating TESTFN
173class FileTests(unittest.TestCase):
174    def setUp(self):
175        if os.path.lexists(os_helper.TESTFN):
176            os.unlink(os_helper.TESTFN)
177    tearDown = setUp
178
179    def test_access(self):
180        f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
181        os.close(f)
182        self.assertTrue(os.access(os_helper.TESTFN, os.W_OK))
183
184    def test_closerange(self):
185        first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
186        # We must allocate two consecutive file descriptors, otherwise
187        # it will mess up other file descriptors (perhaps even the three
188        # standard ones).
189        second = os.dup(first)
190        try:
191            retries = 0
192            while second != first + 1:
193                os.close(first)
194                retries += 1
195                if retries > 10:
196                    # XXX test skipped
197                    self.skipTest("couldn't allocate two consecutive fds")
198                first, second = second, os.dup(second)
199        finally:
200            os.close(second)
201        # close a fd that is open, and one that isn't
202        os.closerange(first, first + 2)
203        self.assertRaises(OSError, os.write, first, b"a")
204
205    @support.cpython_only
206    def test_rename(self):
207        path = os_helper.TESTFN
208        old = sys.getrefcount(path)
209        self.assertRaises(TypeError, os.rename, path, 0)
210        new = sys.getrefcount(path)
211        self.assertEqual(old, new)
212
213    def test_read(self):
214        with open(os_helper.TESTFN, "w+b") as fobj:
215            fobj.write(b"spam")
216            fobj.flush()
217            fd = fobj.fileno()
218            os.lseek(fd, 0, 0)
219            s = os.read(fd, 4)
220            self.assertEqual(type(s), bytes)
221            self.assertEqual(s, b"spam")
222
223    @support.cpython_only
224    # Skip the test on 32-bit platforms: the number of bytes must fit in a
225    # Py_ssize_t type
226    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
227                         "needs INT_MAX < PY_SSIZE_T_MAX")
228    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
229    def test_large_read(self, size):
230        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
231        create_file(os_helper.TESTFN, b'test')
232
233        # Issue #21932: Make sure that os.read() does not raise an
234        # OverflowError for size larger than INT_MAX
235        with open(os_helper.TESTFN, "rb") as fp:
236            data = os.read(fp.fileno(), size)
237
238        # The test does not try to read more than 2 GiB at once because the
239        # operating system is free to return less bytes than requested.
240        self.assertEqual(data, b'test')
241
242    def test_write(self):
243        # os.write() accepts bytes- and buffer-like objects but not strings
244        fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY)
245        self.assertRaises(TypeError, os.write, fd, "beans")
246        os.write(fd, b"bacon\n")
247        os.write(fd, bytearray(b"eggs\n"))
248        os.write(fd, memoryview(b"spam\n"))
249        os.close(fd)
250        with open(os_helper.TESTFN, "rb") as fobj:
251            self.assertEqual(fobj.read().splitlines(),
252                [b"bacon", b"eggs", b"spam"])
253
254    def write_windows_console(self, *args):
255        retcode = subprocess.call(args,
256            # use a new console to not flood the test output
257            creationflags=subprocess.CREATE_NEW_CONSOLE,
258            # use a shell to hide the console window (SW_HIDE)
259            shell=True)
260        self.assertEqual(retcode, 0)
261
262    @unittest.skipUnless(sys.platform == 'win32',
263                         'test specific to the Windows console')
264    def test_write_windows_console(self):
265        # Issue #11395: the Windows console returns an error (12: not enough
266        # space error) on writing into stdout if stdout mode is binary and the
267        # length is greater than 66,000 bytes (or less, depending on heap
268        # usage).
269        code = "print('x' * 100000)"
270        self.write_windows_console(sys.executable, "-c", code)
271        self.write_windows_console(sys.executable, "-u", "-c", code)
272
273    def fdopen_helper(self, *args):
274        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
275        f = os.fdopen(fd, *args, encoding="utf-8")
276        f.close()
277
278    def test_fdopen(self):
279        fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR)
280        os.close(fd)
281
282        self.fdopen_helper()
283        self.fdopen_helper('r')
284        self.fdopen_helper('r', 100)
285
286    def test_replace(self):
287        TESTFN2 = os_helper.TESTFN + ".2"
288        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
289        self.addCleanup(os_helper.unlink, TESTFN2)
290
291        create_file(os_helper.TESTFN, b"1")
292        create_file(TESTFN2, b"2")
293
294        os.replace(os_helper.TESTFN, TESTFN2)
295        self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN)
296        with open(TESTFN2, 'r', encoding='utf-8') as f:
297            self.assertEqual(f.read(), "1")
298
299    def test_open_keywords(self):
300        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
301            dir_fd=None)
302        os.close(f)
303
304    def test_symlink_keywords(self):
305        symlink = support.get_attribute(os, "symlink")
306        try:
307            symlink(src='target', dst=os_helper.TESTFN,
308                target_is_directory=False, dir_fd=None)
309        except (NotImplementedError, OSError):
310            pass  # No OS support or unprivileged user
311
312    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
313    def test_copy_file_range_invalid_values(self):
314        with self.assertRaises(ValueError):
315            os.copy_file_range(0, 1, -10)
316
317    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
318    def test_copy_file_range(self):
319        TESTFN2 = os_helper.TESTFN + ".3"
320        data = b'0123456789'
321
322        create_file(os_helper.TESTFN, data)
323        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
324
325        in_file = open(os_helper.TESTFN, 'rb')
326        self.addCleanup(in_file.close)
327        in_fd = in_file.fileno()
328
329        out_file = open(TESTFN2, 'w+b')
330        self.addCleanup(os_helper.unlink, TESTFN2)
331        self.addCleanup(out_file.close)
332        out_fd = out_file.fileno()
333
334        try:
335            i = os.copy_file_range(in_fd, out_fd, 5)
336        except OSError as e:
337            # Handle the case in which Python was compiled
338            # in a system with the syscall but without support
339            # in the kernel.
340            if e.errno != errno.ENOSYS:
341                raise
342            self.skipTest(e)
343        else:
344            # The number of copied bytes can be less than
345            # the number of bytes originally requested.
346            self.assertIn(i, range(0, 6));
347
348            with open(TESTFN2, 'rb') as in_file:
349                self.assertEqual(in_file.read(), data[:i])
350
351    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
352    def test_copy_file_range_offset(self):
353        TESTFN4 = os_helper.TESTFN + ".4"
354        data = b'0123456789'
355        bytes_to_copy = 6
356        in_skip = 3
357        out_seek = 5
358
359        create_file(os_helper.TESTFN, data)
360        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
361
362        in_file = open(os_helper.TESTFN, 'rb')
363        self.addCleanup(in_file.close)
364        in_fd = in_file.fileno()
365
366        out_file = open(TESTFN4, 'w+b')
367        self.addCleanup(os_helper.unlink, TESTFN4)
368        self.addCleanup(out_file.close)
369        out_fd = out_file.fileno()
370
371        try:
372            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
373                                   offset_src=in_skip,
374                                   offset_dst=out_seek)
375        except OSError as e:
376            # Handle the case in which Python was compiled
377            # in a system with the syscall but without support
378            # in the kernel.
379            if e.errno != errno.ENOSYS:
380                raise
381            self.skipTest(e)
382        else:
383            # The number of copied bytes can be less than
384            # the number of bytes originally requested.
385            self.assertIn(i, range(0, bytes_to_copy+1));
386
387            with open(TESTFN4, 'rb') as in_file:
388                read = in_file.read()
389            # seeked bytes (5) are zero'ed
390            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
391            # 012 are skipped (in_skip)
392            # 345678 are copied in the file (in_skip + bytes_to_copy)
393            self.assertEqual(read[out_seek:],
394                             data[in_skip:in_skip+i])
395
396    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
397    def test_splice_invalid_values(self):
398        with self.assertRaises(ValueError):
399            os.splice(0, 1, -10)
400
401    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
402    @requires_splice_pipe
403    def test_splice(self):
404        TESTFN2 = os_helper.TESTFN + ".3"
405        data = b'0123456789'
406
407        create_file(os_helper.TESTFN, data)
408        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
409
410        in_file = open(os_helper.TESTFN, 'rb')
411        self.addCleanup(in_file.close)
412        in_fd = in_file.fileno()
413
414        read_fd, write_fd = os.pipe()
415        self.addCleanup(lambda: os.close(read_fd))
416        self.addCleanup(lambda: os.close(write_fd))
417
418        try:
419            i = os.splice(in_fd, write_fd, 5)
420        except OSError as e:
421            # Handle the case in which Python was compiled
422            # in a system with the syscall but without support
423            # in the kernel.
424            if e.errno != errno.ENOSYS:
425                raise
426            self.skipTest(e)
427        else:
428            # The number of copied bytes can be less than
429            # the number of bytes originally requested.
430            self.assertIn(i, range(0, 6));
431
432            self.assertEqual(os.read(read_fd, 100), data[:i])
433
434    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
435    @requires_splice_pipe
436    def test_splice_offset_in(self):
437        TESTFN4 = os_helper.TESTFN + ".4"
438        data = b'0123456789'
439        bytes_to_copy = 6
440        in_skip = 3
441
442        create_file(os_helper.TESTFN, data)
443        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
444
445        in_file = open(os_helper.TESTFN, 'rb')
446        self.addCleanup(in_file.close)
447        in_fd = in_file.fileno()
448
449        read_fd, write_fd = os.pipe()
450        self.addCleanup(lambda: os.close(read_fd))
451        self.addCleanup(lambda: os.close(write_fd))
452
453        try:
454            i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip)
455        except OSError as e:
456            # Handle the case in which Python was compiled
457            # in a system with the syscall but without support
458            # in the kernel.
459            if e.errno != errno.ENOSYS:
460                raise
461            self.skipTest(e)
462        else:
463            # The number of copied bytes can be less than
464            # the number of bytes originally requested.
465            self.assertIn(i, range(0, bytes_to_copy+1));
466
467            read = os.read(read_fd, 100)
468            # 012 are skipped (in_skip)
469            # 345678 are copied in the file (in_skip + bytes_to_copy)
470            self.assertEqual(read, data[in_skip:in_skip+i])
471
472    @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()')
473    @requires_splice_pipe
474    def test_splice_offset_out(self):
475        TESTFN4 = os_helper.TESTFN + ".4"
476        data = b'0123456789'
477        bytes_to_copy = 6
478        out_seek = 3
479
480        create_file(os_helper.TESTFN, data)
481        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
482
483        read_fd, write_fd = os.pipe()
484        self.addCleanup(lambda: os.close(read_fd))
485        self.addCleanup(lambda: os.close(write_fd))
486        os.write(write_fd, data)
487
488        out_file = open(TESTFN4, 'w+b')
489        self.addCleanup(os_helper.unlink, TESTFN4)
490        self.addCleanup(out_file.close)
491        out_fd = out_file.fileno()
492
493        try:
494            i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek)
495        except OSError as e:
496            # Handle the case in which Python was compiled
497            # in a system with the syscall but without support
498            # in the kernel.
499            if e.errno != errno.ENOSYS:
500                raise
501            self.skipTest(e)
502        else:
503            # The number of copied bytes can be less than
504            # the number of bytes originally requested.
505            self.assertIn(i, range(0, bytes_to_copy+1));
506
507            with open(TESTFN4, 'rb') as in_file:
508                read = in_file.read()
509            # seeked bytes (5) are zero'ed
510            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
511            # 012 are skipped (in_skip)
512            # 345678 are copied in the file (in_skip + bytes_to_copy)
513            self.assertEqual(read[out_seek:], data[:i])
514
515
516# Test attributes on return values from os.*stat* family.
517class StatAttributeTests(unittest.TestCase):
518    def setUp(self):
519        self.fname = os_helper.TESTFN
520        self.addCleanup(os_helper.unlink, self.fname)
521        create_file(self.fname, b"ABC")
522
523    def check_stat_attributes(self, fname):
524        result = os.stat(fname)
525
526        # Make sure direct access works
527        self.assertEqual(result[stat.ST_SIZE], 3)
528        self.assertEqual(result.st_size, 3)
529
530        # Make sure all the attributes are there
531        members = dir(result)
532        for name in dir(stat):
533            if name[:3] == 'ST_':
534                attr = name.lower()
535                if name.endswith("TIME"):
536                    def trunc(x): return int(x)
537                else:
538                    def trunc(x): return x
539                self.assertEqual(trunc(getattr(result, attr)),
540                                  result[getattr(stat, name)])
541                self.assertIn(attr, members)
542
543        # Make sure that the st_?time and st_?time_ns fields roughly agree
544        # (they should always agree up to around tens-of-microseconds)
545        for name in 'st_atime st_mtime st_ctime'.split():
546            floaty = int(getattr(result, name) * 100000)
547            nanosecondy = getattr(result, name + "_ns") // 10000
548            self.assertAlmostEqual(floaty, nanosecondy, delta=2)
549
550        try:
551            result[200]
552            self.fail("No exception raised")
553        except IndexError:
554            pass
555
556        # Make sure that assignment fails
557        try:
558            result.st_mode = 1
559            self.fail("No exception raised")
560        except AttributeError:
561            pass
562
563        try:
564            result.st_rdev = 1
565            self.fail("No exception raised")
566        except (AttributeError, TypeError):
567            pass
568
569        try:
570            result.parrot = 1
571            self.fail("No exception raised")
572        except AttributeError:
573            pass
574
575        # Use the stat_result constructor with a too-short tuple.
576        try:
577            result2 = os.stat_result((10,))
578            self.fail("No exception raised")
579        except TypeError:
580            pass
581
582        # Use the constructor with a too-long tuple.
583        try:
584            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
585        except TypeError:
586            pass
587
588    def test_stat_attributes(self):
589        self.check_stat_attributes(self.fname)
590
591    def test_stat_attributes_bytes(self):
592        try:
593            fname = self.fname.encode(sys.getfilesystemencoding())
594        except UnicodeEncodeError:
595            self.skipTest("cannot encode %a for the filesystem" % self.fname)
596        self.check_stat_attributes(fname)
597
598    def test_stat_result_pickle(self):
599        result = os.stat(self.fname)
600        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
601            p = pickle.dumps(result, proto)
602            self.assertIn(b'stat_result', p)
603            if proto < 4:
604                self.assertIn(b'cos\nstat_result\n', p)
605            unpickled = pickle.loads(p)
606            self.assertEqual(result, unpickled)
607
608    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
609    def test_statvfs_attributes(self):
610        result = os.statvfs(self.fname)
611
612        # Make sure direct access works
613        self.assertEqual(result.f_bfree, result[3])
614
615        # Make sure all the attributes are there.
616        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
617                    'ffree', 'favail', 'flag', 'namemax')
618        for value, member in enumerate(members):
619            self.assertEqual(getattr(result, 'f_' + member), result[value])
620
621        self.assertTrue(isinstance(result.f_fsid, int))
622
623        # Test that the size of the tuple doesn't change
624        self.assertEqual(len(result), 10)
625
626        # Make sure that assignment really fails
627        try:
628            result.f_bfree = 1
629            self.fail("No exception raised")
630        except AttributeError:
631            pass
632
633        try:
634            result.parrot = 1
635            self.fail("No exception raised")
636        except AttributeError:
637            pass
638
639        # Use the constructor with a too-short tuple.
640        try:
641            result2 = os.statvfs_result((10,))
642            self.fail("No exception raised")
643        except TypeError:
644            pass
645
646        # Use the constructor with a too-long tuple.
647        try:
648            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
649        except TypeError:
650            pass
651
652    @unittest.skipUnless(hasattr(os, 'statvfs'),
653                         "need os.statvfs()")
654    def test_statvfs_result_pickle(self):
655        result = os.statvfs(self.fname)
656
657        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
658            p = pickle.dumps(result, proto)
659            self.assertIn(b'statvfs_result', p)
660            if proto < 4:
661                self.assertIn(b'cos\nstatvfs_result\n', p)
662            unpickled = pickle.loads(p)
663            self.assertEqual(result, unpickled)
664
665    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
666    def test_1686475(self):
667        # Verify that an open file can be stat'ed
668        try:
669            os.stat(r"c:\pagefile.sys")
670        except FileNotFoundError:
671            self.skipTest(r'c:\pagefile.sys does not exist')
672        except OSError as e:
673            self.fail("Could not stat pagefile.sys")
674
675    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
676    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
677    def test_15261(self):
678        # Verify that stat'ing a closed fd does not cause crash
679        r, w = os.pipe()
680        try:
681            os.stat(r)          # should not raise error
682        finally:
683            os.close(r)
684            os.close(w)
685        with self.assertRaises(OSError) as ctx:
686            os.stat(r)
687        self.assertEqual(ctx.exception.errno, errno.EBADF)
688
689    def check_file_attributes(self, result):
690        self.assertTrue(hasattr(result, 'st_file_attributes'))
691        self.assertTrue(isinstance(result.st_file_attributes, int))
692        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
693
694    @unittest.skipUnless(sys.platform == "win32",
695                         "st_file_attributes is Win32 specific")
696    def test_file_attributes(self):
697        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
698        result = os.stat(self.fname)
699        self.check_file_attributes(result)
700        self.assertEqual(
701            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
702            0)
703
704        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
705        dirname = os_helper.TESTFN + "dir"
706        os.mkdir(dirname)
707        self.addCleanup(os.rmdir, dirname)
708
709        result = os.stat(dirname)
710        self.check_file_attributes(result)
711        self.assertEqual(
712            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
713            stat.FILE_ATTRIBUTE_DIRECTORY)
714
715    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
716    def test_access_denied(self):
717        # Default to FindFirstFile WIN32_FIND_DATA when access is
718        # denied. See issue 28075.
719        # os.environ['TEMP'] should be located on a volume that
720        # supports file ACLs.
721        fname = os.path.join(os.environ['TEMP'], self.fname)
722        self.addCleanup(os_helper.unlink, fname)
723        create_file(fname, b'ABC')
724        # Deny the right to [S]YNCHRONIZE on the file to
725        # force CreateFile to fail with ERROR_ACCESS_DENIED.
726        DETACHED_PROCESS = 8
727        subprocess.check_call(
728            # bpo-30584: Use security identifier *S-1-5-32-545 instead
729            # of localized "Users" to not depend on the locale.
730            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
731            creationflags=DETACHED_PROCESS
732        )
733        result = os.stat(fname)
734        self.assertNotEqual(result.st_size, 0)
735
736    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
737    def test_stat_block_device(self):
738        # bpo-38030: os.stat fails for block devices
739        # Test a filename like "//./C:"
740        fname = "//./" + os.path.splitdrive(os.getcwd())[0]
741        result = os.stat(fname)
742        self.assertEqual(result.st_mode, stat.S_IFBLK)
743
744
745class UtimeTests(unittest.TestCase):
746    def setUp(self):
747        self.dirname = os_helper.TESTFN
748        self.fname = os.path.join(self.dirname, "f1")
749
750        self.addCleanup(os_helper.rmtree, self.dirname)
751        os.mkdir(self.dirname)
752        create_file(self.fname)
753
754    def support_subsecond(self, filename):
755        # Heuristic to check if the filesystem supports timestamp with
756        # subsecond resolution: check if float and int timestamps are different
757        st = os.stat(filename)
758        return ((st.st_atime != st[7])
759                or (st.st_mtime != st[8])
760                or (st.st_ctime != st[9]))
761
762    def _test_utime(self, set_time, filename=None):
763        if not filename:
764            filename = self.fname
765
766        support_subsecond = self.support_subsecond(filename)
767        if support_subsecond:
768            # Timestamp with a resolution of 1 microsecond (10^-6).
769            #
770            # The resolution of the C internal function used by os.utime()
771            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
772            # test with a resolution of 1 ns requires more work:
773            # see the issue #15745.
774            atime_ns = 1002003000   # 1.002003 seconds
775            mtime_ns = 4005006000   # 4.005006 seconds
776        else:
777            # use a resolution of 1 second
778            atime_ns = 5 * 10**9
779            mtime_ns = 8 * 10**9
780
781        set_time(filename, (atime_ns, mtime_ns))
782        st = os.stat(filename)
783
784        if support_subsecond:
785            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
786            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
787        else:
788            self.assertEqual(st.st_atime, atime_ns * 1e-9)
789            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
790        self.assertEqual(st.st_atime_ns, atime_ns)
791        self.assertEqual(st.st_mtime_ns, mtime_ns)
792
793    def test_utime(self):
794        def set_time(filename, ns):
795            # test the ns keyword parameter
796            os.utime(filename, ns=ns)
797        self._test_utime(set_time)
798
799    @staticmethod
800    def ns_to_sec(ns):
801        # Convert a number of nanosecond (int) to a number of seconds (float).
802        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
803        # issue, os.utime() rounds towards minus infinity.
804        return (ns * 1e-9) + 0.5e-9
805
806    def test_utime_by_indexed(self):
807        # pass times as floating point seconds as the second indexed parameter
808        def set_time(filename, ns):
809            atime_ns, mtime_ns = ns
810            atime = self.ns_to_sec(atime_ns)
811            mtime = self.ns_to_sec(mtime_ns)
812            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
813            # or utime(time_t)
814            os.utime(filename, (atime, mtime))
815        self._test_utime(set_time)
816
817    def test_utime_by_times(self):
818        def set_time(filename, ns):
819            atime_ns, mtime_ns = ns
820            atime = self.ns_to_sec(atime_ns)
821            mtime = self.ns_to_sec(mtime_ns)
822            # test the times keyword parameter
823            os.utime(filename, times=(atime, mtime))
824        self._test_utime(set_time)
825
826    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
827                         "follow_symlinks support for utime required "
828                         "for this test.")
829    def test_utime_nofollow_symlinks(self):
830        def set_time(filename, ns):
831            # use follow_symlinks=False to test utimensat(timespec)
832            # or lutimes(timeval)
833            os.utime(filename, ns=ns, follow_symlinks=False)
834        self._test_utime(set_time)
835
836    @unittest.skipUnless(os.utime in os.supports_fd,
837                         "fd support for utime required for this test.")
838    def test_utime_fd(self):
839        def set_time(filename, ns):
840            with open(filename, 'wb', 0) as fp:
841                # use a file descriptor to test futimens(timespec)
842                # or futimes(timeval)
843                os.utime(fp.fileno(), ns=ns)
844        self._test_utime(set_time)
845
846    @unittest.skipUnless(os.utime in os.supports_dir_fd,
847                         "dir_fd support for utime required for this test.")
848    def test_utime_dir_fd(self):
849        def set_time(filename, ns):
850            dirname, name = os.path.split(filename)
851            dirfd = os.open(dirname, os.O_RDONLY)
852            try:
853                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
854                os.utime(name, dir_fd=dirfd, ns=ns)
855            finally:
856                os.close(dirfd)
857        self._test_utime(set_time)
858
859    def test_utime_directory(self):
860        def set_time(filename, ns):
861            # test calling os.utime() on a directory
862            os.utime(filename, ns=ns)
863        self._test_utime(set_time, filename=self.dirname)
864
865    def _test_utime_current(self, set_time):
866        # Get the system clock
867        current = time.time()
868
869        # Call os.utime() to set the timestamp to the current system clock
870        set_time(self.fname)
871
872        if not self.support_subsecond(self.fname):
873            delta = 1.0
874        else:
875            # On Windows, the usual resolution of time.time() is 15.6 ms.
876            # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
877            #
878            # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
879            # also 50 ms on other platforms.
880            delta = 0.050
881        st = os.stat(self.fname)
882        msg = ("st_time=%r, current=%r, dt=%r"
883               % (st.st_mtime, current, st.st_mtime - current))
884        self.assertAlmostEqual(st.st_mtime, current,
885                               delta=delta, msg=msg)
886
887    def test_utime_current(self):
888        def set_time(filename):
889            # Set to the current time in the new way
890            os.utime(self.fname)
891        self._test_utime_current(set_time)
892
893    def test_utime_current_old(self):
894        def set_time(filename):
895            # Set to the current time in the old explicit way.
896            os.utime(self.fname, None)
897        self._test_utime_current(set_time)
898
899    def get_file_system(self, path):
900        if sys.platform == 'win32':
901            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
902            import ctypes
903            kernel32 = ctypes.windll.kernel32
904            buf = ctypes.create_unicode_buffer("", 100)
905            ok = kernel32.GetVolumeInformationW(root, None, 0,
906                                                None, None, None,
907                                                buf, len(buf))
908            if ok:
909                return buf.value
910        # return None if the filesystem is unknown
911
912    def test_large_time(self):
913        # Many filesystems are limited to the year 2038. At least, the test
914        # pass with NTFS filesystem.
915        if self.get_file_system(self.dirname) != "NTFS":
916            self.skipTest("requires NTFS")
917
918        large = 5000000000   # some day in 2128
919        os.utime(self.fname, (large, large))
920        self.assertEqual(os.stat(self.fname).st_mtime, large)
921
922    def test_utime_invalid_arguments(self):
923        # seconds and nanoseconds parameters are mutually exclusive
924        with self.assertRaises(ValueError):
925            os.utime(self.fname, (5, 5), ns=(5, 5))
926        with self.assertRaises(TypeError):
927            os.utime(self.fname, [5, 5])
928        with self.assertRaises(TypeError):
929            os.utime(self.fname, (5,))
930        with self.assertRaises(TypeError):
931            os.utime(self.fname, (5, 5, 5))
932        with self.assertRaises(TypeError):
933            os.utime(self.fname, ns=[5, 5])
934        with self.assertRaises(TypeError):
935            os.utime(self.fname, ns=(5,))
936        with self.assertRaises(TypeError):
937            os.utime(self.fname, ns=(5, 5, 5))
938
939        if os.utime not in os.supports_follow_symlinks:
940            with self.assertRaises(NotImplementedError):
941                os.utime(self.fname, (5, 5), follow_symlinks=False)
942        if os.utime not in os.supports_fd:
943            with open(self.fname, 'wb', 0) as fp:
944                with self.assertRaises(TypeError):
945                    os.utime(fp.fileno(), (5, 5))
946        if os.utime not in os.supports_dir_fd:
947            with self.assertRaises(NotImplementedError):
948                os.utime(self.fname, (5, 5), dir_fd=0)
949
950    @support.cpython_only
951    def test_issue31577(self):
952        # The interpreter shouldn't crash in case utime() received a bad
953        # ns argument.
954        def get_bad_int(divmod_ret_val):
955            class BadInt:
956                def __divmod__(*args):
957                    return divmod_ret_val
958            return BadInt()
959        with self.assertRaises(TypeError):
960            os.utime(self.fname, ns=(get_bad_int(42), 1))
961        with self.assertRaises(TypeError):
962            os.utime(self.fname, ns=(get_bad_int(()), 1))
963        with self.assertRaises(TypeError):
964            os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1))
965
966
967from test import mapping_tests
968
969class EnvironTests(mapping_tests.BasicTestMappingProtocol):
970    """check that os.environ object conform to mapping protocol"""
971    type2test = None
972
973    def setUp(self):
974        self.__save = dict(os.environ)
975        if os.supports_bytes_environ:
976            self.__saveb = dict(os.environb)
977        for key, value in self._reference().items():
978            os.environ[key] = value
979
980    def tearDown(self):
981        os.environ.clear()
982        os.environ.update(self.__save)
983        if os.supports_bytes_environ:
984            os.environb.clear()
985            os.environb.update(self.__saveb)
986
987    def _reference(self):
988        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
989
990    def _empty_mapping(self):
991        os.environ.clear()
992        return os.environ
993
994    # Bug 1110478
995    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
996                         'requires a shell')
997    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
998    def test_update2(self):
999        os.environ.clear()
1000        os.environ.update(HELLO="World")
1001        with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
1002            value = popen.read().strip()
1003            self.assertEqual(value, "World")
1004
1005    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
1006                         'requires a shell')
1007    @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
1008    def test_os_popen_iter(self):
1009        with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
1010                      % unix_shell) as popen:
1011            it = iter(popen)
1012            self.assertEqual(next(it), "line1\n")
1013            self.assertEqual(next(it), "line2\n")
1014            self.assertEqual(next(it), "line3\n")
1015            self.assertRaises(StopIteration, next, it)
1016
1017    # Verify environ keys and values from the OS are of the
1018    # correct str type.
1019    def test_keyvalue_types(self):
1020        for key, val in os.environ.items():
1021            self.assertEqual(type(key), str)
1022            self.assertEqual(type(val), str)
1023
1024    def test_items(self):
1025        for key, value in self._reference().items():
1026            self.assertEqual(os.environ.get(key), value)
1027
1028    # Issue 7310
1029    def test___repr__(self):
1030        """Check that the repr() of os.environ looks like environ({...})."""
1031        env = os.environ
1032        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
1033            '{!r}: {!r}'.format(key, value)
1034            for key, value in env.items())))
1035
1036    def test_get_exec_path(self):
1037        defpath_list = os.defpath.split(os.pathsep)
1038        test_path = ['/monty', '/python', '', '/flying/circus']
1039        test_env = {'PATH': os.pathsep.join(test_path)}
1040
1041        saved_environ = os.environ
1042        try:
1043            os.environ = dict(test_env)
1044            # Test that defaulting to os.environ works.
1045            self.assertSequenceEqual(test_path, os.get_exec_path())
1046            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
1047        finally:
1048            os.environ = saved_environ
1049
1050        # No PATH environment variable
1051        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
1052        # Empty PATH environment variable
1053        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
1054        # Supplied PATH environment variable
1055        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
1056
1057        if os.supports_bytes_environ:
1058            # env cannot contain 'PATH' and b'PATH' keys
1059            try:
1060                # ignore BytesWarning warning
1061                with warnings.catch_warnings(record=True):
1062                    mixed_env = {'PATH': '1', b'PATH': b'2'}
1063            except BytesWarning:
1064                # mixed_env cannot be created with python -bb
1065                pass
1066            else:
1067                self.assertRaises(ValueError, os.get_exec_path, mixed_env)
1068
1069            # bytes key and/or value
1070            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
1071                ['abc'])
1072            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
1073                ['abc'])
1074            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
1075                ['abc'])
1076
1077    @unittest.skipUnless(os.supports_bytes_environ,
1078                         "os.environb required for this test.")
1079    def test_environb(self):
1080        # os.environ -> os.environb
1081        value = 'euro\u20ac'
1082        try:
1083            value_bytes = value.encode(sys.getfilesystemencoding(),
1084                                       'surrogateescape')
1085        except UnicodeEncodeError:
1086            msg = "U+20AC character is not encodable to %s" % (
1087                sys.getfilesystemencoding(),)
1088            self.skipTest(msg)
1089        os.environ['unicode'] = value
1090        self.assertEqual(os.environ['unicode'], value)
1091        self.assertEqual(os.environb[b'unicode'], value_bytes)
1092
1093        # os.environb -> os.environ
1094        value = b'\xff'
1095        os.environb[b'bytes'] = value
1096        self.assertEqual(os.environb[b'bytes'], value)
1097        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
1098        self.assertEqual(os.environ['bytes'], value_str)
1099
1100    def test_putenv_unsetenv(self):
1101        name = "PYTHONTESTVAR"
1102        value = "testvalue"
1103        code = f'import os; print(repr(os.environ.get({name!r})))'
1104
1105        with os_helper.EnvironmentVarGuard() as env:
1106            env.pop(name, None)
1107
1108            os.putenv(name, value)
1109            proc = subprocess.run([sys.executable, '-c', code], check=True,
1110                                  stdout=subprocess.PIPE, text=True)
1111            self.assertEqual(proc.stdout.rstrip(), repr(value))
1112
1113            os.unsetenv(name)
1114            proc = subprocess.run([sys.executable, '-c', code], check=True,
1115                                  stdout=subprocess.PIPE, text=True)
1116            self.assertEqual(proc.stdout.rstrip(), repr(None))
1117
1118    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
1119    @support.requires_mac_ver(10, 6)
1120    def test_putenv_unsetenv_error(self):
1121        # Empty variable name is invalid.
1122        # "=" and null character are not allowed in a variable name.
1123        for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'):
1124            self.assertRaises((OSError, ValueError), os.putenv, name, "value")
1125            self.assertRaises((OSError, ValueError), os.unsetenv, name)
1126
1127        if sys.platform == "win32":
1128            # On Windows, an environment variable string ("name=value" string)
1129            # is limited to 32,767 characters
1130            longstr = 'x' * 32_768
1131            self.assertRaises(ValueError, os.putenv, longstr, "1")
1132            self.assertRaises(ValueError, os.putenv, "X", longstr)
1133            self.assertRaises(ValueError, os.unsetenv, longstr)
1134
1135    def test_key_type(self):
1136        missing = 'missingkey'
1137        self.assertNotIn(missing, os.environ)
1138
1139        with self.assertRaises(KeyError) as cm:
1140            os.environ[missing]
1141        self.assertIs(cm.exception.args[0], missing)
1142        self.assertTrue(cm.exception.__suppress_context__)
1143
1144        with self.assertRaises(KeyError) as cm:
1145            del os.environ[missing]
1146        self.assertIs(cm.exception.args[0], missing)
1147        self.assertTrue(cm.exception.__suppress_context__)
1148
1149    def _test_environ_iteration(self, collection):
1150        iterator = iter(collection)
1151        new_key = "__new_key__"
1152
1153        next(iterator)  # start iteration over os.environ.items
1154
1155        # add a new key in os.environ mapping
1156        os.environ[new_key] = "test_environ_iteration"
1157
1158        try:
1159            next(iterator)  # force iteration over modified mapping
1160            self.assertEqual(os.environ[new_key], "test_environ_iteration")
1161        finally:
1162            del os.environ[new_key]
1163
1164    def test_iter_error_when_changing_os_environ(self):
1165        self._test_environ_iteration(os.environ)
1166
1167    def test_iter_error_when_changing_os_environ_items(self):
1168        self._test_environ_iteration(os.environ.items())
1169
1170    def test_iter_error_when_changing_os_environ_values(self):
1171        self._test_environ_iteration(os.environ.values())
1172
1173    def _test_underlying_process_env(self, var, expected):
1174        if not (unix_shell and os.path.exists(unix_shell)):
1175            return
1176
1177        with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen:
1178            value = popen.read().strip()
1179
1180        self.assertEqual(expected, value)
1181
1182    def test_or_operator(self):
1183        overridden_key = '_TEST_VAR_'
1184        original_value = 'original_value'
1185        os.environ[overridden_key] = original_value
1186
1187        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1188        expected = dict(os.environ)
1189        expected.update(new_vars_dict)
1190
1191        actual = os.environ | new_vars_dict
1192        self.assertDictEqual(expected, actual)
1193        self.assertEqual('3', actual[overridden_key])
1194
1195        new_vars_items = new_vars_dict.items()
1196        self.assertIs(NotImplemented, os.environ.__or__(new_vars_items))
1197
1198        self._test_underlying_process_env('_A_', '')
1199        self._test_underlying_process_env(overridden_key, original_value)
1200
1201    def test_ior_operator(self):
1202        overridden_key = '_TEST_VAR_'
1203        os.environ[overridden_key] = 'original_value'
1204
1205        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1206        expected = dict(os.environ)
1207        expected.update(new_vars_dict)
1208
1209        os.environ |= new_vars_dict
1210        self.assertEqual(expected, os.environ)
1211        self.assertEqual('3', os.environ[overridden_key])
1212
1213        self._test_underlying_process_env('_A_', '1')
1214        self._test_underlying_process_env(overridden_key, '3')
1215
1216    def test_ior_operator_invalid_dicts(self):
1217        os_environ_copy = os.environ.copy()
1218        with self.assertRaises(TypeError):
1219            dict_with_bad_key = {1: '_A_'}
1220            os.environ |= dict_with_bad_key
1221
1222        with self.assertRaises(TypeError):
1223            dict_with_bad_val = {'_A_': 1}
1224            os.environ |= dict_with_bad_val
1225
1226        # Check nothing was added.
1227        self.assertEqual(os_environ_copy, os.environ)
1228
1229    def test_ior_operator_key_value_iterable(self):
1230        overridden_key = '_TEST_VAR_'
1231        os.environ[overridden_key] = 'original_value'
1232
1233        new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3'))
1234        expected = dict(os.environ)
1235        expected.update(new_vars_items)
1236
1237        os.environ |= new_vars_items
1238        self.assertEqual(expected, os.environ)
1239        self.assertEqual('3', os.environ[overridden_key])
1240
1241        self._test_underlying_process_env('_A_', '1')
1242        self._test_underlying_process_env(overridden_key, '3')
1243
1244    def test_ror_operator(self):
1245        overridden_key = '_TEST_VAR_'
1246        original_value = 'original_value'
1247        os.environ[overridden_key] = original_value
1248
1249        new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'}
1250        expected = dict(new_vars_dict)
1251        expected.update(os.environ)
1252
1253        actual = new_vars_dict | os.environ
1254        self.assertDictEqual(expected, actual)
1255        self.assertEqual(original_value, actual[overridden_key])
1256
1257        new_vars_items = new_vars_dict.items()
1258        self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items))
1259
1260        self._test_underlying_process_env('_A_', '')
1261        self._test_underlying_process_env(overridden_key, original_value)
1262
1263
1264class WalkTests(unittest.TestCase):
1265    """Tests for os.walk()."""
1266
1267    # Wrapper to hide minor differences between os.walk and os.fwalk
1268    # to tests both functions with the same code base
1269    def walk(self, top, **kwargs):
1270        if 'follow_symlinks' in kwargs:
1271            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1272        return os.walk(top, **kwargs)
1273
1274    def setUp(self):
1275        join = os.path.join
1276        self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
1277
1278        # Build:
1279        #     TESTFN/
1280        #       TEST1/              a file kid and two directory kids
1281        #         tmp1
1282        #         SUB1/             a file kid and a directory kid
1283        #           tmp2
1284        #           SUB11/          no kids
1285        #         SUB2/             a file kid and a dirsymlink kid
1286        #           tmp3
1287        #           SUB21/          not readable
1288        #             tmp5
1289        #           link/           a symlink to TESTFN.2
1290        #           broken_link
1291        #           broken_link2
1292        #           broken_link3
1293        #       TEST2/
1294        #         tmp4              a lone file
1295        self.walk_path = join(os_helper.TESTFN, "TEST1")
1296        self.sub1_path = join(self.walk_path, "SUB1")
1297        self.sub11_path = join(self.sub1_path, "SUB11")
1298        sub2_path = join(self.walk_path, "SUB2")
1299        sub21_path = join(sub2_path, "SUB21")
1300        tmp1_path = join(self.walk_path, "tmp1")
1301        tmp2_path = join(self.sub1_path, "tmp2")
1302        tmp3_path = join(sub2_path, "tmp3")
1303        tmp5_path = join(sub21_path, "tmp3")
1304        self.link_path = join(sub2_path, "link")
1305        t2_path = join(os_helper.TESTFN, "TEST2")
1306        tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
1307        broken_link_path = join(sub2_path, "broken_link")
1308        broken_link2_path = join(sub2_path, "broken_link2")
1309        broken_link3_path = join(sub2_path, "broken_link3")
1310
1311        # Create stuff.
1312        os.makedirs(self.sub11_path)
1313        os.makedirs(sub2_path)
1314        os.makedirs(sub21_path)
1315        os.makedirs(t2_path)
1316
1317        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
1318            with open(path, "x", encoding='utf-8') as f:
1319                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
1320
1321        if os_helper.can_symlink():
1322            os.symlink(os.path.abspath(t2_path), self.link_path)
1323            os.symlink('broken', broken_link_path, True)
1324            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
1325            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
1326            self.sub2_tree = (sub2_path, ["SUB21", "link"],
1327                              ["broken_link", "broken_link2", "broken_link3",
1328                               "tmp3"])
1329        else:
1330            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])
1331
1332        os.chmod(sub21_path, 0)
1333        try:
1334            os.listdir(sub21_path)
1335        except PermissionError:
1336            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
1337        else:
1338            os.chmod(sub21_path, stat.S_IRWXU)
1339            os.unlink(tmp5_path)
1340            os.rmdir(sub21_path)
1341            del self.sub2_tree[1][:1]
1342
1343    def test_walk_topdown(self):
1344        # Walk top-down.
1345        all = list(self.walk(self.walk_path))
1346
1347        self.assertEqual(len(all), 4)
1348        # We can't know which order SUB1 and SUB2 will appear in.
1349        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
1350        #     flipped:  TESTFN, SUB2, SUB1, SUB11
1351        flipped = all[0][1][0] != "SUB1"
1352        all[0][1].sort()
1353        all[3 - 2 * flipped][-1].sort()
1354        all[3 - 2 * flipped][1].sort()
1355        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1356        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
1357        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
1358        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
1359
1360    def test_walk_prune(self, walk_path=None):
1361        if walk_path is None:
1362            walk_path = self.walk_path
1363        # Prune the search.
1364        all = []
1365        for root, dirs, files in self.walk(walk_path):
1366            all.append((root, dirs, files))
1367            # Don't descend into SUB1.
1368            if 'SUB1' in dirs:
1369                # Note that this also mutates the dirs we appended to all!
1370                dirs.remove('SUB1')
1371
1372        self.assertEqual(len(all), 2)
1373        self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
1374
1375        all[1][-1].sort()
1376        all[1][1].sort()
1377        self.assertEqual(all[1], self.sub2_tree)
1378
1379    def test_file_like_path(self):
1380        self.test_walk_prune(FakePath(self.walk_path))
1381
1382    def test_walk_bottom_up(self):
1383        # Walk bottom-up.
1384        all = list(self.walk(self.walk_path, topdown=False))
1385
1386        self.assertEqual(len(all), 4, all)
1387        # We can't know which order SUB1 and SUB2 will appear in.
1388        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
1389        #     flipped:  SUB2, SUB11, SUB1, TESTFN
1390        flipped = all[3][1][0] != "SUB1"
1391        all[3][1].sort()
1392        all[2 - 2 * flipped][-1].sort()
1393        all[2 - 2 * flipped][1].sort()
1394        self.assertEqual(all[3],
1395                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
1396        self.assertEqual(all[flipped],
1397                         (self.sub11_path, [], []))
1398        self.assertEqual(all[flipped + 1],
1399                         (self.sub1_path, ["SUB11"], ["tmp2"]))
1400        self.assertEqual(all[2 - 2 * flipped],
1401                         self.sub2_tree)
1402
1403    def test_walk_symlink(self):
1404        if not os_helper.can_symlink():
1405            self.skipTest("need symlink support")
1406
1407        # Walk, following symlinks.
1408        walk_it = self.walk(self.walk_path, follow_symlinks=True)
1409        for root, dirs, files in walk_it:
1410            if root == self.link_path:
1411                self.assertEqual(dirs, [])
1412                self.assertEqual(files, ["tmp4"])
1413                break
1414        else:
1415            self.fail("Didn't follow symlink with followlinks=True")
1416
1417    def test_walk_bad_dir(self):
1418        # Walk top-down.
1419        errors = []
1420        walk_it = self.walk(self.walk_path, onerror=errors.append)
1421        root, dirs, files = next(walk_it)
1422        self.assertEqual(errors, [])
1423        dir1 = 'SUB1'
1424        path1 = os.path.join(root, dir1)
1425        path1new = os.path.join(root, dir1 + '.new')
1426        os.rename(path1, path1new)
1427        try:
1428            roots = [r for r, d, f in walk_it]
1429            self.assertTrue(errors)
1430            self.assertNotIn(path1, roots)
1431            self.assertNotIn(path1new, roots)
1432            for dir2 in dirs:
1433                if dir2 != dir1:
1434                    self.assertIn(os.path.join(root, dir2), roots)
1435        finally:
1436            os.rename(path1new, path1)
1437
1438    def test_walk_many_open_files(self):
1439        depth = 30
1440        base = os.path.join(os_helper.TESTFN, 'deep')
1441        p = os.path.join(base, *(['d']*depth))
1442        os.makedirs(p)
1443
1444        iters = [self.walk(base, topdown=False) for j in range(100)]
1445        for i in range(depth + 1):
1446            expected = (p, ['d'] if i else [], [])
1447            for it in iters:
1448                self.assertEqual(next(it), expected)
1449            p = os.path.dirname(p)
1450
1451        iters = [self.walk(base, topdown=True) for j in range(100)]
1452        p = base
1453        for i in range(depth + 1):
1454            expected = (p, ['d'] if i < depth else [], [])
1455            for it in iters:
1456                self.assertEqual(next(it), expected)
1457            p = os.path.join(p, 'd')
1458
1459
1460@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1461class FwalkTests(WalkTests):
1462    """Tests for os.fwalk()."""
1463
1464    def walk(self, top, **kwargs):
1465        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
1466            yield (root, dirs, files)
1467
1468    def fwalk(self, *args, **kwargs):
1469        return os.fwalk(*args, **kwargs)
1470
1471    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1472        """
1473        compare with walk() results.
1474        """
1475        walk_kwargs = walk_kwargs.copy()
1476        fwalk_kwargs = fwalk_kwargs.copy()
1477        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1478            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1479            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
1480
1481            expected = {}
1482            for root, dirs, files in os.walk(**walk_kwargs):
1483                expected[root] = (set(dirs), set(files))
1484
1485            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
1486                self.assertIn(root, expected)
1487                self.assertEqual(expected[root], (set(dirs), set(files)))
1488
1489    def test_compare_to_walk(self):
1490        kwargs = {'top': os_helper.TESTFN}
1491        self._compare_to_walk(kwargs, kwargs)
1492
1493    def test_dir_fd(self):
1494        try:
1495            fd = os.open(".", os.O_RDONLY)
1496            walk_kwargs = {'top': os_helper.TESTFN}
1497            fwalk_kwargs = walk_kwargs.copy()
1498            fwalk_kwargs['dir_fd'] = fd
1499            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1500        finally:
1501            os.close(fd)
1502
1503    def test_yields_correct_dir_fd(self):
1504        # check returned file descriptors
1505        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1506            args = os_helper.TESTFN, topdown, None
1507            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
1508                # check that the FD is valid
1509                os.fstat(rootfd)
1510                # redundant check
1511                os.stat(rootfd)
1512                # check that listdir() returns consistent information
1513                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
1514
1515    def test_fd_leak(self):
1516        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1517        # we both check that calling fwalk() a large number of times doesn't
1518        # yield EMFILE, and that the minimum allocated FD hasn't changed.
1519        minfd = os.dup(1)
1520        os.close(minfd)
1521        for i in range(256):
1522            for x in self.fwalk(os_helper.TESTFN):
1523                pass
1524        newfd = os.dup(1)
1525        self.addCleanup(os.close, newfd)
1526        self.assertEqual(newfd, minfd)
1527
1528    # fwalk() keeps file descriptors open
1529    test_walk_many_open_files = None
1530
1531
1532class BytesWalkTests(WalkTests):
1533    """Tests for os.walk() with bytes."""
1534    def walk(self, top, **kwargs):
1535        if 'follow_symlinks' in kwargs:
1536            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1537        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1538            root = os.fsdecode(broot)
1539            dirs = list(map(os.fsdecode, bdirs))
1540            files = list(map(os.fsdecode, bfiles))
1541            yield (root, dirs, files)
1542            bdirs[:] = list(map(os.fsencode, dirs))
1543            bfiles[:] = list(map(os.fsencode, files))
1544
1545@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1546class BytesFwalkTests(FwalkTests):
1547    """Tests for os.walk() with bytes."""
1548    def fwalk(self, top='.', *args, **kwargs):
1549        for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1550            root = os.fsdecode(broot)
1551            dirs = list(map(os.fsdecode, bdirs))
1552            files = list(map(os.fsdecode, bfiles))
1553            yield (root, dirs, files, topfd)
1554            bdirs[:] = list(map(os.fsencode, dirs))
1555            bfiles[:] = list(map(os.fsencode, files))
1556
1557
1558class MakedirTests(unittest.TestCase):
1559    def setUp(self):
1560        os.mkdir(os_helper.TESTFN)
1561
1562    def test_makedir(self):
1563        base = os_helper.TESTFN
1564        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1565        os.makedirs(path)             # Should work
1566        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1567        os.makedirs(path)
1568
1569        # Try paths with a '.' in them
1570        self.assertRaises(OSError, os.makedirs, os.curdir)
1571        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1572        os.makedirs(path)
1573        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1574                            'dir5', 'dir6')
1575        os.makedirs(path)
1576
1577    def test_mode(self):
1578        with os_helper.temp_umask(0o002):
1579            base = os_helper.TESTFN
1580            parent = os.path.join(base, 'dir1')
1581            path = os.path.join(parent, 'dir2')
1582            os.makedirs(path, 0o555)
1583            self.assertTrue(os.path.exists(path))
1584            self.assertTrue(os.path.isdir(path))
1585            if os.name != 'nt':
1586                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
1587                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)
1588
1589    def test_exist_ok_existing_directory(self):
1590        path = os.path.join(os_helper.TESTFN, 'dir1')
1591        mode = 0o777
1592        old_mask = os.umask(0o022)
1593        os.makedirs(path, mode)
1594        self.assertRaises(OSError, os.makedirs, path, mode)
1595        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
1596        os.makedirs(path, 0o776, exist_ok=True)
1597        os.makedirs(path, mode=mode, exist_ok=True)
1598        os.umask(old_mask)
1599
1600        # Issue #25583: A drive root could raise PermissionError on Windows
1601        os.makedirs(os.path.abspath('/'), exist_ok=True)
1602
1603    def test_exist_ok_s_isgid_directory(self):
1604        path = os.path.join(os_helper.TESTFN, 'dir1')
1605        S_ISGID = stat.S_ISGID
1606        mode = 0o777
1607        old_mask = os.umask(0o022)
1608        try:
1609            existing_testfn_mode = stat.S_IMODE(
1610                    os.lstat(os_helper.TESTFN).st_mode)
1611            try:
1612                os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID)
1613            except PermissionError:
1614                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
1615            if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID):
1616                raise unittest.SkipTest('No support for S_ISGID dir mode.')
1617            # The os should apply S_ISGID from the parent dir for us, but
1618            # this test need not depend on that behavior.  Be explicit.
1619            os.makedirs(path, mode | S_ISGID)
1620            # http://bugs.python.org/issue14992
1621            # Should not fail when the bit is already set.
1622            os.makedirs(path, mode, exist_ok=True)
1623            # remove the bit.
1624            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
1625            # May work even when the bit is not already set when demanded.
1626            os.makedirs(path, mode | S_ISGID, exist_ok=True)
1627        finally:
1628            os.umask(old_mask)
1629
1630    def test_exist_ok_existing_regular_file(self):
1631        base = os_helper.TESTFN
1632        path = os.path.join(os_helper.TESTFN, 'dir1')
1633        with open(path, 'w', encoding='utf-8') as f:
1634            f.write('abc')
1635        self.assertRaises(OSError, os.makedirs, path)
1636        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1637        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1638        os.remove(path)
1639
1640    def tearDown(self):
1641        path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3',
1642                            'dir4', 'dir5', 'dir6')
1643        # If the tests failed, the bottom-most directory ('../dir6')
1644        # may not have been created, so we look for the outermost directory
1645        # that exists.
1646        while not os.path.exists(path) and path != os_helper.TESTFN:
1647            path = os.path.dirname(path)
1648
1649        os.removedirs(path)
1650
1651
1652@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1653class ChownFileTests(unittest.TestCase):
1654
1655    @classmethod
1656    def setUpClass(cls):
1657        os.mkdir(os_helper.TESTFN)
1658
1659    def test_chown_uid_gid_arguments_must_be_index(self):
1660        stat = os.stat(os_helper.TESTFN)
1661        uid = stat.st_uid
1662        gid = stat.st_gid
1663        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1664            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid)
1665            self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value)
1666        self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid))
1667        self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1))
1668
1669    @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups')
1670    def test_chown_gid(self):
1671        groups = os.getgroups()
1672        if len(groups) < 2:
1673            self.skipTest("test needs at least 2 groups")
1674
1675        gid_1, gid_2 = groups[:2]
1676        uid = os.stat(os_helper.TESTFN).st_uid
1677
1678        os.chown(os_helper.TESTFN, uid, gid_1)
1679        gid = os.stat(os_helper.TESTFN).st_gid
1680        self.assertEqual(gid, gid_1)
1681
1682        os.chown(os_helper.TESTFN, uid, gid_2)
1683        gid = os.stat(os_helper.TESTFN).st_gid
1684        self.assertEqual(gid, gid_2)
1685
1686    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1687                         "test needs root privilege and more than one user")
1688    def test_chown_with_root(self):
1689        uid_1, uid_2 = all_users[:2]
1690        gid = os.stat(os_helper.TESTFN).st_gid
1691        os.chown(os_helper.TESTFN, uid_1, gid)
1692        uid = os.stat(os_helper.TESTFN).st_uid
1693        self.assertEqual(uid, uid_1)
1694        os.chown(os_helper.TESTFN, uid_2, gid)
1695        uid = os.stat(os_helper.TESTFN).st_uid
1696        self.assertEqual(uid, uid_2)
1697
1698    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1699                         "test needs non-root account and more than one user")
1700    def test_chown_without_permission(self):
1701        uid_1, uid_2 = all_users[:2]
1702        gid = os.stat(os_helper.TESTFN).st_gid
1703        with self.assertRaises(PermissionError):
1704            os.chown(os_helper.TESTFN, uid_1, gid)
1705            os.chown(os_helper.TESTFN, uid_2, gid)
1706
1707    @classmethod
1708    def tearDownClass(cls):
1709        os.rmdir(os_helper.TESTFN)
1710
1711
1712class RemoveDirsTests(unittest.TestCase):
1713    def setUp(self):
1714        os.makedirs(os_helper.TESTFN)
1715
1716    def tearDown(self):
1717        os_helper.rmtree(os_helper.TESTFN)
1718
1719    def test_remove_all(self):
1720        dira = os.path.join(os_helper.TESTFN, 'dira')
1721        os.mkdir(dira)
1722        dirb = os.path.join(dira, 'dirb')
1723        os.mkdir(dirb)
1724        os.removedirs(dirb)
1725        self.assertFalse(os.path.exists(dirb))
1726        self.assertFalse(os.path.exists(dira))
1727        self.assertFalse(os.path.exists(os_helper.TESTFN))
1728
1729    def test_remove_partial(self):
1730        dira = os.path.join(os_helper.TESTFN, 'dira')
1731        os.mkdir(dira)
1732        dirb = os.path.join(dira, 'dirb')
1733        os.mkdir(dirb)
1734        create_file(os.path.join(dira, 'file.txt'))
1735        os.removedirs(dirb)
1736        self.assertFalse(os.path.exists(dirb))
1737        self.assertTrue(os.path.exists(dira))
1738        self.assertTrue(os.path.exists(os_helper.TESTFN))
1739
1740    def test_remove_nothing(self):
1741        dira = os.path.join(os_helper.TESTFN, 'dira')
1742        os.mkdir(dira)
1743        dirb = os.path.join(dira, 'dirb')
1744        os.mkdir(dirb)
1745        create_file(os.path.join(dirb, 'file.txt'))
1746        with self.assertRaises(OSError):
1747            os.removedirs(dirb)
1748        self.assertTrue(os.path.exists(dirb))
1749        self.assertTrue(os.path.exists(dira))
1750        self.assertTrue(os.path.exists(os_helper.TESTFN))
1751
1752
1753class DevNullTests(unittest.TestCase):
1754    def test_devnull(self):
1755        with open(os.devnull, 'wb', 0) as f:
1756            f.write(b'hello')
1757            f.close()
1758        with open(os.devnull, 'rb') as f:
1759            self.assertEqual(f.read(), b'')
1760
1761
1762class URandomTests(unittest.TestCase):
1763    def test_urandom_length(self):
1764        self.assertEqual(len(os.urandom(0)), 0)
1765        self.assertEqual(len(os.urandom(1)), 1)
1766        self.assertEqual(len(os.urandom(10)), 10)
1767        self.assertEqual(len(os.urandom(100)), 100)
1768        self.assertEqual(len(os.urandom(1000)), 1000)
1769
1770    def test_urandom_value(self):
1771        data1 = os.urandom(16)
1772        self.assertIsInstance(data1, bytes)
1773        data2 = os.urandom(16)
1774        self.assertNotEqual(data1, data2)
1775
1776    def get_urandom_subprocess(self, count):
1777        code = '\n'.join((
1778            'import os, sys',
1779            'data = os.urandom(%s)' % count,
1780            'sys.stdout.buffer.write(data)',
1781            'sys.stdout.buffer.flush()'))
1782        out = assert_python_ok('-c', code)
1783        stdout = out[1]
1784        self.assertEqual(len(stdout), count)
1785        return stdout
1786
1787    def test_urandom_subprocess(self):
1788        data1 = self.get_urandom_subprocess(16)
1789        data2 = self.get_urandom_subprocess(16)
1790        self.assertNotEqual(data1, data2)
1791
1792
1793@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1794class GetRandomTests(unittest.TestCase):
1795    @classmethod
1796    def setUpClass(cls):
1797        try:
1798            os.getrandom(1)
1799        except OSError as exc:
1800            if exc.errno == errno.ENOSYS:
1801                # Python compiled on a more recent Linux version
1802                # than the current Linux kernel
1803                raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1804            else:
1805                raise
1806
1807    def test_getrandom_type(self):
1808        data = os.getrandom(16)
1809        self.assertIsInstance(data, bytes)
1810        self.assertEqual(len(data), 16)
1811
1812    def test_getrandom0(self):
1813        empty = os.getrandom(0)
1814        self.assertEqual(empty, b'')
1815
1816    def test_getrandom_random(self):
1817        self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1818
1819        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1820        # resource /dev/random
1821
1822    def test_getrandom_nonblock(self):
1823        # The call must not fail. Check also that the flag exists
1824        try:
1825            os.getrandom(1, os.GRND_NONBLOCK)
1826        except BlockingIOError:
1827            # System urandom is not initialized yet
1828            pass
1829
1830    def test_getrandom_value(self):
1831        data1 = os.getrandom(16)
1832        data2 = os.getrandom(16)
1833        self.assertNotEqual(data1, data2)
1834
1835
1836# os.urandom() doesn't use a file descriptor when it is implemented with the
1837# getentropy() function, the getrandom() function or the getrandom() syscall
1838OS_URANDOM_DONT_USE_FD = (
1839    sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1840    or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1841    or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
1842
1843@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1844                 "os.random() does not use a file descriptor")
1845@unittest.skipIf(sys.platform == "vxworks",
1846                 "VxWorks can't set RLIMIT_NOFILE to 1")
1847class URandomFDTests(unittest.TestCase):
1848    @unittest.skipUnless(resource, "test requires the resource module")
1849    def test_urandom_failure(self):
1850        # Check urandom() failing when it is not able to open /dev/random.
1851        # We spawn a new process to make the test more robust (if getrlimit()
1852        # failed to restore the file descriptor limit after this, the whole
1853        # test suite would crash; this actually happened on the OS X Tiger
1854        # buildbot).
1855        code = """if 1:
1856            import errno
1857            import os
1858            import resource
1859
1860            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1861            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1862            try:
1863                os.urandom(16)
1864            except OSError as e:
1865                assert e.errno == errno.EMFILE, e.errno
1866            else:
1867                raise AssertionError("OSError not raised")
1868            """
1869        assert_python_ok('-c', code)
1870
1871    def test_urandom_fd_closed(self):
1872        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1873        # closed.
1874        code = """if 1:
1875            import os
1876            import sys
1877            import test.support
1878            os.urandom(4)
1879            with test.support.SuppressCrashReport():
1880                os.closerange(3, 256)
1881            sys.stdout.buffer.write(os.urandom(4))
1882            """
1883        rc, out, err = assert_python_ok('-Sc', code)
1884
1885    def test_urandom_fd_reopened(self):
1886        # Issue #21207: urandom() should detect its fd to /dev/urandom
1887        # changed to something else, and reopen it.
1888        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1889        create_file(os_helper.TESTFN, b"x" * 256)
1890
1891        code = """if 1:
1892            import os
1893            import sys
1894            import test.support
1895            os.urandom(4)
1896            with test.support.SuppressCrashReport():
1897                for fd in range(3, 256):
1898                    try:
1899                        os.close(fd)
1900                    except OSError:
1901                        pass
1902                    else:
1903                        # Found the urandom fd (XXX hopefully)
1904                        break
1905                os.closerange(3, 256)
1906            with open({TESTFN!r}, 'rb') as f:
1907                new_fd = f.fileno()
1908                # Issue #26935: posix allows new_fd and fd to be equal but
1909                # some libc implementations have dup2 return an error in this
1910                # case.
1911                if new_fd != fd:
1912                    os.dup2(new_fd, fd)
1913                sys.stdout.buffer.write(os.urandom(4))
1914                sys.stdout.buffer.write(os.urandom(4))
1915            """.format(TESTFN=os_helper.TESTFN)
1916        rc, out, err = assert_python_ok('-Sc', code)
1917        self.assertEqual(len(out), 8)
1918        self.assertNotEqual(out[0:4], out[4:8])
1919        rc, out2, err2 = assert_python_ok('-Sc', code)
1920        self.assertEqual(len(out2), 8)
1921        self.assertNotEqual(out2, out)
1922
1923
1924@contextlib.contextmanager
1925def _execvpe_mockup(defpath=None):
1926    """
1927    Stubs out execv and execve functions when used as context manager.
1928    Records exec calls. The mock execv and execve functions always raise an
1929    exception as they would normally never return.
1930    """
1931    # A list of tuples containing (function name, first arg, args)
1932    # of calls to execv or execve that have been made.
1933    calls = []
1934
1935    def mock_execv(name, *args):
1936        calls.append(('execv', name, args))
1937        raise RuntimeError("execv called")
1938
1939    def mock_execve(name, *args):
1940        calls.append(('execve', name, args))
1941        raise OSError(errno.ENOTDIR, "execve called")
1942
1943    try:
1944        orig_execv = os.execv
1945        orig_execve = os.execve
1946        orig_defpath = os.defpath
1947        os.execv = mock_execv
1948        os.execve = mock_execve
1949        if defpath is not None:
1950            os.defpath = defpath
1951        yield calls
1952    finally:
1953        os.execv = orig_execv
1954        os.execve = orig_execve
1955        os.defpath = orig_defpath
1956
1957@unittest.skipUnless(hasattr(os, 'execv'),
1958                     "need os.execv()")
1959class ExecTests(unittest.TestCase):
1960    @unittest.skipIf(USING_LINUXTHREADS,
1961                     "avoid triggering a linuxthreads bug: see issue #4970")
1962    def test_execvpe_with_bad_program(self):
1963        self.assertRaises(OSError, os.execvpe, 'no such app-',
1964                          ['no such app-'], None)
1965
1966    def test_execv_with_bad_arglist(self):
1967        self.assertRaises(ValueError, os.execv, 'notepad', ())
1968        self.assertRaises(ValueError, os.execv, 'notepad', [])
1969        self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1970        self.assertRaises(ValueError, os.execv, 'notepad', [''])
1971
1972    def test_execvpe_with_bad_arglist(self):
1973        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1974        self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1975        self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
1976
1977    @unittest.skipUnless(hasattr(os, '_execvpe'),
1978                         "No internal os._execvpe function to test.")
1979    def _test_internal_execvpe(self, test_type):
1980        program_path = os.sep + 'absolutepath'
1981        if test_type is bytes:
1982            program = b'executable'
1983            fullpath = os.path.join(os.fsencode(program_path), program)
1984            native_fullpath = fullpath
1985            arguments = [b'progname', 'arg1', 'arg2']
1986        else:
1987            program = 'executable'
1988            arguments = ['progname', 'arg1', 'arg2']
1989            fullpath = os.path.join(program_path, program)
1990            if os.name != "nt":
1991                native_fullpath = os.fsencode(fullpath)
1992            else:
1993                native_fullpath = fullpath
1994        env = {'spam': 'beans'}
1995
1996        # test os._execvpe() with an absolute path
1997        with _execvpe_mockup() as calls:
1998            self.assertRaises(RuntimeError,
1999                os._execvpe, fullpath, arguments)
2000            self.assertEqual(len(calls), 1)
2001            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
2002
2003        # test os._execvpe() with a relative path:
2004        # os.get_exec_path() returns defpath
2005        with _execvpe_mockup(defpath=program_path) as calls:
2006            self.assertRaises(OSError,
2007                os._execvpe, program, arguments, env=env)
2008            self.assertEqual(len(calls), 1)
2009            self.assertSequenceEqual(calls[0],
2010                ('execve', native_fullpath, (arguments, env)))
2011
2012        # test os._execvpe() with a relative path:
2013        # os.get_exec_path() reads the 'PATH' variable
2014        with _execvpe_mockup() as calls:
2015            env_path = env.copy()
2016            if test_type is bytes:
2017                env_path[b'PATH'] = program_path
2018            else:
2019                env_path['PATH'] = program_path
2020            self.assertRaises(OSError,
2021                os._execvpe, program, arguments, env=env_path)
2022            self.assertEqual(len(calls), 1)
2023            self.assertSequenceEqual(calls[0],
2024                ('execve', native_fullpath, (arguments, env_path)))
2025
2026    def test_internal_execvpe_str(self):
2027        self._test_internal_execvpe(str)
2028        if os.name != "nt":
2029            self._test_internal_execvpe(bytes)
2030
2031    def test_execve_invalid_env(self):
2032        args = [sys.executable, '-c', 'pass']
2033
2034        # null character in the environment variable name
2035        newenv = os.environ.copy()
2036        newenv["FRUIT\0VEGETABLE"] = "cabbage"
2037        with self.assertRaises(ValueError):
2038            os.execve(args[0], args, newenv)
2039
2040        # null character in the environment variable value
2041        newenv = os.environ.copy()
2042        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2043        with self.assertRaises(ValueError):
2044            os.execve(args[0], args, newenv)
2045
2046        # equal character in the environment variable name
2047        newenv = os.environ.copy()
2048        newenv["FRUIT=ORANGE"] = "lemon"
2049        with self.assertRaises(ValueError):
2050            os.execve(args[0], args, newenv)
2051
2052    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
2053    def test_execve_with_empty_path(self):
2054        # bpo-32890: Check GetLastError() misuse
2055        try:
2056            os.execve('', ['arg'], {})
2057        except OSError as e:
2058            self.assertTrue(e.winerror is None or e.winerror != 0)
2059        else:
2060            self.fail('No OSError raised')
2061
2062
2063@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2064class Win32ErrorTests(unittest.TestCase):
2065    def setUp(self):
2066        try:
2067            os.stat(os_helper.TESTFN)
2068        except FileNotFoundError:
2069            exists = False
2070        except OSError as exc:
2071            exists = True
2072            self.fail("file %s must not exist; os.stat failed with %s"
2073                      % (os_helper.TESTFN, exc))
2074        else:
2075            self.fail("file %s must not exist" % os_helper.TESTFN)
2076
2077    def test_rename(self):
2078        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
2079
2080    def test_remove(self):
2081        self.assertRaises(OSError, os.remove, os_helper.TESTFN)
2082
2083    def test_chdir(self):
2084        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
2085
2086    def test_mkdir(self):
2087        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
2088
2089        with open(os_helper.TESTFN, "x") as f:
2090            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
2091
2092    def test_utime(self):
2093        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
2094
2095    def test_chmod(self):
2096        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
2097
2098
2099class TestInvalidFD(unittest.TestCase):
2100    singles = ["fchdir", "dup", "fdatasync", "fstat",
2101               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
2102    #singles.append("close")
2103    #We omit close because it doesn't raise an exception on some platforms
2104    def get_single(f):
2105        def helper(self):
2106            if  hasattr(os, f):
2107                self.check(getattr(os, f))
2108        return helper
2109    for f in singles:
2110        locals()["test_"+f] = get_single(f)
2111
2112    def check(self, f, *args, **kwargs):
2113        try:
2114            f(os_helper.make_bad_fd(), *args, **kwargs)
2115        except OSError as e:
2116            self.assertEqual(e.errno, errno.EBADF)
2117        else:
2118            self.fail("%r didn't raise an OSError with a bad file descriptor"
2119                      % f)
2120
2121    def test_fdopen(self):
2122        self.check(os.fdopen, encoding="utf-8")
2123
2124    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
2125    def test_isatty(self):
2126        self.assertEqual(os.isatty(os_helper.make_bad_fd()), False)
2127
2128    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
2129    def test_closerange(self):
2130        fd = os_helper.make_bad_fd()
2131        # Make sure none of the descriptors we are about to close are
2132        # currently valid (issue 6542).
2133        for i in range(10):
2134            try: os.fstat(fd+i)
2135            except OSError:
2136                pass
2137            else:
2138                break
2139        if i < 2:
2140            raise unittest.SkipTest(
2141                "Unable to acquire a range of invalid file descriptors")
2142        self.assertEqual(os.closerange(fd, fd + i-1), None)
2143
2144    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
2145    def test_dup2(self):
2146        self.check(os.dup2, 20)
2147
2148    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
2149    def test_fchmod(self):
2150        self.check(os.fchmod, 0)
2151
2152    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
2153    def test_fchown(self):
2154        self.check(os.fchown, -1, -1)
2155
2156    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
2157    def test_fpathconf(self):
2158        self.check(os.pathconf, "PC_NAME_MAX")
2159        self.check(os.fpathconf, "PC_NAME_MAX")
2160
2161    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
2162    def test_ftruncate(self):
2163        self.check(os.truncate, 0)
2164        self.check(os.ftruncate, 0)
2165
2166    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
2167    def test_lseek(self):
2168        self.check(os.lseek, 0, 0)
2169
2170    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
2171    def test_read(self):
2172        self.check(os.read, 1)
2173
2174    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
2175    def test_readv(self):
2176        buf = bytearray(10)
2177        self.check(os.readv, [buf])
2178
2179    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
2180    def test_tcsetpgrpt(self):
2181        self.check(os.tcsetpgrp, 0)
2182
2183    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
2184    def test_write(self):
2185        self.check(os.write, b" ")
2186
2187    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
2188    def test_writev(self):
2189        self.check(os.writev, [b'abc'])
2190
2191    def test_inheritable(self):
2192        self.check(os.get_inheritable)
2193        self.check(os.set_inheritable, True)
2194
2195    @unittest.skipUnless(hasattr(os, 'get_blocking'),
2196                         'needs os.get_blocking() and os.set_blocking()')
2197    def test_blocking(self):
2198        self.check(os.get_blocking)
2199        self.check(os.set_blocking, True)
2200
2201
2202class LinkTests(unittest.TestCase):
2203    def setUp(self):
2204        self.file1 = os_helper.TESTFN
2205        self.file2 = os.path.join(os_helper.TESTFN + "2")
2206
2207    def tearDown(self):
2208        for file in (self.file1, self.file2):
2209            if os.path.exists(file):
2210                os.unlink(file)
2211
2212    def _test_link(self, file1, file2):
2213        create_file(file1)
2214
2215        try:
2216            os.link(file1, file2)
2217        except PermissionError as e:
2218            self.skipTest('os.link(): %s' % e)
2219        with open(file1, "rb") as f1, open(file2, "rb") as f2:
2220            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
2221
2222    def test_link(self):
2223        self._test_link(self.file1, self.file2)
2224
2225    def test_link_bytes(self):
2226        self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
2227                        bytes(self.file2, sys.getfilesystemencoding()))
2228
2229    def test_unicode_name(self):
2230        try:
2231            os.fsencode("\xf1")
2232        except UnicodeError:
2233            raise unittest.SkipTest("Unable to encode for this platform.")
2234
2235        self.file1 += "\xf1"
2236        self.file2 = self.file1 + "2"
2237        self._test_link(self.file1, self.file2)
2238
2239@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2240class PosixUidGidTests(unittest.TestCase):
2241    # uid_t and gid_t are 32-bit unsigned integers on Linux
2242    UID_OVERFLOW = (1 << 32)
2243    GID_OVERFLOW = (1 << 32)
2244
2245    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
2246    def test_setuid(self):
2247        if os.getuid() != 0:
2248            self.assertRaises(OSError, os.setuid, 0)
2249        self.assertRaises(TypeError, os.setuid, 'not an int')
2250        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)
2251
2252    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
2253    def test_setgid(self):
2254        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2255            self.assertRaises(OSError, os.setgid, 0)
2256        self.assertRaises(TypeError, os.setgid, 'not an int')
2257        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)
2258
2259    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
2260    def test_seteuid(self):
2261        if os.getuid() != 0:
2262            self.assertRaises(OSError, os.seteuid, 0)
2263        self.assertRaises(TypeError, os.setegid, 'not an int')
2264        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)
2265
2266    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
2267    def test_setegid(self):
2268        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2269            self.assertRaises(OSError, os.setegid, 0)
2270        self.assertRaises(TypeError, os.setegid, 'not an int')
2271        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)
2272
2273    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2274    def test_setreuid(self):
2275        if os.getuid() != 0:
2276            self.assertRaises(OSError, os.setreuid, 0, 0)
2277        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
2278        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
2279        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
2280        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)
2281
2282    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
2283    def test_setreuid_neg1(self):
2284        # Needs to accept -1.  We run this in a subprocess to avoid
2285        # altering the test runner's process state (issue8045).
2286        subprocess.check_call([
2287                sys.executable, '-c',
2288                'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
2289
2290    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2291    def test_setregid(self):
2292        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
2293            self.assertRaises(OSError, os.setregid, 0, 0)
2294        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
2295        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
2296        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
2297        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)
2298
2299    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
2300    def test_setregid_neg1(self):
2301        # Needs to accept -1.  We run this in a subprocess to avoid
2302        # altering the test runner's process state (issue8045).
2303        subprocess.check_call([
2304                sys.executable, '-c',
2305                'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
2306
2307@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
2308class Pep383Tests(unittest.TestCase):
2309    def setUp(self):
2310        if os_helper.TESTFN_UNENCODABLE:
2311            self.dir = os_helper.TESTFN_UNENCODABLE
2312        elif os_helper.TESTFN_NONASCII:
2313            self.dir = os_helper.TESTFN_NONASCII
2314        else:
2315            self.dir = os_helper.TESTFN
2316        self.bdir = os.fsencode(self.dir)
2317
2318        bytesfn = []
2319        def add_filename(fn):
2320            try:
2321                fn = os.fsencode(fn)
2322            except UnicodeEncodeError:
2323                return
2324            bytesfn.append(fn)
2325        add_filename(os_helper.TESTFN_UNICODE)
2326        if os_helper.TESTFN_UNENCODABLE:
2327            add_filename(os_helper.TESTFN_UNENCODABLE)
2328        if os_helper.TESTFN_NONASCII:
2329            add_filename(os_helper.TESTFN_NONASCII)
2330        if not bytesfn:
2331            self.skipTest("couldn't create any non-ascii filename")
2332
2333        self.unicodefn = set()
2334        os.mkdir(self.dir)
2335        try:
2336            for fn in bytesfn:
2337                os_helper.create_empty_file(os.path.join(self.bdir, fn))
2338                fn = os.fsdecode(fn)
2339                if fn in self.unicodefn:
2340                    raise ValueError("duplicate filename")
2341                self.unicodefn.add(fn)
2342        except:
2343            shutil.rmtree(self.dir)
2344            raise
2345
2346    def tearDown(self):
2347        shutil.rmtree(self.dir)
2348
2349    def test_listdir(self):
2350        expected = self.unicodefn
2351        found = set(os.listdir(self.dir))
2352        self.assertEqual(found, expected)
2353        # test listdir without arguments
2354        current_directory = os.getcwd()
2355        try:
2356            os.chdir(os.sep)
2357            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
2358        finally:
2359            os.chdir(current_directory)
2360
2361    def test_open(self):
2362        for fn in self.unicodefn:
2363            f = open(os.path.join(self.dir, fn), 'rb')
2364            f.close()
2365
2366    @unittest.skipUnless(hasattr(os, 'statvfs'),
2367                            "need os.statvfs()")
2368    def test_statvfs(self):
2369        # issue #9645
2370        for fn in self.unicodefn:
2371            # should not fail with file not found error
2372            fullname = os.path.join(self.dir, fn)
2373            os.statvfs(fullname)
2374
2375    def test_stat(self):
2376        for fn in self.unicodefn:
2377            os.stat(os.path.join(self.dir, fn))
2378
2379@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2380class Win32KillTests(unittest.TestCase):
2381    def _kill(self, sig):
2382        # Start sys.executable as a subprocess and communicate from the
2383        # subprocess to the parent that the interpreter is ready. When it
2384        # becomes ready, send *sig* via os.kill to the subprocess and check
2385        # that the return code is equal to *sig*.
2386        import ctypes
2387        from ctypes import wintypes
2388        import msvcrt
2389
2390        # Since we can't access the contents of the process' stdout until the
2391        # process has exited, use PeekNamedPipe to see what's inside stdout
2392        # without waiting. This is done so we can tell that the interpreter
2393        # is started and running at a point where it could handle a signal.
2394        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
2395        PeekNamedPipe.restype = wintypes.BOOL
2396        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
2397                                  ctypes.POINTER(ctypes.c_char), # stdout buf
2398                                  wintypes.DWORD, # Buffer size
2399                                  ctypes.POINTER(wintypes.DWORD), # bytes read
2400                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
2401                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
2402        msg = "running"
2403        proc = subprocess.Popen([sys.executable, "-c",
2404                                 "import sys;"
2405                                 "sys.stdout.write('{}');"
2406                                 "sys.stdout.flush();"
2407                                 "input()".format(msg)],
2408                                stdout=subprocess.PIPE,
2409                                stderr=subprocess.PIPE,
2410                                stdin=subprocess.PIPE)
2411        self.addCleanup(proc.stdout.close)
2412        self.addCleanup(proc.stderr.close)
2413        self.addCleanup(proc.stdin.close)
2414
2415        count, max = 0, 100
2416        while count < max and proc.poll() is None:
2417            # Create a string buffer to store the result of stdout from the pipe
2418            buf = ctypes.create_string_buffer(len(msg))
2419            # Obtain the text currently in proc.stdout
2420            # Bytes read/avail/left are left as NULL and unused
2421            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
2422                                 buf, ctypes.sizeof(buf), None, None, None)
2423            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
2424            if buf.value:
2425                self.assertEqual(msg, buf.value.decode())
2426                break
2427            time.sleep(0.1)
2428            count += 1
2429        else:
2430            self.fail("Did not receive communication from the subprocess")
2431
2432        os.kill(proc.pid, sig)
2433        self.assertEqual(proc.wait(), sig)
2434
2435    def test_kill_sigterm(self):
2436        # SIGTERM doesn't mean anything special, but make sure it works
2437        self._kill(signal.SIGTERM)
2438
2439    def test_kill_int(self):
2440        # os.kill on Windows can take an int which gets set as the exit code
2441        self._kill(100)
2442
2443    def _kill_with_event(self, event, name):
2444        tagname = "test_os_%s" % uuid.uuid1()
2445        m = mmap.mmap(-1, 1, tagname)
2446        m[0] = 0
2447        # Run a script which has console control handling enabled.
2448        proc = subprocess.Popen([sys.executable,
2449                   os.path.join(os.path.dirname(__file__),
2450                                "win_console_handler.py"), tagname],
2451                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
2452        # Let the interpreter startup before we send signals. See #3137.
2453        count, max = 0, 100
2454        while count < max and proc.poll() is None:
2455            if m[0] == 1:
2456                break
2457            time.sleep(0.1)
2458            count += 1
2459        else:
2460            # Forcefully kill the process if we weren't able to signal it.
2461            os.kill(proc.pid, signal.SIGINT)
2462            self.fail("Subprocess didn't finish initialization")
2463        os.kill(proc.pid, event)
2464        # proc.send_signal(event) could also be done here.
2465        # Allow time for the signal to be passed and the process to exit.
2466        time.sleep(0.5)
2467        if not proc.poll():
2468            # Forcefully kill the process if we weren't able to signal it.
2469            os.kill(proc.pid, signal.SIGINT)
2470            self.fail("subprocess did not stop on {}".format(name))
2471
2472    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
2473    def test_CTRL_C_EVENT(self):
2474        from ctypes import wintypes
2475        import ctypes
2476
2477        # Make a NULL value by creating a pointer with no argument.
2478        NULL = ctypes.POINTER(ctypes.c_int)()
2479        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2480        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2481                                          wintypes.BOOL)
2482        SetConsoleCtrlHandler.restype = wintypes.BOOL
2483
2484        # Calling this with NULL and FALSE causes the calling process to
2485        # handle Ctrl+C, rather than ignore it. This property is inherited
2486        # by subprocesses.
2487        SetConsoleCtrlHandler(NULL, 0)
2488
2489        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2490
2491    def test_CTRL_BREAK_EVENT(self):
2492        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2493
2494
2495@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2496class Win32ListdirTests(unittest.TestCase):
2497    """Test listdir on Windows."""
2498
2499    def setUp(self):
2500        self.created_paths = []
2501        for i in range(2):
2502            dir_name = 'SUB%d' % i
2503            dir_path = os.path.join(os_helper.TESTFN, dir_name)
2504            file_name = 'FILE%d' % i
2505            file_path = os.path.join(os_helper.TESTFN, file_name)
2506            os.makedirs(dir_path)
2507            with open(file_path, 'w', encoding='utf-8') as f:
2508                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2509            self.created_paths.extend([dir_name, file_name])
2510        self.created_paths.sort()
2511
2512    def tearDown(self):
2513        shutil.rmtree(os_helper.TESTFN)
2514
2515    def test_listdir_no_extended_path(self):
2516        """Test when the path is not an "extended" path."""
2517        # unicode
2518        self.assertEqual(
2519                sorted(os.listdir(os_helper.TESTFN)),
2520                self.created_paths)
2521
2522        # bytes
2523        self.assertEqual(
2524                sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
2525                [os.fsencode(path) for path in self.created_paths])
2526
2527    def test_listdir_extended_path(self):
2528        """Test when the path starts with '\\\\?\\'."""
2529        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
2530        # unicode
2531        path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
2532        self.assertEqual(
2533                sorted(os.listdir(path)),
2534                self.created_paths)
2535
2536        # bytes
2537        path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
2538        self.assertEqual(
2539                sorted(os.listdir(path)),
2540                [os.fsencode(path) for path in self.created_paths])
2541
2542
2543@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
2544class ReadlinkTests(unittest.TestCase):
2545    filelink = 'readlinktest'
2546    filelink_target = os.path.abspath(__file__)
2547    filelinkb = os.fsencode(filelink)
2548    filelinkb_target = os.fsencode(filelink_target)
2549
2550    def assertPathEqual(self, left, right):
2551        left = os.path.normcase(left)
2552        right = os.path.normcase(right)
2553        if sys.platform == 'win32':
2554            # Bad practice to blindly strip the prefix as it may be required to
2555            # correctly refer to the file, but we're only comparing paths here.
2556            has_prefix = lambda p: p.startswith(
2557                b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\')
2558            if has_prefix(left):
2559                left = left[4:]
2560            if has_prefix(right):
2561                right = right[4:]
2562        self.assertEqual(left, right)
2563
2564    def setUp(self):
2565        self.assertTrue(os.path.exists(self.filelink_target))
2566        self.assertTrue(os.path.exists(self.filelinkb_target))
2567        self.assertFalse(os.path.exists(self.filelink))
2568        self.assertFalse(os.path.exists(self.filelinkb))
2569
2570    def test_not_symlink(self):
2571        filelink_target = FakePath(self.filelink_target)
2572        self.assertRaises(OSError, os.readlink, self.filelink_target)
2573        self.assertRaises(OSError, os.readlink, filelink_target)
2574
2575    def test_missing_link(self):
2576        self.assertRaises(FileNotFoundError, os.readlink, 'missing-link')
2577        self.assertRaises(FileNotFoundError, os.readlink,
2578                          FakePath('missing-link'))
2579
2580    @os_helper.skip_unless_symlink
2581    def test_pathlike(self):
2582        os.symlink(self.filelink_target, self.filelink)
2583        self.addCleanup(os_helper.unlink, self.filelink)
2584        filelink = FakePath(self.filelink)
2585        self.assertPathEqual(os.readlink(filelink), self.filelink_target)
2586
2587    @os_helper.skip_unless_symlink
2588    def test_pathlike_bytes(self):
2589        os.symlink(self.filelinkb_target, self.filelinkb)
2590        self.addCleanup(os_helper.unlink, self.filelinkb)
2591        path = os.readlink(FakePath(self.filelinkb))
2592        self.assertPathEqual(path, self.filelinkb_target)
2593        self.assertIsInstance(path, bytes)
2594
2595    @os_helper.skip_unless_symlink
2596    def test_bytes(self):
2597        os.symlink(self.filelinkb_target, self.filelinkb)
2598        self.addCleanup(os_helper.unlink, self.filelinkb)
2599        path = os.readlink(self.filelinkb)
2600        self.assertPathEqual(path, self.filelinkb_target)
2601        self.assertIsInstance(path, bytes)
2602
2603
2604@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2605@os_helper.skip_unless_symlink
2606class Win32SymlinkTests(unittest.TestCase):
2607    filelink = 'filelinktest'
2608    filelink_target = os.path.abspath(__file__)
2609    dirlink = 'dirlinktest'
2610    dirlink_target = os.path.dirname(filelink_target)
2611    missing_link = 'missing link'
2612
2613    def setUp(self):
2614        assert os.path.exists(self.dirlink_target)
2615        assert os.path.exists(self.filelink_target)
2616        assert not os.path.exists(self.dirlink)
2617        assert not os.path.exists(self.filelink)
2618        assert not os.path.exists(self.missing_link)
2619
2620    def tearDown(self):
2621        if os.path.exists(self.filelink):
2622            os.remove(self.filelink)
2623        if os.path.exists(self.dirlink):
2624            os.rmdir(self.dirlink)
2625        if os.path.lexists(self.missing_link):
2626            os.remove(self.missing_link)
2627
2628    def test_directory_link(self):
2629        os.symlink(self.dirlink_target, self.dirlink)
2630        self.assertTrue(os.path.exists(self.dirlink))
2631        self.assertTrue(os.path.isdir(self.dirlink))
2632        self.assertTrue(os.path.islink(self.dirlink))
2633        self.check_stat(self.dirlink, self.dirlink_target)
2634
2635    def test_file_link(self):
2636        os.symlink(self.filelink_target, self.filelink)
2637        self.assertTrue(os.path.exists(self.filelink))
2638        self.assertTrue(os.path.isfile(self.filelink))
2639        self.assertTrue(os.path.islink(self.filelink))
2640        self.check_stat(self.filelink, self.filelink_target)
2641
2642    def _create_missing_dir_link(self):
2643        'Create a "directory" link to a non-existent target'
2644        linkname = self.missing_link
2645        if os.path.lexists(linkname):
2646            os.remove(linkname)
2647        target = r'c:\\target does not exist.29r3c740'
2648        assert not os.path.exists(target)
2649        target_is_dir = True
2650        os.symlink(target, linkname, target_is_dir)
2651
2652    def test_remove_directory_link_to_missing_target(self):
2653        self._create_missing_dir_link()
2654        # For compatibility with Unix, os.remove will check the
2655        #  directory status and call RemoveDirectory if the symlink
2656        #  was created with target_is_dir==True.
2657        os.remove(self.missing_link)
2658
2659    def test_isdir_on_directory_link_to_missing_target(self):
2660        self._create_missing_dir_link()
2661        self.assertFalse(os.path.isdir(self.missing_link))
2662
2663    def test_rmdir_on_directory_link_to_missing_target(self):
2664        self._create_missing_dir_link()
2665        os.rmdir(self.missing_link)
2666
2667    def check_stat(self, link, target):
2668        self.assertEqual(os.stat(link), os.stat(target))
2669        self.assertNotEqual(os.lstat(link), os.stat(link))
2670
2671        bytes_link = os.fsencode(link)
2672        self.assertEqual(os.stat(bytes_link), os.stat(target))
2673        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
2674
2675    def test_12084(self):
2676        level1 = os.path.abspath(os_helper.TESTFN)
2677        level2 = os.path.join(level1, "level2")
2678        level3 = os.path.join(level2, "level3")
2679        self.addCleanup(os_helper.rmtree, level1)
2680
2681        os.mkdir(level1)
2682        os.mkdir(level2)
2683        os.mkdir(level3)
2684
2685        file1 = os.path.abspath(os.path.join(level1, "file1"))
2686        create_file(file1)
2687
2688        orig_dir = os.getcwd()
2689        try:
2690            os.chdir(level2)
2691            link = os.path.join(level2, "link")
2692            os.symlink(os.path.relpath(file1), "link")
2693            self.assertIn("link", os.listdir(os.getcwd()))
2694
2695            # Check os.stat calls from the same dir as the link
2696            self.assertEqual(os.stat(file1), os.stat("link"))
2697
2698            # Check os.stat calls from a dir below the link
2699            os.chdir(level1)
2700            self.assertEqual(os.stat(file1),
2701                             os.stat(os.path.relpath(link)))
2702
2703            # Check os.stat calls from a dir above the link
2704            os.chdir(level3)
2705            self.assertEqual(os.stat(file1),
2706                             os.stat(os.path.relpath(link)))
2707        finally:
2708            os.chdir(orig_dir)
2709
2710    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2711                            and os.path.exists(r'C:\ProgramData'),
2712                            'Test directories not found')
2713    def test_29248(self):
2714        # os.symlink() calls CreateSymbolicLink, which creates
2715        # the reparse data buffer with the print name stored
2716        # first, so the offset is always 0. CreateSymbolicLink
2717        # stores the "PrintName" DOS path (e.g. "C:\") first,
2718        # with an offset of 0, followed by the "SubstituteName"
2719        # NT path (e.g. "\??\C:\"). The "All Users" link, on
2720        # the other hand, seems to have been created manually
2721        # with an inverted order.
2722        target = os.readlink(r'C:\Users\All Users')
2723        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2724
2725    def test_buffer_overflow(self):
2726        # Older versions would have a buffer overflow when detecting
2727        # whether a link source was a directory. This test ensures we
2728        # no longer crash, but does not otherwise validate the behavior
2729        segment = 'X' * 27
2730        path = os.path.join(*[segment] * 10)
2731        test_cases = [
2732            # overflow with absolute src
2733            ('\\' + path, segment),
2734            # overflow dest with relative src
2735            (segment, path),
2736            # overflow when joining src
2737            (path[:180], path[:180]),
2738        ]
2739        for src, dest in test_cases:
2740            try:
2741                os.symlink(src, dest)
2742            except FileNotFoundError:
2743                pass
2744            else:
2745                try:
2746                    os.remove(dest)
2747                except OSError:
2748                    pass
2749            # Also test with bytes, since that is a separate code path.
2750            try:
2751                os.symlink(os.fsencode(src), os.fsencode(dest))
2752            except FileNotFoundError:
2753                pass
2754            else:
2755                try:
2756                    os.remove(dest)
2757                except OSError:
2758                    pass
2759
2760    def test_appexeclink(self):
2761        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
2762        if not os.path.isdir(root):
2763            self.skipTest("test requires a WindowsApps directory")
2764
2765        aliases = [os.path.join(root, a)
2766                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
2767
2768        for alias in aliases:
2769            if support.verbose:
2770                print()
2771                print("Testing with", alias)
2772            st = os.lstat(alias)
2773            self.assertEqual(st, os.stat(alias))
2774            self.assertFalse(stat.S_ISLNK(st.st_mode))
2775            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
2776            # testing the first one we see is sufficient
2777            break
2778        else:
2779            self.skipTest("test requires an app execution alias")
2780
2781@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2782class Win32JunctionTests(unittest.TestCase):
2783    junction = 'junctiontest'
2784    junction_target = os.path.dirname(os.path.abspath(__file__))
2785
2786    def setUp(self):
2787        assert os.path.exists(self.junction_target)
2788        assert not os.path.lexists(self.junction)
2789
2790    def tearDown(self):
2791        if os.path.lexists(self.junction):
2792            os.unlink(self.junction)
2793
2794    def test_create_junction(self):
2795        _winapi.CreateJunction(self.junction_target, self.junction)
2796        self.assertTrue(os.path.lexists(self.junction))
2797        self.assertTrue(os.path.exists(self.junction))
2798        self.assertTrue(os.path.isdir(self.junction))
2799        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
2800        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
2801
2802        # bpo-37834: Junctions are not recognized as links.
2803        self.assertFalse(os.path.islink(self.junction))
2804        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
2805                         os.path.normcase(os.readlink(self.junction)))
2806
2807    def test_unlink_removes_junction(self):
2808        _winapi.CreateJunction(self.junction_target, self.junction)
2809        self.assertTrue(os.path.exists(self.junction))
2810        self.assertTrue(os.path.lexists(self.junction))
2811
2812        os.unlink(self.junction)
2813        self.assertFalse(os.path.exists(self.junction))
2814
2815@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2816class Win32NtTests(unittest.TestCase):
2817    def test_getfinalpathname_handles(self):
2818        nt = import_helper.import_module('nt')
2819        ctypes = import_helper.import_module('ctypes')
2820        import ctypes.wintypes
2821
2822        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
2823        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
2824
2825        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
2826        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
2827                                                 ctypes.wintypes.LPDWORD)
2828
2829        # This is a pseudo-handle that doesn't need to be closed
2830        hproc = kernel.GetCurrentProcess()
2831
2832        handle_count = ctypes.wintypes.DWORD()
2833        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2834        self.assertEqual(1, ok)
2835
2836        before_count = handle_count.value
2837
2838        # The first two test the error path, __file__ tests the success path
2839        filenames = [
2840            r'\\?\C:',
2841            r'\\?\NUL',
2842            r'\\?\CONIN',
2843            __file__,
2844        ]
2845
2846        for _ in range(10):
2847            for name in filenames:
2848                try:
2849                    nt._getfinalpathname(name)
2850                except Exception:
2851                    # Failure is expected
2852                    pass
2853                try:
2854                    os.stat(name)
2855                except Exception:
2856                    pass
2857
2858        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
2859        self.assertEqual(1, ok)
2860
2861        handle_delta = handle_count.value - before_count
2862
2863        self.assertEqual(0, handle_delta)
2864
2865@os_helper.skip_unless_symlink
2866class NonLocalSymlinkTests(unittest.TestCase):
2867
2868    def setUp(self):
2869        r"""
2870        Create this structure:
2871
2872        base
2873         \___ some_dir
2874        """
2875        os.makedirs('base/some_dir')
2876
2877    def tearDown(self):
2878        shutil.rmtree('base')
2879
2880    def test_directory_link_nonlocal(self):
2881        """
2882        The symlink target should resolve relative to the link, not relative
2883        to the current directory.
2884
2885        Then, link base/some_link -> base/some_dir and ensure that some_link
2886        is resolved as a directory.
2887
2888        In issue13772, it was discovered that directory detection failed if
2889        the symlink target was not specified relative to the current
2890        directory, which was a defect in the implementation.
2891        """
2892        src = os.path.join('base', 'some_link')
2893        os.symlink('some_dir', src)
2894        assert os.path.isdir(src)
2895
2896
2897class FSEncodingTests(unittest.TestCase):
2898    def test_nop(self):
2899        self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2900        self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
2901
2902    def test_identity(self):
2903        # assert fsdecode(fsencode(x)) == x
2904        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2905            try:
2906                bytesfn = os.fsencode(fn)
2907            except UnicodeEncodeError:
2908                continue
2909            self.assertEqual(os.fsdecode(bytesfn), fn)
2910
2911
2912
2913class DeviceEncodingTests(unittest.TestCase):
2914
2915    def test_bad_fd(self):
2916        # Return None when an fd doesn't actually exist.
2917        self.assertIsNone(os.device_encoding(123456))
2918
2919    @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or
2920            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
2921            'test requires a tty and either Windows or nl_langinfo(CODESET)')
2922    def test_device_encoding(self):
2923        encoding = os.device_encoding(0)
2924        self.assertIsNotNone(encoding)
2925        self.assertTrue(codecs.lookup(encoding))
2926
2927
2928class PidTests(unittest.TestCase):
2929    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2930    def test_getppid(self):
2931        p = subprocess.Popen([sys.executable, '-c',
2932                              'import os; print(os.getppid())'],
2933                             stdout=subprocess.PIPE)
2934        stdout, _ = p.communicate()
2935        # We are the parent of our subprocess
2936        self.assertEqual(int(stdout), os.getpid())
2937
2938    def check_waitpid(self, code, exitcode, callback=None):
2939        if sys.platform == 'win32':
2940            # On Windows, os.spawnv() simply joins arguments with spaces:
2941            # arguments need to be quoted
2942            args = [f'"{sys.executable}"', '-c', f'"{code}"']
2943        else:
2944            args = [sys.executable, '-c', code]
2945        pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
2946
2947        if callback is not None:
2948            callback(pid)
2949
2950        # don't use support.wait_process() to test directly os.waitpid()
2951        # and os.waitstatus_to_exitcode()
2952        pid2, status = os.waitpid(pid, 0)
2953        self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
2954        self.assertEqual(pid2, pid)
2955
2956    def test_waitpid(self):
2957        self.check_waitpid(code='pass', exitcode=0)
2958
2959    def test_waitstatus_to_exitcode(self):
2960        exitcode = 23
2961        code = f'import sys; sys.exit({exitcode})'
2962        self.check_waitpid(code, exitcode=exitcode)
2963
2964        with self.assertRaises(TypeError):
2965            os.waitstatus_to_exitcode(0.0)
2966
2967    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2968    def test_waitpid_windows(self):
2969        # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode()
2970        # with exit code larger than INT_MAX.
2971        STATUS_CONTROL_C_EXIT = 0xC000013A
2972        code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})'
2973        self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT)
2974
2975    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
2976    def test_waitstatus_to_exitcode_windows(self):
2977        max_exitcode = 2 ** 32 - 1
2978        for exitcode in (0, 1, 5, max_exitcode):
2979            self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8),
2980                             exitcode)
2981
2982        # invalid values
2983        with self.assertRaises(ValueError):
2984            os.waitstatus_to_exitcode((max_exitcode + 1) << 8)
2985        with self.assertRaises(OverflowError):
2986            os.waitstatus_to_exitcode(-1)
2987
2988    # Skip the test on Windows
2989    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL')
2990    def test_waitstatus_to_exitcode_kill(self):
2991        code = f'import time; time.sleep({support.LONG_TIMEOUT})'
2992        signum = signal.SIGKILL
2993
2994        def kill_process(pid):
2995            os.kill(pid, signum)
2996
2997        self.check_waitpid(code, exitcode=-signum, callback=kill_process)
2998
2999
3000class SpawnTests(unittest.TestCase):
3001    def create_args(self, *, with_env=False, use_bytes=False):
3002        self.exitcode = 17
3003
3004        filename = os_helper.TESTFN
3005        self.addCleanup(os_helper.unlink, filename)
3006
3007        if not with_env:
3008            code = 'import sys; sys.exit(%s)' % self.exitcode
3009        else:
3010            self.env = dict(os.environ)
3011            # create an unique key
3012            self.key = str(uuid.uuid4())
3013            self.env[self.key] = self.key
3014            # read the variable from os.environ to check that it exists
3015            code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
3016                    % (self.key, self.exitcode))
3017
3018        with open(filename, "w", encoding="utf-8") as fp:
3019            fp.write(code)
3020
3021        args = [sys.executable, filename]
3022        if use_bytes:
3023            args = [os.fsencode(a) for a in args]
3024            self.env = {os.fsencode(k): os.fsencode(v)
3025                        for k, v in self.env.items()}
3026
3027        return args
3028
3029    @requires_os_func('spawnl')
3030    def test_spawnl(self):
3031        args = self.create_args()
3032        exitcode = os.spawnl(os.P_WAIT, args[0], *args)
3033        self.assertEqual(exitcode, self.exitcode)
3034
3035    @requires_os_func('spawnle')
3036    def test_spawnle(self):
3037        args = self.create_args(with_env=True)
3038        exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
3039        self.assertEqual(exitcode, self.exitcode)
3040
3041    @requires_os_func('spawnlp')
3042    def test_spawnlp(self):
3043        args = self.create_args()
3044        exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
3045        self.assertEqual(exitcode, self.exitcode)
3046
3047    @requires_os_func('spawnlpe')
3048    def test_spawnlpe(self):
3049        args = self.create_args(with_env=True)
3050        exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
3051        self.assertEqual(exitcode, self.exitcode)
3052
3053    @requires_os_func('spawnv')
3054    def test_spawnv(self):
3055        args = self.create_args()
3056        exitcode = os.spawnv(os.P_WAIT, args[0], args)
3057        self.assertEqual(exitcode, self.exitcode)
3058
3059        # Test for PyUnicode_FSConverter()
3060        exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args)
3061        self.assertEqual(exitcode, self.exitcode)
3062
3063    @requires_os_func('spawnve')
3064    def test_spawnve(self):
3065        args = self.create_args(with_env=True)
3066        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3067        self.assertEqual(exitcode, self.exitcode)
3068
3069    @requires_os_func('spawnvp')
3070    def test_spawnvp(self):
3071        args = self.create_args()
3072        exitcode = os.spawnvp(os.P_WAIT, args[0], args)
3073        self.assertEqual(exitcode, self.exitcode)
3074
3075    @requires_os_func('spawnvpe')
3076    def test_spawnvpe(self):
3077        args = self.create_args(with_env=True)
3078        exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
3079        self.assertEqual(exitcode, self.exitcode)
3080
3081    @requires_os_func('spawnv')
3082    def test_nowait(self):
3083        args = self.create_args()
3084        pid = os.spawnv(os.P_NOWAIT, args[0], args)
3085        support.wait_process(pid, exitcode=self.exitcode)
3086
3087    @requires_os_func('spawnve')
3088    def test_spawnve_bytes(self):
3089        # Test bytes handling in parse_arglist and parse_envlist (#28114)
3090        args = self.create_args(with_env=True, use_bytes=True)
3091        exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
3092        self.assertEqual(exitcode, self.exitcode)
3093
3094    @requires_os_func('spawnl')
3095    def test_spawnl_noargs(self):
3096        args = self.create_args()
3097        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
3098        self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
3099
3100    @requires_os_func('spawnle')
3101    def test_spawnle_noargs(self):
3102        args = self.create_args()
3103        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
3104        self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
3105
3106    @requires_os_func('spawnv')
3107    def test_spawnv_noargs(self):
3108        args = self.create_args()
3109        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
3110        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
3111        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
3112        self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
3113
3114    @requires_os_func('spawnve')
3115    def test_spawnve_noargs(self):
3116        args = self.create_args()
3117        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
3118        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
3119        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
3120        self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
3121
3122    def _test_invalid_env(self, spawn):
3123        args = [sys.executable, '-c', 'pass']
3124
3125        # null character in the environment variable name
3126        newenv = os.environ.copy()
3127        newenv["FRUIT\0VEGETABLE"] = "cabbage"
3128        try:
3129            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3130        except ValueError:
3131            pass
3132        else:
3133            self.assertEqual(exitcode, 127)
3134
3135        # null character in the environment variable value
3136        newenv = os.environ.copy()
3137        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
3138        try:
3139            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3140        except ValueError:
3141            pass
3142        else:
3143            self.assertEqual(exitcode, 127)
3144
3145        # equal character in the environment variable name
3146        newenv = os.environ.copy()
3147        newenv["FRUIT=ORANGE"] = "lemon"
3148        try:
3149            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3150        except ValueError:
3151            pass
3152        else:
3153            self.assertEqual(exitcode, 127)
3154
3155        # equal character in the environment variable value
3156        filename = os_helper.TESTFN
3157        self.addCleanup(os_helper.unlink, filename)
3158        with open(filename, "w", encoding="utf-8") as fp:
3159            fp.write('import sys, os\n'
3160                     'if os.getenv("FRUIT") != "orange=lemon":\n'
3161                     '    raise AssertionError')
3162        args = [sys.executable, filename]
3163        newenv = os.environ.copy()
3164        newenv["FRUIT"] = "orange=lemon"
3165        exitcode = spawn(os.P_WAIT, args[0], args, newenv)
3166        self.assertEqual(exitcode, 0)
3167
3168    @requires_os_func('spawnve')
3169    def test_spawnve_invalid_env(self):
3170        self._test_invalid_env(os.spawnve)
3171
3172    @requires_os_func('spawnvpe')
3173    def test_spawnvpe_invalid_env(self):
3174        self._test_invalid_env(os.spawnvpe)
3175
3176
3177# The introduction of this TestCase caused at least two different errors on
3178# *nix buildbots. Temporarily skip this to let the buildbots move along.
3179@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
3180@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
3181class LoginTests(unittest.TestCase):
3182    def test_getlogin(self):
3183        user_name = os.getlogin()
3184        self.assertNotEqual(len(user_name), 0)
3185
3186
3187@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
3188                     "needs os.getpriority and os.setpriority")
3189class ProgramPriorityTests(unittest.TestCase):
3190    """Tests for os.getpriority() and os.setpriority()."""
3191
3192    def test_set_get_priority(self):
3193
3194        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
3195        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
3196        try:
3197            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
3198            if base >= 19 and new_prio <= 19:
3199                raise unittest.SkipTest("unable to reliably test setpriority "
3200                                        "at current nice level of %s" % base)
3201            else:
3202                self.assertEqual(new_prio, base + 1)
3203        finally:
3204            try:
3205                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
3206            except OSError as err:
3207                if err.errno != errno.EACCES:
3208                    raise
3209
3210
3211class SendfileTestServer(asyncore.dispatcher, threading.Thread):
3212
3213    class Handler(asynchat.async_chat):
3214
3215        def __init__(self, conn):
3216            asynchat.async_chat.__init__(self, conn)
3217            self.in_buffer = []
3218            self.accumulate = True
3219            self.closed = False
3220            self.push(b"220 ready\r\n")
3221
3222        def handle_read(self):
3223            data = self.recv(4096)
3224            if self.accumulate:
3225                self.in_buffer.append(data)
3226
3227        def get_data(self):
3228            return b''.join(self.in_buffer)
3229
3230        def handle_close(self):
3231            self.close()
3232            self.closed = True
3233
3234        def handle_error(self):
3235            raise
3236
3237    def __init__(self, address):
3238        threading.Thread.__init__(self)
3239        asyncore.dispatcher.__init__(self)
3240        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
3241        self.bind(address)
3242        self.listen(5)
3243        self.host, self.port = self.socket.getsockname()[:2]
3244        self.handler_instance = None
3245        self._active = False
3246        self._active_lock = threading.Lock()
3247
3248    # --- public API
3249
3250    @property
3251    def running(self):
3252        return self._active
3253
3254    def start(self):
3255        assert not self.running
3256        self.__flag = threading.Event()
3257        threading.Thread.start(self)
3258        self.__flag.wait()
3259
3260    def stop(self):
3261        assert self.running
3262        self._active = False
3263        self.join()
3264
3265    def wait(self):
3266        # wait for handler connection to be closed, then stop the server
3267        while not getattr(self.handler_instance, "closed", False):
3268            time.sleep(0.001)
3269        self.stop()
3270
3271    # --- internals
3272
3273    def run(self):
3274        self._active = True
3275        self.__flag.set()
3276        while self._active and asyncore.socket_map:
3277            self._active_lock.acquire()
3278            asyncore.loop(timeout=0.001, count=1)
3279            self._active_lock.release()
3280        asyncore.close_all()
3281
3282    def handle_accept(self):
3283        conn, addr = self.accept()
3284        self.handler_instance = self.Handler(conn)
3285
3286    def handle_connect(self):
3287        self.close()
3288    handle_read = handle_connect
3289
3290    def writable(self):
3291        return 0
3292
3293    def handle_error(self):
3294        raise
3295
3296
3297@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
3298class TestSendfile(unittest.TestCase):
3299
3300    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
3301    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
3302                               not sys.platform.startswith("solaris") and \
3303                               not sys.platform.startswith("sunos")
3304    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
3305            'requires headers and trailers support')
3306    requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
3307            'test is only meaningful on 32-bit builds')
3308
3309    @classmethod
3310    def setUpClass(cls):
3311        cls.key = threading_helper.threading_setup()
3312        create_file(os_helper.TESTFN, cls.DATA)
3313
3314    @classmethod
3315    def tearDownClass(cls):
3316        threading_helper.threading_cleanup(*cls.key)
3317        os_helper.unlink(os_helper.TESTFN)
3318
3319    def setUp(self):
3320        self.server = SendfileTestServer((socket_helper.HOST, 0))
3321        self.server.start()
3322        self.client = socket.socket()
3323        self.client.connect((self.server.host, self.server.port))
3324        self.client.settimeout(1)
3325        # synchronize by waiting for "220 ready" response
3326        self.client.recv(1024)
3327        self.sockno = self.client.fileno()
3328        self.file = open(os_helper.TESTFN, 'rb')
3329        self.fileno = self.file.fileno()
3330
3331    def tearDown(self):
3332        self.file.close()
3333        self.client.close()
3334        if self.server.running:
3335            self.server.stop()
3336        self.server = None
3337
3338    def sendfile_wrapper(self, *args, **kwargs):
3339        """A higher level wrapper representing how an application is
3340        supposed to use sendfile().
3341        """
3342        while True:
3343            try:
3344                return os.sendfile(*args, **kwargs)
3345            except OSError as err:
3346                if err.errno == errno.ECONNRESET:
3347                    # disconnected
3348                    raise
3349                elif err.errno in (errno.EAGAIN, errno.EBUSY):
3350                    # we have to retry send data
3351                    continue
3352                else:
3353                    raise
3354
3355    def test_send_whole_file(self):
3356        # normal send
3357        total_sent = 0
3358        offset = 0
3359        nbytes = 4096
3360        while total_sent < len(self.DATA):
3361            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3362            if sent == 0:
3363                break
3364            offset += sent
3365            total_sent += sent
3366            self.assertTrue(sent <= nbytes)
3367            self.assertEqual(offset, total_sent)
3368
3369        self.assertEqual(total_sent, len(self.DATA))
3370        self.client.shutdown(socket.SHUT_RDWR)
3371        self.client.close()
3372        self.server.wait()
3373        data = self.server.handler_instance.get_data()
3374        self.assertEqual(len(data), len(self.DATA))
3375        self.assertEqual(data, self.DATA)
3376
3377    def test_send_at_certain_offset(self):
3378        # start sending a file at a certain offset
3379        total_sent = 0
3380        offset = len(self.DATA) // 2
3381        must_send = len(self.DATA) - offset
3382        nbytes = 4096
3383        while total_sent < must_send:
3384            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
3385            if sent == 0:
3386                break
3387            offset += sent
3388            total_sent += sent
3389            self.assertTrue(sent <= nbytes)
3390
3391        self.client.shutdown(socket.SHUT_RDWR)
3392        self.client.close()
3393        self.server.wait()
3394        data = self.server.handler_instance.get_data()
3395        expected = self.DATA[len(self.DATA) // 2:]
3396        self.assertEqual(total_sent, len(expected))
3397        self.assertEqual(len(data), len(expected))
3398        self.assertEqual(data, expected)
3399
3400    def test_offset_overflow(self):
3401        # specify an offset > file size
3402        offset = len(self.DATA) + 4096
3403        try:
3404            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
3405        except OSError as e:
3406            # Solaris can raise EINVAL if offset >= file length, ignore.
3407            if e.errno != errno.EINVAL:
3408                raise
3409        else:
3410            self.assertEqual(sent, 0)
3411        self.client.shutdown(socket.SHUT_RDWR)
3412        self.client.close()
3413        self.server.wait()
3414        data = self.server.handler_instance.get_data()
3415        self.assertEqual(data, b'')
3416
3417    def test_invalid_offset(self):
3418        with self.assertRaises(OSError) as cm:
3419            os.sendfile(self.sockno, self.fileno, -1, 4096)
3420        self.assertEqual(cm.exception.errno, errno.EINVAL)
3421
3422    def test_keywords(self):
3423        # Keyword arguments should be supported
3424        os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3425                    offset=0, count=4096)
3426        if self.SUPPORT_HEADERS_TRAILERS:
3427            os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
3428                        offset=0, count=4096,
3429                        headers=(), trailers=(), flags=0)
3430
3431    # --- headers / trailers tests
3432
3433    @requires_headers_trailers
3434    def test_headers(self):
3435        total_sent = 0
3436        expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
3437        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
3438                            headers=[b"x" * 512, b"y" * 256])
3439        self.assertLessEqual(sent, 512 + 256 + 4096)
3440        total_sent += sent
3441        offset = 4096
3442        while total_sent < len(expected_data):
3443            nbytes = min(len(expected_data) - total_sent, 4096)
3444            sent = self.sendfile_wrapper(self.sockno, self.fileno,
3445                                                    offset, nbytes)
3446            if sent == 0:
3447                break
3448            self.assertLessEqual(sent, nbytes)
3449            total_sent += sent
3450            offset += sent
3451
3452        self.assertEqual(total_sent, len(expected_data))
3453        self.client.close()
3454        self.server.wait()
3455        data = self.server.handler_instance.get_data()
3456        self.assertEqual(hash(data), hash(expected_data))
3457
3458    @requires_headers_trailers
3459    def test_trailers(self):
3460        TESTFN2 = os_helper.TESTFN + "2"
3461        file_data = b"abcdef"
3462
3463        self.addCleanup(os_helper.unlink, TESTFN2)
3464        create_file(TESTFN2, file_data)
3465
3466        with open(TESTFN2, 'rb') as f:
3467            os.sendfile(self.sockno, f.fileno(), 0, 5,
3468                        trailers=[b"123456", b"789"])
3469            self.client.close()
3470            self.server.wait()
3471            data = self.server.handler_instance.get_data()
3472            self.assertEqual(data, b"abcde123456789")
3473
3474    @requires_headers_trailers
3475    @requires_32b
3476    def test_headers_overflow_32bits(self):
3477        self.server.handler_instance.accumulate = False
3478        with self.assertRaises(OSError) as cm:
3479            os.sendfile(self.sockno, self.fileno, 0, 0,
3480                        headers=[b"x" * 2**16] * 2**15)
3481        self.assertEqual(cm.exception.errno, errno.EINVAL)
3482
3483    @requires_headers_trailers
3484    @requires_32b
3485    def test_trailers_overflow_32bits(self):
3486        self.server.handler_instance.accumulate = False
3487        with self.assertRaises(OSError) as cm:
3488            os.sendfile(self.sockno, self.fileno, 0, 0,
3489                        trailers=[b"x" * 2**16] * 2**15)
3490        self.assertEqual(cm.exception.errno, errno.EINVAL)
3491
3492    @requires_headers_trailers
3493    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3494                         'test needs os.SF_NODISKIO')
3495    def test_flags(self):
3496        try:
3497            os.sendfile(self.sockno, self.fileno, 0, 4096,
3498                        flags=os.SF_NODISKIO)
3499        except OSError as err:
3500            if err.errno not in (errno.EBUSY, errno.EAGAIN):
3501                raise
3502
3503
3504def supports_extended_attributes():
3505    if not hasattr(os, "setxattr"):
3506        return False
3507
3508    try:
3509        with open(os_helper.TESTFN, "xb", 0) as fp:
3510            try:
3511                os.setxattr(fp.fileno(), b"user.test", b"")
3512            except OSError:
3513                return False
3514    finally:
3515        os_helper.unlink(os_helper.TESTFN)
3516
3517    return True
3518
3519
3520@unittest.skipUnless(supports_extended_attributes(),
3521                     "no non-broken extended attribute support")
3522# Kernels < 2.6.39 don't respect setxattr flags.
3523@support.requires_linux_version(2, 6, 39)
3524class ExtendedAttributeTests(unittest.TestCase):
3525
3526    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
3527        fn = os_helper.TESTFN
3528        self.addCleanup(os_helper.unlink, fn)
3529        create_file(fn)
3530
3531        with self.assertRaises(OSError) as cm:
3532            getxattr(fn, s("user.test"), **kwargs)
3533        self.assertEqual(cm.exception.errno, errno.ENODATA)
3534
3535        init_xattr = listxattr(fn)
3536        self.assertIsInstance(init_xattr, list)
3537
3538        setxattr(fn, s("user.test"), b"", **kwargs)
3539        xattr = set(init_xattr)
3540        xattr.add("user.test")
3541        self.assertEqual(set(listxattr(fn)), xattr)
3542        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
3543        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
3544        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
3545
3546        with self.assertRaises(OSError) as cm:
3547            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
3548        self.assertEqual(cm.exception.errno, errno.EEXIST)
3549
3550        with self.assertRaises(OSError) as cm:
3551            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
3552        self.assertEqual(cm.exception.errno, errno.ENODATA)
3553
3554        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
3555        xattr.add("user.test2")
3556        self.assertEqual(set(listxattr(fn)), xattr)
3557        removexattr(fn, s("user.test"), **kwargs)
3558
3559        with self.assertRaises(OSError) as cm:
3560            getxattr(fn, s("user.test"), **kwargs)
3561        self.assertEqual(cm.exception.errno, errno.ENODATA)
3562
3563        xattr.remove("user.test")
3564        self.assertEqual(set(listxattr(fn)), xattr)
3565        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
3566        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
3567        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
3568        removexattr(fn, s("user.test"), **kwargs)
3569        many = sorted("user.test{}".format(i) for i in range(100))
3570        for thing in many:
3571            setxattr(fn, thing, b"x", **kwargs)
3572        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
3573
3574    def _check_xattrs(self, *args, **kwargs):
3575        self._check_xattrs_str(str, *args, **kwargs)
3576        os_helper.unlink(os_helper.TESTFN)
3577
3578        self._check_xattrs_str(os.fsencode, *args, **kwargs)
3579        os_helper.unlink(os_helper.TESTFN)
3580
3581    def test_simple(self):
3582        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3583                           os.listxattr)
3584
3585    def test_lpath(self):
3586        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
3587                           os.listxattr, follow_symlinks=False)
3588
3589    def test_fds(self):
3590        def getxattr(path, *args):
3591            with open(path, "rb") as fp:
3592                return os.getxattr(fp.fileno(), *args)
3593        def setxattr(path, *args):
3594            with open(path, "wb", 0) as fp:
3595                os.setxattr(fp.fileno(), *args)
3596        def removexattr(path, *args):
3597            with open(path, "wb", 0) as fp:
3598                os.removexattr(fp.fileno(), *args)
3599        def listxattr(path, *args):
3600            with open(path, "rb") as fp:
3601                return os.listxattr(fp.fileno(), *args)
3602        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3603
3604
3605@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
3606class TermsizeTests(unittest.TestCase):
3607    def test_does_not_crash(self):
3608        """Check if get_terminal_size() returns a meaningful value.
3609
3610        There's no easy portable way to actually check the size of the
3611        terminal, so let's check if it returns something sensible instead.
3612        """
3613        try:
3614            size = os.get_terminal_size()
3615        except OSError as e:
3616            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3617                # Under win32 a generic OSError can be thrown if the
3618                # handle cannot be retrieved
3619                self.skipTest("failed to query terminal size")
3620            raise
3621
3622        self.assertGreaterEqual(size.columns, 0)
3623        self.assertGreaterEqual(size.lines, 0)
3624
3625    def test_stty_match(self):
3626        """Check if stty returns the same results
3627
3628        stty actually tests stdin, so get_terminal_size is invoked on
3629        stdin explicitly. If stty succeeded, then get_terminal_size()
3630        should work too.
3631        """
3632        try:
3633            size = (
3634                subprocess.check_output(
3635                    ["stty", "size"], stderr=subprocess.DEVNULL, text=True
3636                ).split()
3637            )
3638        except (FileNotFoundError, subprocess.CalledProcessError,
3639                PermissionError):
3640            self.skipTest("stty invocation failed")
3641        expected = (int(size[1]), int(size[0])) # reversed order
3642
3643        try:
3644            actual = os.get_terminal_size(sys.__stdin__.fileno())
3645        except OSError as e:
3646            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
3647                # Under win32 a generic OSError can be thrown if the
3648                # handle cannot be retrieved
3649                self.skipTest("failed to query terminal size")
3650            raise
3651        self.assertEqual(expected, actual)
3652
3653
3654@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
3655@support.requires_linux_version(3, 17)
3656class MemfdCreateTests(unittest.TestCase):
3657    def test_memfd_create(self):
3658        fd = os.memfd_create("Hi", os.MFD_CLOEXEC)
3659        self.assertNotEqual(fd, -1)
3660        self.addCleanup(os.close, fd)
3661        self.assertFalse(os.get_inheritable(fd))
3662        with open(fd, "wb", closefd=False) as f:
3663            f.write(b'memfd_create')
3664            self.assertEqual(f.tell(), 12)
3665
3666        fd2 = os.memfd_create("Hi")
3667        self.addCleanup(os.close, fd2)
3668        self.assertFalse(os.get_inheritable(fd2))
3669
3670
3671@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
3672@support.requires_linux_version(2, 6, 30)
3673class EventfdTests(unittest.TestCase):
3674    def test_eventfd_initval(self):
3675        def pack(value):
3676            """Pack as native uint64_t
3677            """
3678            return struct.pack("@Q", value)
3679        size = 8  # read/write 8 bytes
3680        initval = 42
3681        fd = os.eventfd(initval)
3682        self.assertNotEqual(fd, -1)
3683        self.addCleanup(os.close, fd)
3684        self.assertFalse(os.get_inheritable(fd))
3685
3686        # test with raw read/write
3687        res = os.read(fd, size)
3688        self.assertEqual(res, pack(initval))
3689
3690        os.write(fd, pack(23))
3691        res = os.read(fd, size)
3692        self.assertEqual(res, pack(23))
3693
3694        os.write(fd, pack(40))
3695        os.write(fd, pack(2))
3696        res = os.read(fd, size)
3697        self.assertEqual(res, pack(42))
3698
3699        # test with eventfd_read/eventfd_write
3700        os.eventfd_write(fd, 20)
3701        os.eventfd_write(fd, 3)
3702        res = os.eventfd_read(fd)
3703        self.assertEqual(res, 23)
3704
3705    def test_eventfd_semaphore(self):
3706        initval = 2
3707        flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
3708        fd = os.eventfd(initval, flags)
3709        self.assertNotEqual(fd, -1)
3710        self.addCleanup(os.close, fd)
3711
3712        # semaphore starts has initval 2, two reads return '1'
3713        res = os.eventfd_read(fd)
3714        self.assertEqual(res, 1)
3715        res = os.eventfd_read(fd)
3716        self.assertEqual(res, 1)
3717        # third read would block
3718        with self.assertRaises(BlockingIOError):
3719            os.eventfd_read(fd)
3720        with self.assertRaises(BlockingIOError):
3721            os.read(fd, 8)
3722
3723        # increase semaphore counter, read one
3724        os.eventfd_write(fd, 1)
3725        res = os.eventfd_read(fd)
3726        self.assertEqual(res, 1)
3727        # next read would block, too
3728        with self.assertRaises(BlockingIOError):
3729            os.eventfd_read(fd)
3730
3731    def test_eventfd_select(self):
3732        flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
3733        fd = os.eventfd(0, flags)
3734        self.assertNotEqual(fd, -1)
3735        self.addCleanup(os.close, fd)
3736
3737        # counter is zero, only writeable
3738        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3739        self.assertEqual((rfd, wfd, xfd), ([], [fd], []))
3740
3741        # counter is non-zero, read and writeable
3742        os.eventfd_write(fd, 23)
3743        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3744        self.assertEqual((rfd, wfd, xfd), ([fd], [fd], []))
3745        self.assertEqual(os.eventfd_read(fd), 23)
3746
3747        # counter at max, only readable
3748        os.eventfd_write(fd, (2**64) - 2)
3749        rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
3750        self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
3751        os.eventfd_read(fd)
3752
3753
3754class OSErrorTests(unittest.TestCase):
3755    def setUp(self):
3756        class Str(str):
3757            pass
3758
3759        self.bytes_filenames = []
3760        self.unicode_filenames = []
3761        if os_helper.TESTFN_UNENCODABLE is not None:
3762            decoded = os_helper.TESTFN_UNENCODABLE
3763        else:
3764            decoded = os_helper.TESTFN
3765        self.unicode_filenames.append(decoded)
3766        self.unicode_filenames.append(Str(decoded))
3767        if os_helper.TESTFN_UNDECODABLE is not None:
3768            encoded = os_helper.TESTFN_UNDECODABLE
3769        else:
3770            encoded = os.fsencode(os_helper.TESTFN)
3771        self.bytes_filenames.append(encoded)
3772        self.bytes_filenames.append(bytearray(encoded))
3773        self.bytes_filenames.append(memoryview(encoded))
3774
3775        self.filenames = self.bytes_filenames + self.unicode_filenames
3776
3777    def test_oserror_filename(self):
3778        funcs = [
3779            (self.filenames, os.chdir,),
3780            (self.filenames, os.chmod, 0o777),
3781            (self.filenames, os.lstat,),
3782            (self.filenames, os.open, os.O_RDONLY),
3783            (self.filenames, os.rmdir,),
3784            (self.filenames, os.stat,),
3785            (self.filenames, os.unlink,),
3786        ]
3787        if sys.platform == "win32":
3788            funcs.extend((
3789                (self.bytes_filenames, os.rename, b"dst"),
3790                (self.bytes_filenames, os.replace, b"dst"),
3791                (self.unicode_filenames, os.rename, "dst"),
3792                (self.unicode_filenames, os.replace, "dst"),
3793                (self.unicode_filenames, os.listdir, ),
3794            ))
3795        else:
3796            funcs.extend((
3797                (self.filenames, os.listdir,),
3798                (self.filenames, os.rename, "dst"),
3799                (self.filenames, os.replace, "dst"),
3800            ))
3801        if hasattr(os, "chown"):
3802            funcs.append((self.filenames, os.chown, 0, 0))
3803        if hasattr(os, "lchown"):
3804            funcs.append((self.filenames, os.lchown, 0, 0))
3805        if hasattr(os, "truncate"):
3806            funcs.append((self.filenames, os.truncate, 0))
3807        if hasattr(os, "chflags"):
3808            funcs.append((self.filenames, os.chflags, 0))
3809        if hasattr(os, "lchflags"):
3810            funcs.append((self.filenames, os.lchflags, 0))
3811        if hasattr(os, "chroot"):
3812            funcs.append((self.filenames, os.chroot,))
3813        if hasattr(os, "link"):
3814            if sys.platform == "win32":
3815                funcs.append((self.bytes_filenames, os.link, b"dst"))
3816                funcs.append((self.unicode_filenames, os.link, "dst"))
3817            else:
3818                funcs.append((self.filenames, os.link, "dst"))
3819        if hasattr(os, "listxattr"):
3820            funcs.extend((
3821                (self.filenames, os.listxattr,),
3822                (self.filenames, os.getxattr, "user.test"),
3823                (self.filenames, os.setxattr, "user.test", b'user'),
3824                (self.filenames, os.removexattr, "user.test"),
3825            ))
3826        if hasattr(os, "lchmod"):
3827            funcs.append((self.filenames, os.lchmod, 0o777))
3828        if hasattr(os, "readlink"):
3829            funcs.append((self.filenames, os.readlink,))
3830
3831
3832        for filenames, func, *func_args in funcs:
3833            for name in filenames:
3834                try:
3835                    if isinstance(name, (str, bytes)):
3836                        func(name, *func_args)
3837                    else:
3838                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3839                            func(name, *func_args)
3840                except OSError as err:
3841                    self.assertIs(err.filename, name, str(func))
3842                except UnicodeDecodeError:
3843                    pass
3844                else:
3845                    self.fail("No exception thrown by {}".format(func))
3846
3847class CPUCountTests(unittest.TestCase):
3848    def test_cpu_count(self):
3849        cpus = os.cpu_count()
3850        if cpus is not None:
3851            self.assertIsInstance(cpus, int)
3852            self.assertGreater(cpus, 0)
3853        else:
3854            self.skipTest("Could not determine the number of CPUs")
3855
3856
3857class FDInheritanceTests(unittest.TestCase):
3858    def test_get_set_inheritable(self):
3859        fd = os.open(__file__, os.O_RDONLY)
3860        self.addCleanup(os.close, fd)
3861        self.assertEqual(os.get_inheritable(fd), False)
3862
3863        os.set_inheritable(fd, True)
3864        self.assertEqual(os.get_inheritable(fd), True)
3865
3866    @unittest.skipIf(fcntl is None, "need fcntl")
3867    def test_get_inheritable_cloexec(self):
3868        fd = os.open(__file__, os.O_RDONLY)
3869        self.addCleanup(os.close, fd)
3870        self.assertEqual(os.get_inheritable(fd), False)
3871
3872        # clear FD_CLOEXEC flag
3873        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3874        flags &= ~fcntl.FD_CLOEXEC
3875        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
3876
3877        self.assertEqual(os.get_inheritable(fd), True)
3878
3879    @unittest.skipIf(fcntl is None, "need fcntl")
3880    def test_set_inheritable_cloexec(self):
3881        fd = os.open(__file__, os.O_RDONLY)
3882        self.addCleanup(os.close, fd)
3883        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3884                         fcntl.FD_CLOEXEC)
3885
3886        os.set_inheritable(fd, True)
3887        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3888                         0)
3889
3890    @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH")
3891    def test_get_set_inheritable_o_path(self):
3892        fd = os.open(__file__, os.O_PATH)
3893        self.addCleanup(os.close, fd)
3894        self.assertEqual(os.get_inheritable(fd), False)
3895
3896        os.set_inheritable(fd, True)
3897        self.assertEqual(os.get_inheritable(fd), True)
3898
3899        os.set_inheritable(fd, False)
3900        self.assertEqual(os.get_inheritable(fd), False)
3901
3902    def test_get_set_inheritable_badf(self):
3903        fd = os_helper.make_bad_fd()
3904
3905        with self.assertRaises(OSError) as ctx:
3906            os.get_inheritable(fd)
3907        self.assertEqual(ctx.exception.errno, errno.EBADF)
3908
3909        with self.assertRaises(OSError) as ctx:
3910            os.set_inheritable(fd, True)
3911        self.assertEqual(ctx.exception.errno, errno.EBADF)
3912
3913        with self.assertRaises(OSError) as ctx:
3914            os.set_inheritable(fd, False)
3915        self.assertEqual(ctx.exception.errno, errno.EBADF)
3916
3917    def test_open(self):
3918        fd = os.open(__file__, os.O_RDONLY)
3919        self.addCleanup(os.close, fd)
3920        self.assertEqual(os.get_inheritable(fd), False)
3921
3922    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3923    def test_pipe(self):
3924        rfd, wfd = os.pipe()
3925        self.addCleanup(os.close, rfd)
3926        self.addCleanup(os.close, wfd)
3927        self.assertEqual(os.get_inheritable(rfd), False)
3928        self.assertEqual(os.get_inheritable(wfd), False)
3929
3930    def test_dup(self):
3931        fd1 = os.open(__file__, os.O_RDONLY)
3932        self.addCleanup(os.close, fd1)
3933
3934        fd2 = os.dup(fd1)
3935        self.addCleanup(os.close, fd2)
3936        self.assertEqual(os.get_inheritable(fd2), False)
3937
3938    def test_dup_standard_stream(self):
3939        fd = os.dup(1)
3940        self.addCleanup(os.close, fd)
3941        self.assertGreater(fd, 0)
3942
3943    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
3944    def test_dup_nul(self):
3945        # os.dup() was creating inheritable fds for character files.
3946        fd1 = os.open('NUL', os.O_RDONLY)
3947        self.addCleanup(os.close, fd1)
3948        fd2 = os.dup(fd1)
3949        self.addCleanup(os.close, fd2)
3950        self.assertFalse(os.get_inheritable(fd2))
3951
3952    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3953    def test_dup2(self):
3954        fd = os.open(__file__, os.O_RDONLY)
3955        self.addCleanup(os.close, fd)
3956
3957        # inheritable by default
3958        fd2 = os.open(__file__, os.O_RDONLY)
3959        self.addCleanup(os.close, fd2)
3960        self.assertEqual(os.dup2(fd, fd2), fd2)
3961        self.assertTrue(os.get_inheritable(fd2))
3962
3963        # force non-inheritable
3964        fd3 = os.open(__file__, os.O_RDONLY)
3965        self.addCleanup(os.close, fd3)
3966        self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3967        self.assertFalse(os.get_inheritable(fd3))
3968
3969    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3970    def test_openpty(self):
3971        master_fd, slave_fd = os.openpty()
3972        self.addCleanup(os.close, master_fd)
3973        self.addCleanup(os.close, slave_fd)
3974        self.assertEqual(os.get_inheritable(master_fd), False)
3975        self.assertEqual(os.get_inheritable(slave_fd), False)
3976
3977
3978class PathTConverterTests(unittest.TestCase):
3979    # tuples of (function name, allows fd arguments, additional arguments to
3980    # function, cleanup function)
3981    functions = [
3982        ('stat', True, (), None),
3983        ('lstat', False, (), None),
3984        ('access', False, (os.F_OK,), None),
3985        ('chflags', False, (0,), None),
3986        ('lchflags', False, (0,), None),
3987        ('open', False, (0,), getattr(os, 'close', None)),
3988    ]
3989
3990    def test_path_t_converter(self):
3991        str_filename = os_helper.TESTFN
3992        if os.name == 'nt':
3993            bytes_fspath = bytes_filename = None
3994        else:
3995            bytes_filename = os.fsencode(os_helper.TESTFN)
3996            bytes_fspath = FakePath(bytes_filename)
3997        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
3998        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
3999        self.addCleanup(os.close, fd)
4000
4001        int_fspath = FakePath(fd)
4002        str_fspath = FakePath(str_filename)
4003
4004        for name, allow_fd, extra_args, cleanup_fn in self.functions:
4005            with self.subTest(name=name):
4006                try:
4007                    fn = getattr(os, name)
4008                except AttributeError:
4009                    continue
4010
4011                for path in (str_filename, bytes_filename, str_fspath,
4012                             bytes_fspath):
4013                    if path is None:
4014                        continue
4015                    with self.subTest(name=name, path=path):
4016                        result = fn(path, *extra_args)
4017                        if cleanup_fn is not None:
4018                            cleanup_fn(result)
4019
4020                with self.assertRaisesRegex(
4021                        TypeError, 'to return str or bytes'):
4022                    fn(int_fspath, *extra_args)
4023
4024                if allow_fd:
4025                    result = fn(fd, *extra_args)  # should not fail
4026                    if cleanup_fn is not None:
4027                        cleanup_fn(result)
4028                else:
4029                    with self.assertRaisesRegex(
4030                            TypeError,
4031                            'os.PathLike'):
4032                        fn(fd, *extra_args)
4033
4034    def test_path_t_converter_and_custom_class(self):
4035        msg = r'__fspath__\(\) to return str or bytes, not %s'
4036        with self.assertRaisesRegex(TypeError, msg % r'int'):
4037            os.stat(FakePath(2))
4038        with self.assertRaisesRegex(TypeError, msg % r'float'):
4039            os.stat(FakePath(2.34))
4040        with self.assertRaisesRegex(TypeError, msg % r'object'):
4041            os.stat(FakePath(object()))
4042
4043
4044@unittest.skipUnless(hasattr(os, 'get_blocking'),
4045                     'needs os.get_blocking() and os.set_blocking()')
4046class BlockingTests(unittest.TestCase):
4047    def test_blocking(self):
4048        fd = os.open(__file__, os.O_RDONLY)
4049        self.addCleanup(os.close, fd)
4050        self.assertEqual(os.get_blocking(fd), True)
4051
4052        os.set_blocking(fd, False)
4053        self.assertEqual(os.get_blocking(fd), False)
4054
4055        os.set_blocking(fd, True)
4056        self.assertEqual(os.get_blocking(fd), True)
4057
4058
4059
4060class ExportsTests(unittest.TestCase):
4061    def test_os_all(self):
4062        self.assertIn('open', os.__all__)
4063        self.assertIn('walk', os.__all__)
4064
4065
4066class TestDirEntry(unittest.TestCase):
4067    def setUp(self):
4068        self.path = os.path.realpath(os_helper.TESTFN)
4069        self.addCleanup(os_helper.rmtree, self.path)
4070        os.mkdir(self.path)
4071
4072    def test_uninstantiable(self):
4073        self.assertRaises(TypeError, os.DirEntry)
4074
4075    def test_unpickable(self):
4076        filename = create_file(os.path.join(self.path, "file.txt"), b'python')
4077        entry = [entry for entry in os.scandir(self.path)].pop()
4078        self.assertIsInstance(entry, os.DirEntry)
4079        self.assertEqual(entry.name, "file.txt")
4080        import pickle
4081        self.assertRaises(TypeError, pickle.dumps, entry, filename)
4082
4083
4084class TestScandir(unittest.TestCase):
4085    check_no_resource_warning = warnings_helper.check_no_resource_warning
4086
4087    def setUp(self):
4088        self.path = os.path.realpath(os_helper.TESTFN)
4089        self.bytes_path = os.fsencode(self.path)
4090        self.addCleanup(os_helper.rmtree, self.path)
4091        os.mkdir(self.path)
4092
4093    def create_file(self, name="file.txt"):
4094        path = self.bytes_path if isinstance(name, bytes) else self.path
4095        filename = os.path.join(path, name)
4096        create_file(filename, b'python')
4097        return filename
4098
4099    def get_entries(self, names):
4100        entries = dict((entry.name, entry)
4101                       for entry in os.scandir(self.path))
4102        self.assertEqual(sorted(entries.keys()), names)
4103        return entries
4104
4105    def assert_stat_equal(self, stat1, stat2, skip_fields):
4106        if skip_fields:
4107            for attr in dir(stat1):
4108                if not attr.startswith("st_"):
4109                    continue
4110                if attr in ("st_dev", "st_ino", "st_nlink"):
4111                    continue
4112                self.assertEqual(getattr(stat1, attr),
4113                                 getattr(stat2, attr),
4114                                 (stat1, stat2, attr))
4115        else:
4116            self.assertEqual(stat1, stat2)
4117
4118    def test_uninstantiable(self):
4119        scandir_iter = os.scandir(self.path)
4120        self.assertRaises(TypeError, type(scandir_iter))
4121        scandir_iter.close()
4122
4123    def test_unpickable(self):
4124        filename = self.create_file("file.txt")
4125        scandir_iter = os.scandir(self.path)
4126        import pickle
4127        self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename)
4128        scandir_iter.close()
4129
4130    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
4131        self.assertIsInstance(entry, os.DirEntry)
4132        self.assertEqual(entry.name, name)
4133        self.assertEqual(entry.path, os.path.join(self.path, name))
4134        self.assertEqual(entry.inode(),
4135                         os.stat(entry.path, follow_symlinks=False).st_ino)
4136
4137        entry_stat = os.stat(entry.path)
4138        self.assertEqual(entry.is_dir(),
4139                         stat.S_ISDIR(entry_stat.st_mode))
4140        self.assertEqual(entry.is_file(),
4141                         stat.S_ISREG(entry_stat.st_mode))
4142        self.assertEqual(entry.is_symlink(),
4143                         os.path.islink(entry.path))
4144
4145        entry_lstat = os.stat(entry.path, follow_symlinks=False)
4146        self.assertEqual(entry.is_dir(follow_symlinks=False),
4147                         stat.S_ISDIR(entry_lstat.st_mode))
4148        self.assertEqual(entry.is_file(follow_symlinks=False),
4149                         stat.S_ISREG(entry_lstat.st_mode))
4150
4151        self.assert_stat_equal(entry.stat(),
4152                               entry_stat,
4153                               os.name == 'nt' and not is_symlink)
4154        self.assert_stat_equal(entry.stat(follow_symlinks=False),
4155                               entry_lstat,
4156                               os.name == 'nt')
4157
4158    def test_attributes(self):
4159        link = hasattr(os, 'link')
4160        symlink = os_helper.can_symlink()
4161
4162        dirname = os.path.join(self.path, "dir")
4163        os.mkdir(dirname)
4164        filename = self.create_file("file.txt")
4165        if link:
4166            try:
4167                os.link(filename, os.path.join(self.path, "link_file.txt"))
4168            except PermissionError as e:
4169                self.skipTest('os.link(): %s' % e)
4170        if symlink:
4171            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
4172                       target_is_directory=True)
4173            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
4174
4175        names = ['dir', 'file.txt']
4176        if link:
4177            names.append('link_file.txt')
4178        if symlink:
4179            names.extend(('symlink_dir', 'symlink_file.txt'))
4180        entries = self.get_entries(names)
4181
4182        entry = entries['dir']
4183        self.check_entry(entry, 'dir', True, False, False)
4184
4185        entry = entries['file.txt']
4186        self.check_entry(entry, 'file.txt', False, True, False)
4187
4188        if link:
4189            entry = entries['link_file.txt']
4190            self.check_entry(entry, 'link_file.txt', False, True, False)
4191
4192        if symlink:
4193            entry = entries['symlink_dir']
4194            self.check_entry(entry, 'symlink_dir', True, False, True)
4195
4196            entry = entries['symlink_file.txt']
4197            self.check_entry(entry, 'symlink_file.txt', False, True, True)
4198
4199    def get_entry(self, name):
4200        path = self.bytes_path if isinstance(name, bytes) else self.path
4201        entries = list(os.scandir(path))
4202        self.assertEqual(len(entries), 1)
4203
4204        entry = entries[0]
4205        self.assertEqual(entry.name, name)
4206        return entry
4207
4208    def create_file_entry(self, name='file.txt'):
4209        filename = self.create_file(name=name)
4210        return self.get_entry(os.path.basename(filename))
4211
4212    def test_current_directory(self):
4213        filename = self.create_file()
4214        old_dir = os.getcwd()
4215        try:
4216            os.chdir(self.path)
4217
4218            # call scandir() without parameter: it must list the content
4219            # of the current directory
4220            entries = dict((entry.name, entry) for entry in os.scandir())
4221            self.assertEqual(sorted(entries.keys()),
4222                             [os.path.basename(filename)])
4223        finally:
4224            os.chdir(old_dir)
4225
4226    def test_repr(self):
4227        entry = self.create_file_entry()
4228        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
4229
4230    def test_fspath_protocol(self):
4231        entry = self.create_file_entry()
4232        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
4233
4234    def test_fspath_protocol_bytes(self):
4235        bytes_filename = os.fsencode('bytesfile.txt')
4236        bytes_entry = self.create_file_entry(name=bytes_filename)
4237        fspath = os.fspath(bytes_entry)
4238        self.assertIsInstance(fspath, bytes)
4239        self.assertEqual(fspath,
4240                         os.path.join(os.fsencode(self.path),bytes_filename))
4241
4242    def test_removed_dir(self):
4243        path = os.path.join(self.path, 'dir')
4244
4245        os.mkdir(path)
4246        entry = self.get_entry('dir')
4247        os.rmdir(path)
4248
4249        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4250        if os.name == 'nt':
4251            self.assertTrue(entry.is_dir())
4252        self.assertFalse(entry.is_file())
4253        self.assertFalse(entry.is_symlink())
4254        if os.name == 'nt':
4255            self.assertRaises(FileNotFoundError, entry.inode)
4256            # don't fail
4257            entry.stat()
4258            entry.stat(follow_symlinks=False)
4259        else:
4260            self.assertGreater(entry.inode(), 0)
4261            self.assertRaises(FileNotFoundError, entry.stat)
4262            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4263
4264    def test_removed_file(self):
4265        entry = self.create_file_entry()
4266        os.unlink(entry.path)
4267
4268        self.assertFalse(entry.is_dir())
4269        # On POSIX, is_dir() result depends if scandir() filled d_type or not
4270        if os.name == 'nt':
4271            self.assertTrue(entry.is_file())
4272        self.assertFalse(entry.is_symlink())
4273        if os.name == 'nt':
4274            self.assertRaises(FileNotFoundError, entry.inode)
4275            # don't fail
4276            entry.stat()
4277            entry.stat(follow_symlinks=False)
4278        else:
4279            self.assertGreater(entry.inode(), 0)
4280            self.assertRaises(FileNotFoundError, entry.stat)
4281            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
4282
4283    def test_broken_symlink(self):
4284        if not os_helper.can_symlink():
4285            return self.skipTest('cannot create symbolic link')
4286
4287        filename = self.create_file("file.txt")
4288        os.symlink(filename,
4289                   os.path.join(self.path, "symlink.txt"))
4290        entries = self.get_entries(['file.txt', 'symlink.txt'])
4291        entry = entries['symlink.txt']
4292        os.unlink(filename)
4293
4294        self.assertGreater(entry.inode(), 0)
4295        self.assertFalse(entry.is_dir())
4296        self.assertFalse(entry.is_file())  # broken symlink returns False
4297        self.assertFalse(entry.is_dir(follow_symlinks=False))
4298        self.assertFalse(entry.is_file(follow_symlinks=False))
4299        self.assertTrue(entry.is_symlink())
4300        self.assertRaises(FileNotFoundError, entry.stat)
4301        # don't fail
4302        entry.stat(follow_symlinks=False)
4303
4304    def test_bytes(self):
4305        self.create_file("file.txt")
4306
4307        path_bytes = os.fsencode(self.path)
4308        entries = list(os.scandir(path_bytes))
4309        self.assertEqual(len(entries), 1, entries)
4310        entry = entries[0]
4311
4312        self.assertEqual(entry.name, b'file.txt')
4313        self.assertEqual(entry.path,
4314                         os.fsencode(os.path.join(self.path, 'file.txt')))
4315
4316    def test_bytes_like(self):
4317        self.create_file("file.txt")
4318
4319        for cls in bytearray, memoryview:
4320            path_bytes = cls(os.fsencode(self.path))
4321            with self.assertWarns(DeprecationWarning):
4322                entries = list(os.scandir(path_bytes))
4323            self.assertEqual(len(entries), 1, entries)
4324            entry = entries[0]
4325
4326            self.assertEqual(entry.name, b'file.txt')
4327            self.assertEqual(entry.path,
4328                             os.fsencode(os.path.join(self.path, 'file.txt')))
4329            self.assertIs(type(entry.name), bytes)
4330            self.assertIs(type(entry.path), bytes)
4331
4332    @unittest.skipUnless(os.listdir in os.supports_fd,
4333                         'fd support for listdir required for this test.')
4334    def test_fd(self):
4335        self.assertIn(os.scandir, os.supports_fd)
4336        self.create_file('file.txt')
4337        expected_names = ['file.txt']
4338        if os_helper.can_symlink():
4339            os.symlink('file.txt', os.path.join(self.path, 'link'))
4340            expected_names.append('link')
4341
4342        fd = os.open(self.path, os.O_RDONLY)
4343        try:
4344            with os.scandir(fd) as it:
4345                entries = list(it)
4346            names = [entry.name for entry in entries]
4347            self.assertEqual(sorted(names), expected_names)
4348            self.assertEqual(names, os.listdir(fd))
4349            for entry in entries:
4350                self.assertEqual(entry.path, entry.name)
4351                self.assertEqual(os.fspath(entry), entry.name)
4352                self.assertEqual(entry.is_symlink(), entry.name == 'link')
4353                if os.stat in os.supports_dir_fd:
4354                    st = os.stat(entry.name, dir_fd=fd)
4355                    self.assertEqual(entry.stat(), st)
4356                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
4357                    self.assertEqual(entry.stat(follow_symlinks=False), st)
4358        finally:
4359            os.close(fd)
4360
4361    def test_empty_path(self):
4362        self.assertRaises(FileNotFoundError, os.scandir, '')
4363
4364    def test_consume_iterator_twice(self):
4365        self.create_file("file.txt")
4366        iterator = os.scandir(self.path)
4367
4368        entries = list(iterator)
4369        self.assertEqual(len(entries), 1, entries)
4370
4371        # check than consuming the iterator twice doesn't raise exception
4372        entries2 = list(iterator)
4373        self.assertEqual(len(entries2), 0, entries2)
4374
4375    def test_bad_path_type(self):
4376        for obj in [1.234, {}, []]:
4377            self.assertRaises(TypeError, os.scandir, obj)
4378
4379    def test_close(self):
4380        self.create_file("file.txt")
4381        self.create_file("file2.txt")
4382        iterator = os.scandir(self.path)
4383        next(iterator)
4384        iterator.close()
4385        # multiple closes
4386        iterator.close()
4387        with self.check_no_resource_warning():
4388            del iterator
4389
4390    def test_context_manager(self):
4391        self.create_file("file.txt")
4392        self.create_file("file2.txt")
4393        with os.scandir(self.path) as iterator:
4394            next(iterator)
4395        with self.check_no_resource_warning():
4396            del iterator
4397
4398    def test_context_manager_close(self):
4399        self.create_file("file.txt")
4400        self.create_file("file2.txt")
4401        with os.scandir(self.path) as iterator:
4402            next(iterator)
4403            iterator.close()
4404
4405    def test_context_manager_exception(self):
4406        self.create_file("file.txt")
4407        self.create_file("file2.txt")
4408        with self.assertRaises(ZeroDivisionError):
4409            with os.scandir(self.path) as iterator:
4410                next(iterator)
4411                1/0
4412        with self.check_no_resource_warning():
4413            del iterator
4414
4415    def test_resource_warning(self):
4416        self.create_file("file.txt")
4417        self.create_file("file2.txt")
4418        iterator = os.scandir(self.path)
4419        next(iterator)
4420        with self.assertWarns(ResourceWarning):
4421            del iterator
4422            support.gc_collect()
4423        # exhausted iterator
4424        iterator = os.scandir(self.path)
4425        list(iterator)
4426        with self.check_no_resource_warning():
4427            del iterator
4428
4429
4430class TestPEP519(unittest.TestCase):
4431
4432    # Abstracted so it can be overridden to test pure Python implementation
4433    # if a C version is provided.
4434    fspath = staticmethod(os.fspath)
4435
4436    def test_return_bytes(self):
4437        for b in b'hello', b'goodbye', b'some/path/and/file':
4438            self.assertEqual(b, self.fspath(b))
4439
4440    def test_return_string(self):
4441        for s in 'hello', 'goodbye', 'some/path/and/file':
4442            self.assertEqual(s, self.fspath(s))
4443
4444    def test_fsencode_fsdecode(self):
4445        for p in "path/like/object", b"path/like/object":
4446            pathlike = FakePath(p)
4447
4448            self.assertEqual(p, self.fspath(pathlike))
4449            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
4450            self.assertEqual("path/like/object", os.fsdecode(pathlike))
4451
4452    def test_pathlike(self):
4453        self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil')))
4454        self.assertTrue(issubclass(FakePath, os.PathLike))
4455        self.assertTrue(isinstance(FakePath('x'), os.PathLike))
4456
4457    def test_garbage_in_exception_out(self):
4458        vapor = type('blah', (), {})
4459        for o in int, type, os, vapor():
4460            self.assertRaises(TypeError, self.fspath, o)
4461
4462    def test_argument_required(self):
4463        self.assertRaises(TypeError, self.fspath)
4464
4465    def test_bad_pathlike(self):
4466        # __fspath__ returns a value other than str or bytes.
4467        self.assertRaises(TypeError, self.fspath, FakePath(42))
4468        # __fspath__ attribute that is not callable.
4469        c = type('foo', (), {})
4470        c.__fspath__ = 1
4471        self.assertRaises(TypeError, self.fspath, c())
4472        # __fspath__ raises an exception.
4473        self.assertRaises(ZeroDivisionError, self.fspath,
4474                          FakePath(ZeroDivisionError()))
4475
4476    def test_pathlike_subclasshook(self):
4477        # bpo-38878: subclasshook causes subclass checks
4478        # true on abstract implementation.
4479        class A(os.PathLike):
4480            pass
4481        self.assertFalse(issubclass(FakePath, A))
4482        self.assertTrue(issubclass(FakePath, os.PathLike))
4483
4484    def test_pathlike_class_getitem(self):
4485        self.assertIsInstance(os.PathLike[bytes], types.GenericAlias)
4486
4487
4488class TimesTests(unittest.TestCase):
4489    def test_times(self):
4490        times = os.times()
4491        self.assertIsInstance(times, os.times_result)
4492
4493        for field in ('user', 'system', 'children_user', 'children_system',
4494                      'elapsed'):
4495            value = getattr(times, field)
4496            self.assertIsInstance(value, float)
4497
4498        if os.name == 'nt':
4499            self.assertEqual(times.children_user, 0)
4500            self.assertEqual(times.children_system, 0)
4501            self.assertEqual(times.elapsed, 0)
4502
4503
4504@requires_os_func('fork')
4505class ForkTests(unittest.TestCase):
4506    def test_fork(self):
4507        # bpo-42540: ensure os.fork() with non-default memory allocator does
4508        # not crash on exit.
4509        code = """if 1:
4510            import os
4511            from test import support
4512            pid = os.fork()
4513            if pid != 0:
4514                support.wait_process(pid, exitcode=0)
4515        """
4516        assert_python_ok("-c", code)
4517        assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug")
4518
4519
4520# Only test if the C version is provided, otherwise TestPEP519 already tested
4521# the pure Python implementation.
4522if hasattr(os, "_fspath"):
4523    class TestPEP519PurePython(TestPEP519):
4524
4525        """Explicitly test the pure Python implementation of os.fspath()."""
4526
4527        fspath = staticmethod(os._fspath)
4528
4529
4530if __name__ == "__main__":
4531    unittest.main()
4532