• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import collections.abc
2import contextlib
3import errno
4import os
5import re
6import stat
7import sys
8import time
9import unittest
10import warnings
11
12
# Base name for files created by the test suite.  Jython disallows '@' in
# module names, so a '$' prefix is used there instead.
TESTFN_ASCII = '$test' if os.name == 'java' else '@test'

# Append the process id so that parallel test runs do not collide, while
# letting the name remain a valid module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# TESTFN_UNICODE: a file name containing non-ASCII characters.
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # The Mac OS X VFS API defines file names as canonically decomposed
    # Unicode encoded as UTF-8, so normalize to NFD there.  See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
32
# TESTFN_UNENCODABLE: a str file name that the filesystem encoding must *not*
# be able to encode in strict mode.  It stays None when no such name can be
# generated.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # Platform 0 is win32s and platform 1 is Windows 9x/ME; skip both.
    if sys.getwindowsversion().platform >= 2:
        # Mix characters from several languages so that it is unlikely the
        # whole name can be encoded to MBCS (issue #9819).
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            # Expected outcome: the name really cannot be encoded.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable file names (invalid UTF-8), so it is excluded.
elif sys.platform != 'darwin':
    try:
        # Neither ASCII nor UTF-8 can decode the byte 0xff.
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # With surrogateescape, 0xff becomes the surrogate character U+DCFF.
        TESTFN_UNENCODABLE = TESTFN_ASCII + b'-\xff'.decode(
            sys.getfilesystemencoding(), 'surrogateescape')
    # When the decode succeeds, the file system encoding (e.g. ISO-8859-*)
    # can handle the byte 0xff: TESTFN_UNENCODABLE stays None and some
    # Unicode file name tests are skipped.
65
# FS_NONASCII: the first candidate non-ASCII character that survives an
# os.fsencode()/os.fsdecode() round trip, or an empty string if none does.
FS_NONASCII = ''
for character in (
    # Printable, common characters come first so the file name stays
    # readable.  The encodings named next to each character are only examples
    # of encodings able to encode it (the lists are not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # More "special" characters follow: they may be interpreted or displayed
    # differently depending on the exact locale encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # Compare the round-tripped value rather than relying on an encoding
        # error alone: with the legacy 'mbcs' encoding on Windows the
        # 'replace' error mode is used, and encode() returns b'?' for
        # characters missing in the ANSI codepage.
        if os.fsdecode(os.fsencode(character)) == character:
            FS_NONASCII = character
            break
    except UnicodeError:
        # Not encodable (or not decodable back); try the next candidate.
        pass
113
# Save the working directory that was current when this module was imported.
SAVEDCWD = os.getcwd()

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or does not accept entering
    # such a directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # NOTE: the trailing comma matters.  Without it this literal is implicitly
    # concatenated with the next one, producing a single 5-byte candidate
    # instead of two separate ones.
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # The first candidate that fails strict decoding is appended to the
        # encoded ASCII base name.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break
146
# TESTFN_NONASCII exists only when the file system can encode a non-ASCII
# character (FS_NONASCII is non-empty).  TESTFN prefers it and otherwise
# falls back to the plain ASCII name.
TESTFN_NONASCII = (TESTFN_ASCII + FS_NONASCII) if FS_NONASCII else None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
152
153
def make_bad_fd():
    """Return a file descriptor number that is no longer valid.

    TESTFN is opened for binary writing, its descriptor is recorded, and the
    file is then closed and removed, so the returned number refers to a
    descriptor that has already been closed.
    """
    stream = open(TESTFN, "wb")
    try:
        stale_fd = stream.fileno()
    finally:
        # Always close and clean up, even if fileno() failed.
        stream.close()
        unlink(TESTFN)
    return stale_fd
165
166
# Cached answer of can_symlink(); None until the first probe has run.
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() is able to create a symbolic link.

    The probe creates (and removes) a link named TESTFN + "can_symlink"
    pointing at TESTFN.  The result is cached in the module-level
    _can_symlink, so the probe runs at most once.
    """
    global _can_symlink
    if _can_symlink is None:
        probe_link = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, probe_link)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Clean up first; the answer is only cached once that worked.
            os.remove(probe_link)
            _can_symlink = True
    return _can_symlink
184
185
def skip_unless_symlink(test):
    """Decorator: skip *test* unless can_symlink() reports working symlinks."""
    if can_symlink():
        return test
    reason = "Requires functional symlink implementation"
    return unittest.skip(reason)(test)
191
192
# Cached answer of can_xattr(); None until the first probe has run.
_can_xattr = None


def can_xattr():
    """Return True if extended attributes appear to work properly.

    The answer is False when os.setxattr does not exist or when any of the
    probing os.setxattr() calls raises OSError.  It is also False on 2.6.x
    kernels older than 2.6.39, which do not respect setxattr flags.  The
    result is cached in the module-level _can_xattr.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # The dots are escaped so that only a literal "2.6."
                    # prefix is recognized.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an already-open descriptor; close it so the
            # probe does not leak a file descriptor.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can
227
228
def skip_unless_xattr(test):
    """Decorator: skip *test* unless can_xattr() reports working xattrs."""
    if can_xattr():
        return test
    reason = "no non-broken extended attribute support"
    return unittest.skip(reason)(test)
234
235
def unlink(filename):
    """Remove *filename* through the platform-specific _unlink().

    FileNotFoundError and NotADirectoryError are suppressed; any other
    exception propagates to the caller.
    """
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
241
242
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the removal to become visible.

        With *waitall* false, the parent directory of *pathname* is polled
        until the last path component is no longer listed.  With *waitall*
        true, *pathname* itself is polled until its listing is empty.  A
        RuntimeWarning is issued if that never happens before the backoff
        runs out.
        """
        # Run the requested operation first.
        func(pathname)
        # Decide which directory listing to poll.
        if waitall:
            watched_dir = pathname
        else:
            watched_dir, entry = os.path.split(pathname)
            if not watched_dir:
                watched_dir = '.'
        # The delay doubles on every pass, which adds up to ~1 second in
        # total; a deletion still pending after that is probably an error
        # anyway.  Testing on an i7@4.3GHz showed that usually only 1
        # iteration is required when contention occurs.
        delay = 0.001
        while delay < 1.0:
            # Only the presence of the file(s) in the directory listing is
            # checked, regardless of any security or access rights.  Having
            # got this far we have sufficient permissions for that, using
            # Python's equivalent of the Windows API FindFirstFile.  Other
            # Windows APIs can fail or give incorrect results for files that
            # are pending deletion.
            listing = os.listdir(watched_dir)
            still_pending = listing if waitall else entry in listing
            if not still_pending:
                return
            # Back off and look again.
            time.sleep(delay)
            delay *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Remove a file and wait until it is gone from its directory."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove a directory and wait until it is gone from its parent."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Remove the directory tree rooted at *path*, waiting on each level."""
        # NOTE(review): _force_run comes from test.support and is not visible
        # here; presumably it retries the call after fixing permissions.
        from test.support import _force_run

        def _rmtree_inner(directory):
            # Delete every entry below `directory`, but not `directory`.
            for entry in _force_run(directory, os.listdir, directory):
                child = os.path.join(directory, entry)
                try:
                    mode = os.lstat(child).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (child, exc),
                          file=sys.__stderr__)
                    # Treat the entry as a non-directory.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, child, waitall=True)
                    _force_run(child, os.rmdir, child)
                else:
                    _force_run(child, os.unlink, child)

        def _force_rmdir(target):
            return _force_run(target, os.rmdir, target)

        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(_force_rmdir, path)

    def _longpath(path):
        """Return the GetLongPathNameW() form of *path*, or *path* itself."""
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            return path
        buffer = ctypes.create_unicode_buffer(len(path) * 2)
        length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                         len(buffer))
        # A zero length means the call did not produce a result.
        return buffer[:length] if length else path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Remove the directory tree rooted at *path*.

        shutil.rmtree() is tried first.  If it raises OSError, the tree is
        walked manually and each entry is removed through _force_run().
        """
        import shutil
        try:
            shutil.rmtree(path)
        except OSError:
            # Fall through to the manual removal below.
            pass
        else:
            return

        def _rmtree_inner(directory):
            # NOTE(review): _force_run comes from test.support and is not
            # visible here; presumably it retries the call after fixing
            # permissions on its first argument.
            from test.support import _force_run
            for entry in _force_run(directory, os.listdir, directory):
                child = os.path.join(directory, entry)
                try:
                    mode = os.lstat(child).st_mode
                except OSError:
                    # Treat the entry as a non-directory.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(child)
                    remover = os.rmdir
                else:
                    remover = os.unlink
                _force_run(directory, remover, child)

        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (no long-path expansion here)."""
        return path
347
348
def rmdir(dirname):
    """Remove directory *dirname* through the platform-specific _rmdir().

    FileNotFoundError is suppressed; any other exception propagates.
    """
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
354
355
def rmtree(path):
    """Remove the tree at *path* through the platform-specific _rmtree().

    FileNotFoundError is suppressed; any other exception propagates.
    """
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
361
362
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Context manager that provides a temporary directory.

    Arguments:

      path: the directory to create for the duration of the block.  When
        omitted or None, a directory is created with tempfile.mkdtemp and
        its real path is used.

      quiet: when False (the default), a failure to create *path* raises.
        When True, a failure to create an explicitly given *path* only
        issues a RuntimeWarning and the block still runs.

    The directory is removed on exit only if this context manager created
    it, and only from the process that created it.
    """
    import tempfile
    created = False
    if path is None:
        path = os.path.realpath(tempfile.mkdtemp())
        created = True
    else:
        try:
            os.mkdir(path)
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            created = True
    # Remember which process owns the directory.
    owner_pid = os.getpid() if created else None
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if created and owner_pid == os.getpid():
            rmtree(path)
402
403
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Context manager that switches the current working directory.

    Arguments:

      path: the directory to make current while the block runs; it is
        resolved with os.path.realpath before changing into it.

      quiet: when False (the default), a failing chdir raises.  When True,
        only a RuntimeWarning is issued and the working directory is left
        as it was.

    The value yielded is os.getcwd() after the attempted change.  The
    original working directory is restored on exit.
    """
    origin = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if quiet:
            warnings.warn(f'tests may fail, unable to change the current working '
                          f'directory to {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            raise
    try:
        yield os.getcwd()
    finally:
        os.chdir(origin)
430
431
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    The directory is created through temp_dir() with *name* as its path,
    so a relative *name* is created inside the current directory, and
    passing None makes temp_dir() fall back to tempfile.mkdtemp.  The
    working directory is then switched to it through change_cwd(), and the
    resulting CWD is yielded.

    *quiet* is forwarded to both helpers: when False (default), a failure
    to create the directory or to change into it raises an error; when
    True, only a warning is issued and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
450
451
def create_empty_file(filename):
    """Make *filename* an empty file: create it, or truncate it if present."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # Opening with O_CREAT | O_TRUNC does all the work; just close again.
    os.close(os.open(filename, flags))
456
457
def fs_is_case_insensitive(directory):
    """Return True if the file system holding *directory* ignores case.

    A temporary file is created in *directory* and looked up again under a
    case-swapped spelling of its full path; the file system counts as
    case-insensitive when both spellings refer to the same file.
    """
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as probe:
        original = probe.name
        # Prefer the upper-case spelling; fall back to lower case when the
        # path is already entirely upper case.
        swapped = original.upper()
        if swapped == original:
            swapped = original.lower()
        try:
            insensitive = os.path.samefile(original, swapped)
        except FileNotFoundError:
            # The swapped spelling does not exist: case matters.
            insensitive = False
    return insensitive
471
472
class FakePath:
    """Minimal object implementing the path protocol (__fspath__).

    The wrapped value is returned from __fspath__() as-is, unless it is an
    exception instance or exception class, in which case it is raised.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        target = self.path
        if isinstance(target, BaseException):
            raise target
        if isinstance(target, type) and issubclass(target, BaseException):
            raise target
        return target
489
490
def fd_count():
    """Return the number of file descriptors currently open in this process.

    On Linux and FreeBSD the entries of /proc/self/fd are counted when that
    directory exists.  Otherwise every descriptor number below the limit
    (SC_OPEN_MAX when available, else 256) is probed with os.dup().
    """
    if sys.platform.startswith(('linux', 'freebsd')):
        try:
            entries = os.listdir("/proc/self/fd")
        except FileNotFoundError:
            pass
        else:
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(entries) - 1

    limit = 256
    if hasattr(os, 'sysconf'):
        try:
            limit = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    saved_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            report_types = (msvcrt.CRT_WARN,
                            msvcrt.CRT_ERROR,
                            msvcrt.CRT_ASSERT)
            # Silence each report type and remember its previous mode.
            saved_modes = {report_type: msvcrt.CrtSetReportMode(report_type, 0)
                           for report_type in report_types}

    try:
        open_fds = 0
        for fd in range(limit):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                duplicate = os.dup(fd)
            except OSError as exc:
                # EBADF just means "not open"; anything else is unexpected.
                if exc.errno != errno.EBADF:
                    raise
            else:
                os.close(duplicate)
                open_fds += 1
    finally:
        if saved_modes is not None:
            # Put the CRT report modes back the way they were.
            for report_type, mode in saved_modes.items():
                msvcrt.CrtSetReportMode(report_type, mode)

    return open_fds
549
550
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that applies *umask* for the duration of the block.

        The umask that was in effect before entering is restored on exit,
        including when the block raises.
        """
        previous = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous)
560
561
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping wrapper around os.environ that can undo its own changes.

    Every variable set or deleted through the guard has its initial value
    recorded the first time it is touched.  Used as a context manager, the
    guard restores those initial values on exit."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its initial value (None if unset).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Only the first access records the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Only the first access records the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        # Deleting a variable that is not set is silently accepted.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value* (same as guard[envvar] = value)."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* (same as del guard[envvar])."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Undo every recorded change: restore the initial value, or remove
        # the variable again if it was initially unset.
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                del self._environ[name]
        os.environ = self._environ
613