• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import collections.abc
2import contextlib
3import errno
4import os
5import re
6import stat
7import string
8import sys
9import time
10import unittest
11import warnings
12
13from test import support
14
15
# Base filename used for testing
TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing by embedding the id of the current
# process, while letting it remain a valid module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# TESTFN_UNICODE is a non-ASCII filename built on top of TESTFN_ASCII
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if support.is_apple:
    # On Apple's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    # Normalize to NFD so the name compares equal to what the file system
    # reports back.
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
31
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such a filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            # Expected outcome: the name really is unencodable.
            pass
        else:
            # The candidate turned out to be encodable, so it is useless for
            # its purpose: report it and fall back to None.
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# Apple, Emscripten and WASI deny unencodable filenames (invalid utf-8)
elif not support.is_apple and sys.platform not in {"emscripten", "wasi"}:
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII \
            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can decode
        # the byte 0xff. Skip some unicode filename tests.
        pass
64
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the listed encodings are just examples of encodings
    # able to encode the character (the lists are not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage.  A round trip
        # through fsencode()/fsdecode() detects that silent replacement.
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        # Not representable with the filesystem encoding: try the next one.
        pass
    else:
        # The first character that survives the round trip wins.
        FS_NONASCII = character
        break

# Save the initial cwd (captured once, when this module is imported)
SAVEDCWD = os.getcwd()
115
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such a filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, but may refuse to enter
    # such a directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and neither candidate is tried as-is.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        # Strict decoding must fail for the candidate to be useful.
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        try:
            # ...but decoding with the filesystem error handler must succeed,
            # otherwise the name could not be used by os.fsdecode() at all.
            name.decode(sys.getfilesystemencoding(),
                        sys.getfilesystemencodeerrors())
        except UnicodeDecodeError:
            continue
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break
150
# TESTFN_NONASCII is TESTFN_ASCII followed by the non-ASCII character found
# above, or None when no such character is usable with the filesystem encoding.
if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
# TESTFN is the default temporary filename: the non-ASCII variant is preferred
# when it exists, otherwise the plain ASCII name is used.
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
156
157
def make_bad_fd():
    """Return a file descriptor number that is no longer valid.

    TESTFN is opened for writing, its descriptor number is recorded, and the
    file is then closed and removed, so the returned number refers to a
    closed descriptor.
    """
    stream = open(TESTFN, "wb")
    try:
        stale_fd = stream.fileno()
    finally:
        # Close first, then remove the scratch file from disk.
        stream.close()
        unlink(TESTFN)
    return stale_fd
169
170
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() works here; the answer is cached.

    The probe links an absolute path.  WASI / wasmtime prevents symlinks with
    absolute paths (see man openat2(2) RESOLVE_BENEATH) and almost all symlink
    tests use absolute paths, so such a platform is presumably reported as
    lacking symlink support through the OSError branch below.
    """
    global _can_symlink
    if _can_symlink is None:
        target = os.path.abspath(TESTFN)
        link_name = target + "can_symlink"
        try:
            os.symlink(target, link_name)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Clean up the probe link before recording the positive result.
            os.remove(link_name)
            _can_symlink = True
    return _can_symlink
192
193
def skip_unless_symlink(test):
    """Decorator: skip *test* unless symlinks are functional."""
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
199
200
_can_hardlink = None

def can_hardlink():
    """Return True if hard links can be created; the answer is cached."""
    global _can_hardlink
    if _can_hardlink is not None:
        return _can_hardlink
    # Android blocks hard links using SELinux
    # (https://stackoverflow.com/q/32365690).
    _can_hardlink = hasattr(os, "link") and not support.is_android
    return _can_hardlink
210
211
def skip_unless_hardlink(test):
    """Decorator: skip *test* unless hard links are supported."""
    if can_hardlink():
        return test
    return unittest.skip("requires hardlink support")(test)
216
217
_can_xattr = None


def can_xattr():
    """Return True if extended attributes are usable; the answer is cached.

    The probe sets attributes both on a file created by tempfile and on
    TESTFN, because the two may live on different file systems.  It returns
    False when os.setxattr() is missing, when any of the calls fails with
    OSError, or when the kernel release is a 2.6.x older than 2.6.39.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # The dots are escaped so that only a literal "2.6."
                    # prefix is recognized as a 2.6 kernel.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() hands back an open OS-level descriptor; close it so
            # that probing does not leak a file descriptor.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can
252
253
def skip_unless_xattr(test):
    """Decorator: skip *test* unless extended attributes are functional."""
    if can_xattr():
        return test
    return unittest.skip("no non-broken extended attribute support")(test)
259
260
_can_chmod = None

def can_chmod():
    """Return True if os.chmod() really changes mode bits; result is cached.

    TESTFN is created, its mode is set to 0o555 and then to 0o777, and the
    permission bits seen by os.stat() after each call are compared.  If they
    are equal, or if chmod/stat fails with OSError, chmod is considered
    non-functional.
    """
    global _can_chmod
    if _can_chmod is not None:
        return _can_chmod
    if not hasattr(os, "chmod"):
        _can_chmod = False
        return _can_chmod
    try:
        with open(TESTFN, "wb"):
            try:
                os.chmod(TESTFN, 0o555)
                restricted = os.stat(TESTFN).st_mode
                os.chmod(TESTFN, 0o777)
                permissive = os.stat(TESTFN).st_mode
            except OSError:
                works = False
            else:
                works = stat.S_IMODE(restricted) != stat.S_IMODE(permissive)
    finally:
        unlink(TESTFN)
    _can_chmod = works
    return works
285
286
def skip_unless_working_chmod(test):
    """Decorator: skip *test* unless os.chmod() is functional.

    WASI SDK 15.0 cannot change file mode bits.
    """
    if can_chmod():
        return test
    return unittest.skip("requires working os.chmod()")(test)
295
296
# Does the current effective user have the capability to override DAC
# (discretionary access control)?  Typically user root is able to bypass
# file read, write, and execute permission checks.  The capability is
# independent of the effective user.  See capabilities(7).
_can_dac_override = None

def can_dac_override():
    """Return True if a read-only file can still be opened for writing.

    The result is cached in _can_dac_override.  When chmod is not functional
    the answer is always False.
    """
    global _can_dac_override

    if not can_chmod():
        _can_dac_override = False
    if _can_dac_override is not None:
        return _can_dac_override

    try:
        with open(TESTFN, "wb"):
            # Make the file read-only, then try to open it for writing.
            os.chmod(TESTFN, 0o400)
            try:
                with open(TESTFN, "wb"):
                    pass
            except OSError:
                overridden = False
            else:
                overridden = True
            _can_dac_override = overridden
    finally:
        # Restore owner permissions (best effort) before removing the file.
        try:
            os.chmod(TESTFN, 0o700)
        except OSError:
            pass
        unlink(TESTFN)

    return _can_dac_override
329
330
def skip_if_dac_override(test):
    """Decorator: skip *test* when permission checks can be overridden."""
    if can_dac_override():
        return unittest.skip("incompatible with CAP_DAC_OVERRIDE")(test)
    return test
335
336
def skip_unless_dac_override(test):
    """Decorator: skip *test* unless permission checks can be overridden."""
    if can_dac_override():
        return test
    return unittest.skip("requires CAP_DAC_OVERRIDE")(test)
341
342
def unlink(filename):
    """Remove *filename*, ignoring the case where it does not exist.

    FileNotFoundError and NotADirectoryError are silenced; any other error
    propagates to the caller.
    """
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
348
349
# Platform-specific implementations of the private removal helpers
# (_unlink, _rmdir, _rmtree) and of _longpath.  The public wrappers defined
# elsewhere in this module call these.
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then poll until the removal is visible.

        With waitall=False, wait until the last component of *pathname* is
        no longer listed in its parent directory.  With waitall=True,
        *pathname* is itself a directory and the wait lasts until it is
        empty.  A RuntimeWarning is issued if the wait times out.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Remove a file and wait until it has disappeared."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove an empty directory and wait until it has disappeared."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively remove the directory tree rooted at *path*."""
        from test.support import _force_run

        def _rmtree_inner(path):
            # Remove every entry of *path*, recursing into subdirectories.
            # The directory *path* itself is left for the caller to remove.
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # Treat the entry as a non-directory and try to unlink it.
                    mode = 0
                if stat.S_ISDIR(mode):
                    # Empty the subdirectory (waiting until it is seen as
                    # empty) before removing it.
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return the long form of *path* as reported by GetLongPathNameW.

        *path* is returned unchanged if ctypes is unavailable or if the
        call reports a length of zero.
        """
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    # On other platforms the os functions are used directly, without any
    # wait loop.
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively remove the directory tree rooted at *path*.

        shutil.rmtree() is tried first; if it fails with OSError, the tree
        is walked manually with every removal going through _force_run.
        """
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            # Remove every entry of *path*, recursing into subdirectories.
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # Treat the entry as a non-directory and try to unlink it.
                    mode = 0
                # NOTE(review): the first argument of _force_run is the parent
                # directory here, not the entry itself -- presumably the path
                # whose permissions get relaxed on failure; confirm against
                # test.support._force_run.
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (no long-path expansion on this platform)."""
        return path
454
455
def rmdir(dirname):
    """Remove the directory *dirname*, ignoring the case where it is missing.

    Only FileNotFoundError is silenced; any other error propagates.
    """
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
461
462
def rmtree(path):
    """Remove the tree rooted at *path*, ignoring the case where it is missing.

    Only FileNotFoundError is silenced; any other error propagates.
    """
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
468
469
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Context manager yielding the path of a temporary directory.

    Arguments:

      path: directory to create for the duration of the block.  When it is
        omitted or None, tempfile.mkdtemp picks the location and the
        resolved (real) path is yielded.

      quiet: when False (the default), a failure to create *path* raises.
        When True, a RuntimeWarning is issued instead and *path* is yielded
        even though it was not created.

    The directory is removed on exit only if this context manager created
    it, and only by the process that created it.
    """
    import tempfile
    created = False
    if path is not None:
        try:
            os.mkdir(path)
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            created = True
    else:
        path = os.path.realpath(tempfile.mkdtemp())
        created = True
    # Remember which process owns the directory (see cleanup below).
    owner_pid = os.getpid() if created else None
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if created and owner_pid == os.getpid():
            rmtree(path)
509
510
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Context manager that switches the current working directory.

    Arguments:

      path: directory to make current for the duration of the block.

      quiet: when False (the default), a failure to change directory
        raises.  When True, a RuntimeWarning is issued instead and the
        working directory is left as it was.

    The working directory in effect inside the block is yielded, and the
    previous one is restored on exit.
    """
    previous_cwd = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(previous_cwd)
537
538
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    A directory called *name* is created in the current directory (or, when
    *name* is None, wherever tempfile.mkdtemp puts it) and the working
    directory is switched to it for the duration of the block.

    With *quiet* False (the default), failing to create the directory or to
    change into it raises an error.  With *quiet* True, only a warning is
    issued and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as created_path, \
            change_cwd(created_path, quiet=quiet) as active_cwd:
        yield active_cwd
557
558
def create_empty_file(filename):
    """Create *filename* as an empty file, truncating it if it exists."""
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, open_flags))
563
564
@contextlib.contextmanager
def open_dir_fd(path):
    """Context manager yielding a file descriptor opened on directory *path*.

    The descriptor is closed when the block exits.
    """
    assert os.path.isdir(path)
    # O_DIRECTORY is added only on platforms that define it.
    open_flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
    descriptor = os.open(path, open_flags)
    try:
        yield descriptor
    finally:
        os.close(descriptor)
577
578
def fs_is_case_insensitive(directory):
    """Return True if the file system holding *directory* ignores case.

    A temporary file is created in *directory* and looked up again under a
    name whose case has been changed; the file system is case-insensitive
    if both names designate the same file.
    """
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as probe:
        original_name = probe.name
        recased_name = original_name.upper()
        if recased_name == original_name:
            # The name had no lowercase letters: lowercase it instead.
            recased_name = original_name.lower()
        try:
            same = os.path.samefile(original_name, recased_name)
        except FileNotFoundError:
            same = False
        return same
592
593
class FakePath:
    """Minimal object implementing the path protocol (__fspath__).

    The wrapped value is returned as-is by __fspath__(), unless it is an
    exception instance or an exception class, in which case it is raised.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        value = self.path
        # Exception instance: raise it.
        if isinstance(value, BaseException):
            raise value
        # Exception class: raise it as well.
        if isinstance(value, type) and issubclass(value, BaseException):
            raise value
        return value
610
611
def fd_count():
    """Return the number of file descriptors currently open in this process.

    Where a per-process descriptor directory exists it is listed; otherwise
    every descriptor number below a limit is probed with os.dup().
    """
    procfs_like = ('linux', 'android', 'freebsd', 'emscripten')
    if sys.platform.startswith(procfs_like):
        fd_dir = "/proc/self/fd"
    elif sys.platform == "darwin":
        fd_dir = "/dev/fd"
    else:
        fd_dir = None

    if fd_dir is not None:
        try:
            entries = os.listdir(fd_dir)
        except FileNotFoundError:
            # Directory not available: fall back to probing below.
            pass
        else:
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the directory.
            return len(entries) - 1

    # Upper bound of the descriptor numbers to probe.
    limit = 256
    if hasattr(os, 'sysconf'):
        try:
            limit = os.sysconf("SC_OPEN_MAX")
        except (OSError, ValueError):
            # gh-118201: ValueError is raised intermittently on iOS
            pass

    saved_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            report_types = (msvcrt.CRT_WARN,
                            msvcrt.CRT_ERROR,
                            msvcrt.CRT_ASSERT)
            # CrtSetReportMode() returns the previous mode: keep it so it
            # can be restored once the probing is over.
            saved_modes = {
                report_type: msvcrt.CrtSetReportMode(report_type, 0)
                for report_type in report_types
            }

    try:
        open_fds = 0
        for candidate in range(limit):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                duplicate = os.dup(candidate)
            except OSError as exc:
                # EBADF just means "not open"; anything else is unexpected.
                if exc.errno != errno.EBADF:
                    raise
                continue
            os.close(duplicate)
            open_fds += 1
    finally:
        if saved_modes is not None:
            for report_type, previous_mode in saved_modes.items():
                msvcrt.CrtSetReportMode(report_type, previous_mode)

    return open_fds
678
679
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager applying *umask* to the process for the block.

        The previous mask is restored on exit, even if the block raises.
        """
        previous_mask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous_mask)
else:
    @contextlib.contextmanager
    def temp_umask(umask):
        """Do nothing: this platform has no os.umask()."""
        yield
694
695
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping view of os.environ that undoes its own modifications.

    Every variable set or deleted through the guard has its initial value
    recorded; leaving the ``with`` block restores those values."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its initial value (None if unset).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Only the first access records the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Only the first access records the initial value.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        # Deleting a variable that is not set is not an error.
        self._environ.pop(envvar, None)

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def copy(self):
        # Same result as os.environ.copy(): a plain dict snapshot.
        return dict(self)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                # The variable did not exist initially: remove it again.
                del self._environ[name]
        os.environ = self._environ
751
752
# subst_drive() needs ctypes and kernel32.  The try block either binds
# everything the implementation needs or raises, in which case a stub that
# skips the calling test is defined instead.
try:
    if support.MS_WINDOWS:
        import ctypes
        kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

        # Error code compared against ctypes.get_last_error() below.
        ERROR_FILE_NOT_FOUND = 2
        # Flags passed to DefineDosDeviceW below.
        DDD_REMOVE_DEFINITION = 2
        DDD_EXACT_MATCH_ON_REMOVE = 4
        DDD_NO_BROADCAST_SYSTEM = 8
    else:
        # Not Windows: take the same fallback path as a missing ctypes.
        raise AttributeError
except (ImportError, AttributeError):
    def subst_drive(path):
        """Fallback: always skip, since no substitute drive can be made."""
        raise unittest.SkipTest('ctypes or kernel32 is not available')
else:
    @contextlib.contextmanager
    def subst_drive(path):
        """Temporarily yield a substitute drive for a given path."""
        # Search from Z: down to A: for a drive name that is not defined.
        for c in reversed(string.ascii_uppercase):
            drive = f'{c}:'
            if (not kernel32.QueryDosDeviceW(drive, None, 0) and
                    ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
                break
        else:
            # Every letter is taken.
            raise unittest.SkipTest('no available logical drive')
        # Map the free drive name to *path*.
        if not kernel32.DefineDosDeviceW(
                DDD_NO_BROADCAST_SYSTEM, drive, path):
            raise ctypes.WinError(ctypes.get_last_error())
        try:
            yield drive
        finally:
            # Remove the exact definition created above.
            if not kernel32.DefineDosDeviceW(
                    DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
                    drive, path):
                raise ctypes.WinError(ctypes.get_last_error())
788