• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import unittest.mock
5import shutil
6import tempfile
7import sys
8import stat
9import os
10import os.path
11import errno
12import functools
13import pathlib
14import subprocess
15import random
16import string
17import contextlib
18import io
19from shutil import (make_archive,
20                    register_archive_format, unregister_archive_format,
21                    get_archive_formats, Error, unpack_archive,
22                    register_unpack_format, RegistryError,
23                    unregister_unpack_format, get_unpack_formats,
24                    SameFileError, _GiveupOnFastCopy)
25import tarfile
26import zipfile
27try:
28    import posix
29except ImportError:
30    posix = None
31
32from test import support
33from test.support import TESTFN, FakePath
34
35TESTFN2 = TESTFN + "2"
36MACOS = sys.platform.startswith("darwin")
37AIX = sys.platform[:3] == 'aix'
38try:
39    import grp
40    import pwd
41    UID_GID_SUPPORT = True
42except ImportError:
43    UID_GID_SUPPORT = False
44
45try:
46    import _winapi
47except ImportError:
48    _winapi = None
49
50def _fake_rename(*args, **kwargs):
51    # Pretend the destination path is on a different filesystem.
52    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
53
54def mock_rename(func):
55    @functools.wraps(func)
56    def wrap(*args, **kwargs):
57        try:
58            builtin_rename = os.rename
59            os.rename = _fake_rename
60            return func(*args, **kwargs)
61        finally:
62            os.rename = builtin_rename
63    return wrap
64
65def write_file(path, content, binary=False):
66    """Write *content* to a file located at *path*.
67
68    If *path* is a tuple instead of a string, os.path.join will be used to
69    make a path.  If *binary* is true, the file will be opened in binary
70    mode.
71    """
72    if isinstance(path, tuple):
73        path = os.path.join(*path)
74    with open(path, 'wb' if binary else 'w') as fp:
75        fp.write(content)
76
77def write_test_file(path, size):
78    """Create a test file with an arbitrary size and random text content."""
79    def chunks(total, step):
80        assert total >= step
81        while total > step:
82            yield step
83            total -= step
84        if total:
85            yield total
86
87    bufsize = min(size, 8192)
88    chunk = b"".join([random.choice(string.ascii_letters).encode()
89                      for i in range(bufsize)])
90    with open(path, 'wb') as f:
91        for csize in chunks(size, bufsize):
92            f.write(chunk)
93    assert os.path.getsize(path) == size
94
95def read_file(path, binary=False):
96    """Return contents from a file located at *path*.
97
98    If *path* is a tuple instead of a string, os.path.join will be used to
99    make a path.  If *binary* is true, the file will be opened in binary
100    mode.
101    """
102    if isinstance(path, tuple):
103        path = os.path.join(*path)
104    with open(path, 'rb' if binary else 'r') as fp:
105        return fp.read()
106
107def rlistdir(path):
108    res = []
109    for name in sorted(os.listdir(path)):
110        p = os.path.join(path, name)
111        if os.path.isdir(p) and not os.path.islink(p):
112            res.append(name + '/')
113            for n in rlistdir(p):
114                res.append(name + '/' + n)
115        else:
116            res.append(name)
117    return res
118
119def supports_file2file_sendfile():
120    # ...apparently Linux and Solaris are the only ones
121    if not hasattr(os, "sendfile"):
122        return False
123    srcname = None
124    dstname = None
125    try:
126        with tempfile.NamedTemporaryFile("wb", delete=False) as f:
127            srcname = f.name
128            f.write(b"0123456789")
129
130        with open(srcname, "rb") as src:
131            with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
132                dstname = dst.name
133                infd = src.fileno()
134                outfd = dst.fileno()
135                try:
136                    os.sendfile(outfd, infd, 0, 2)
137                except OSError:
138                    return False
139                else:
140                    return True
141    finally:
142        if srcname is not None:
143            support.unlink(srcname)
144        if dstname is not None:
145            support.unlink(dstname)
146
147
148SUPPORTS_SENDFILE = supports_file2file_sendfile()
149
150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
151# The AIX command 'dump -o program' gives XCOFF header information
152# The second word of the last line in the maxdata value
153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
154def _maxdataOK():
155    if AIX and sys.maxsize == 2147483647:
156        hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
157        maxdata=hdrs.split("\n")[-1].split()[1]
158        return int(maxdata,16) >= 0x20000000
159    else:
160        return True
161
162class TestShutil(unittest.TestCase):
163
164    def setUp(self):
165        super(TestShutil, self).setUp()
166        self.tempdirs = []
167
168    def tearDown(self):
169        super(TestShutil, self).tearDown()
170        while self.tempdirs:
171            d = self.tempdirs.pop()
172            shutil.rmtree(d, os.name in ('nt', 'cygwin'))
173
174
175    def mkdtemp(self):
176        """Create a temporary directory that will be cleaned up.
177
178        Returns the path of the directory.
179        """
180        basedir = None
181        if sys.platform == "win32":
182            basedir = os.path.realpath(os.getcwd())
183        d = tempfile.mkdtemp(dir=basedir)
184        self.tempdirs.append(d)
185        return d
186
187    def test_rmtree_works_on_bytes(self):
188        tmp = self.mkdtemp()
189        victim = os.path.join(tmp, 'killme')
190        os.mkdir(victim)
191        write_file(os.path.join(victim, 'somefile'), 'foo')
192        victim = os.fsencode(victim)
193        self.assertIsInstance(victim, bytes)
194        shutil.rmtree(victim)
195
196    @support.skip_unless_symlink
197    def test_rmtree_fails_on_symlink(self):
198        tmp = self.mkdtemp()
199        dir_ = os.path.join(tmp, 'dir')
200        os.mkdir(dir_)
201        link = os.path.join(tmp, 'link')
202        os.symlink(dir_, link)
203        self.assertRaises(OSError, shutil.rmtree, link)
204        self.assertTrue(os.path.exists(dir_))
205        self.assertTrue(os.path.lexists(link))
206        errors = []
207        def onerror(*args):
208            errors.append(args)
209        shutil.rmtree(link, onerror=onerror)
210        self.assertEqual(len(errors), 1)
211        self.assertIs(errors[0][0], os.path.islink)
212        self.assertEqual(errors[0][1], link)
213        self.assertIsInstance(errors[0][2][1], OSError)
214
215    @support.skip_unless_symlink
216    def test_rmtree_works_on_symlinks(self):
217        tmp = self.mkdtemp()
218        dir1 = os.path.join(tmp, 'dir1')
219        dir2 = os.path.join(dir1, 'dir2')
220        dir3 = os.path.join(tmp, 'dir3')
221        for d in dir1, dir2, dir3:
222            os.mkdir(d)
223        file1 = os.path.join(tmp, 'file1')
224        write_file(file1, 'foo')
225        link1 = os.path.join(dir1, 'link1')
226        os.symlink(dir2, link1)
227        link2 = os.path.join(dir1, 'link2')
228        os.symlink(dir3, link2)
229        link3 = os.path.join(dir1, 'link3')
230        os.symlink(file1, link3)
231        # make sure symlinks are removed but not followed
232        shutil.rmtree(dir1)
233        self.assertFalse(os.path.exists(dir1))
234        self.assertTrue(os.path.exists(dir3))
235        self.assertTrue(os.path.exists(file1))
236
237    @unittest.skipUnless(_winapi, 'only relevant on Windows')
238    def test_rmtree_fails_on_junctions(self):
239        tmp = self.mkdtemp()
240        dir_ = os.path.join(tmp, 'dir')
241        os.mkdir(dir_)
242        link = os.path.join(tmp, 'link')
243        _winapi.CreateJunction(dir_, link)
244        self.assertRaises(OSError, shutil.rmtree, link)
245        self.assertTrue(os.path.exists(dir_))
246        self.assertTrue(os.path.lexists(link))
247        errors = []
248        def onerror(*args):
249            errors.append(args)
250        shutil.rmtree(link, onerror=onerror)
251        self.assertEqual(len(errors), 1)
252        self.assertIs(errors[0][0], os.path.islink)
253        self.assertEqual(errors[0][1], link)
254        self.assertIsInstance(errors[0][2][1], OSError)
255
256    @unittest.skipUnless(_winapi, 'only relevant on Windows')
257    def test_rmtree_works_on_junctions(self):
258        tmp = self.mkdtemp()
259        dir1 = os.path.join(tmp, 'dir1')
260        dir2 = os.path.join(dir1, 'dir2')
261        dir3 = os.path.join(tmp, 'dir3')
262        for d in dir1, dir2, dir3:
263            os.mkdir(d)
264        file1 = os.path.join(tmp, 'file1')
265        write_file(file1, 'foo')
266        link1 = os.path.join(dir1, 'link1')
267        _winapi.CreateJunction(dir2, link1)
268        link2 = os.path.join(dir1, 'link2')
269        _winapi.CreateJunction(dir3, link2)
270        link3 = os.path.join(dir1, 'link3')
271        _winapi.CreateJunction(file1, link3)
272        # make sure junctions are removed but not followed
273        shutil.rmtree(dir1)
274        self.assertFalse(os.path.exists(dir1))
275        self.assertTrue(os.path.exists(dir3))
276        self.assertTrue(os.path.exists(file1))
277
278    def test_rmtree_errors(self):
279        # filename is guaranteed not to exist
280        filename = tempfile.mktemp()
281        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
282        # test that ignore_errors option is honored
283        shutil.rmtree(filename, ignore_errors=True)
284
285        # existing file
286        tmpdir = self.mkdtemp()
287        write_file((tmpdir, "tstfile"), "")
288        filename = os.path.join(tmpdir, "tstfile")
289        with self.assertRaises(NotADirectoryError) as cm:
290            shutil.rmtree(filename)
291        # The reason for this rather odd construct is that Windows sprinkles
292        # a \*.* at the end of file names. But only sometimes on some buildbots
293        possible_args = [filename, os.path.join(filename, '*.*')]
294        self.assertIn(cm.exception.filename, possible_args)
295        self.assertTrue(os.path.exists(filename))
296        # test that ignore_errors option is honored
297        shutil.rmtree(filename, ignore_errors=True)
298        self.assertTrue(os.path.exists(filename))
299        errors = []
300        def onerror(*args):
301            errors.append(args)
302        shutil.rmtree(filename, onerror=onerror)
303        self.assertEqual(len(errors), 2)
304        self.assertIs(errors[0][0], os.scandir)
305        self.assertEqual(errors[0][1], filename)
306        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
307        self.assertIn(errors[0][2][1].filename, possible_args)
308        self.assertIs(errors[1][0], os.rmdir)
309        self.assertEqual(errors[1][1], filename)
310        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
311        self.assertIn(errors[1][2][1].filename, possible_args)
312
313
314    @unittest.skipIf(sys.platform[:6] == 'cygwin',
315                     "This test can't be run on Cygwin (issue #1071513).")
316    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
317                     "This test can't be run reliably as root (issue #1076467).")
318    def test_on_error(self):
319        self.errorState = 0
320        os.mkdir(TESTFN)
321        self.addCleanup(shutil.rmtree, TESTFN)
322
323        self.child_file_path = os.path.join(TESTFN, 'a')
324        self.child_dir_path = os.path.join(TESTFN, 'b')
325        support.create_empty_file(self.child_file_path)
326        os.mkdir(self.child_dir_path)
327        old_dir_mode = os.stat(TESTFN).st_mode
328        old_child_file_mode = os.stat(self.child_file_path).st_mode
329        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
330        # Make unwritable.
331        new_mode = stat.S_IREAD|stat.S_IEXEC
332        os.chmod(self.child_file_path, new_mode)
333        os.chmod(self.child_dir_path, new_mode)
334        os.chmod(TESTFN, new_mode)
335
336        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
337        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
338        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
339
340        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
341        # Test whether onerror has actually been called.
342        self.assertEqual(self.errorState, 3,
343                         "Expected call to onerror function did not happen.")
344
345    def check_args_to_onerror(self, func, arg, exc):
346        # test_rmtree_errors deliberately runs rmtree
347        # on a directory that is chmod 500, which will fail.
348        # This function is run when shutil.rmtree fails.
349        # 99.9% of the time it initially fails to remove
350        # a file in the directory, so the first time through
351        # func is os.remove.
352        # However, some Linux machines running ZFS on
353        # FUSE experienced a failure earlier in the process
354        # at os.listdir.  The first failure may legally
355        # be either.
356        if self.errorState < 2:
357            if func is os.unlink:
358                self.assertEqual(arg, self.child_file_path)
359            elif func is os.rmdir:
360                self.assertEqual(arg, self.child_dir_path)
361            else:
362                self.assertIs(func, os.listdir)
363                self.assertIn(arg, [TESTFN, self.child_dir_path])
364            self.assertTrue(issubclass(exc[0], OSError))
365            self.errorState += 1
366        else:
367            self.assertEqual(func, os.rmdir)
368            self.assertEqual(arg, TESTFN)
369            self.assertTrue(issubclass(exc[0], OSError))
370            self.errorState = 3
371
372    def test_rmtree_does_not_choke_on_failing_lstat(self):
373        try:
374            orig_lstat = os.lstat
375            def raiser(fn, *args, **kwargs):
376                if fn != TESTFN:
377                    raise OSError()
378                else:
379                    return orig_lstat(fn)
380            os.lstat = raiser
381
382            os.mkdir(TESTFN)
383            write_file((TESTFN, 'foo'), 'foo')
384            shutil.rmtree(TESTFN)
385        finally:
386            os.lstat = orig_lstat
387
388    @support.skip_unless_symlink
389    def test_copymode_follow_symlinks(self):
390        tmp_dir = self.mkdtemp()
391        src = os.path.join(tmp_dir, 'foo')
392        dst = os.path.join(tmp_dir, 'bar')
393        src_link = os.path.join(tmp_dir, 'baz')
394        dst_link = os.path.join(tmp_dir, 'quux')
395        write_file(src, 'foo')
396        write_file(dst, 'foo')
397        os.symlink(src, src_link)
398        os.symlink(dst, dst_link)
399        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
400        # file to file
401        os.chmod(dst, stat.S_IRWXO)
402        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
403        shutil.copymode(src, dst)
404        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
405        # On Windows, os.chmod does not follow symlinks (issue #15411)
406        if os.name != 'nt':
407            # follow src link
408            os.chmod(dst, stat.S_IRWXO)
409            shutil.copymode(src_link, dst)
410            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
411            # follow dst link
412            os.chmod(dst, stat.S_IRWXO)
413            shutil.copymode(src, dst_link)
414            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
415            # follow both links
416            os.chmod(dst, stat.S_IRWXO)
417            shutil.copymode(src_link, dst_link)
418            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
419
420    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
421    @support.skip_unless_symlink
422    def test_copymode_symlink_to_symlink(self):
423        tmp_dir = self.mkdtemp()
424        src = os.path.join(tmp_dir, 'foo')
425        dst = os.path.join(tmp_dir, 'bar')
426        src_link = os.path.join(tmp_dir, 'baz')
427        dst_link = os.path.join(tmp_dir, 'quux')
428        write_file(src, 'foo')
429        write_file(dst, 'foo')
430        os.symlink(src, src_link)
431        os.symlink(dst, dst_link)
432        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
433        os.chmod(dst, stat.S_IRWXU)
434        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
435        # link to link
436        os.lchmod(dst_link, stat.S_IRWXO)
437        shutil.copymode(src_link, dst_link, follow_symlinks=False)
438        self.assertEqual(os.lstat(src_link).st_mode,
439                         os.lstat(dst_link).st_mode)
440        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
441        # src link - use chmod
442        os.lchmod(dst_link, stat.S_IRWXO)
443        shutil.copymode(src_link, dst, follow_symlinks=False)
444        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
445        # dst link - use chmod
446        os.lchmod(dst_link, stat.S_IRWXO)
447        shutil.copymode(src, dst_link, follow_symlinks=False)
448        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
449
450    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
451    @support.skip_unless_symlink
452    def test_copymode_symlink_to_symlink_wo_lchmod(self):
453        tmp_dir = self.mkdtemp()
454        src = os.path.join(tmp_dir, 'foo')
455        dst = os.path.join(tmp_dir, 'bar')
456        src_link = os.path.join(tmp_dir, 'baz')
457        dst_link = os.path.join(tmp_dir, 'quux')
458        write_file(src, 'foo')
459        write_file(dst, 'foo')
460        os.symlink(src, src_link)
461        os.symlink(dst, dst_link)
462        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
463
464    @support.skip_unless_symlink
465    def test_copystat_symlinks(self):
466        tmp_dir = self.mkdtemp()
467        src = os.path.join(tmp_dir, 'foo')
468        dst = os.path.join(tmp_dir, 'bar')
469        src_link = os.path.join(tmp_dir, 'baz')
470        dst_link = os.path.join(tmp_dir, 'qux')
471        write_file(src, 'foo')
472        src_stat = os.stat(src)
473        os.utime(src, (src_stat.st_atime,
474                       src_stat.st_mtime - 42.0))  # ensure different mtimes
475        write_file(dst, 'bar')
476        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
477        os.symlink(src, src_link)
478        os.symlink(dst, dst_link)
479        if hasattr(os, 'lchmod'):
480            os.lchmod(src_link, stat.S_IRWXO)
481        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
482            os.lchflags(src_link, stat.UF_NODUMP)
483        src_link_stat = os.lstat(src_link)
484        # follow
485        if hasattr(os, 'lchmod'):
486            shutil.copystat(src_link, dst_link, follow_symlinks=True)
487            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
488        # don't follow
489        shutil.copystat(src_link, dst_link, follow_symlinks=False)
490        dst_link_stat = os.lstat(dst_link)
491        if os.utime in os.supports_follow_symlinks:
492            for attr in 'st_atime', 'st_mtime':
493                # The modification times may be truncated in the new file.
494                self.assertLessEqual(getattr(src_link_stat, attr),
495                                     getattr(dst_link_stat, attr) + 1)
496        if hasattr(os, 'lchmod'):
497            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
498        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
499            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
500        # tell to follow but dst is not a link
501        shutil.copystat(src_link, dst, follow_symlinks=False)
502        self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
503                        00000.1)
504
505    @unittest.skipUnless(hasattr(os, 'chflags') and
506                         hasattr(errno, 'EOPNOTSUPP') and
507                         hasattr(errno, 'ENOTSUP'),
508                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
509    def test_copystat_handles_harmless_chflags_errors(self):
510        tmpdir = self.mkdtemp()
511        file1 = os.path.join(tmpdir, 'file1')
512        file2 = os.path.join(tmpdir, 'file2')
513        write_file(file1, 'xxx')
514        write_file(file2, 'xxx')
515
516        def make_chflags_raiser(err):
517            ex = OSError()
518
519            def _chflags_raiser(path, flags, *, follow_symlinks=True):
520                ex.errno = err
521                raise ex
522            return _chflags_raiser
523        old_chflags = os.chflags
524        try:
525            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
526                os.chflags = make_chflags_raiser(err)
527                shutil.copystat(file1, file2)
528            # assert others errors break it
529            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
530            self.assertRaises(OSError, shutil.copystat, file1, file2)
531        finally:
532            os.chflags = old_chflags
533
534    @support.skip_unless_xattr
535    def test_copyxattr(self):
536        tmp_dir = self.mkdtemp()
537        src = os.path.join(tmp_dir, 'foo')
538        write_file(src, 'foo')
539        dst = os.path.join(tmp_dir, 'bar')
540        write_file(dst, 'bar')
541
542        # no xattr == no problem
543        shutil._copyxattr(src, dst)
544        # common case
545        os.setxattr(src, 'user.foo', b'42')
546        os.setxattr(src, 'user.bar', b'43')
547        shutil._copyxattr(src, dst)
548        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
549        self.assertEqual(
550                os.getxattr(src, 'user.foo'),
551                os.getxattr(dst, 'user.foo'))
552        # check errors don't affect other attrs
553        os.remove(dst)
554        write_file(dst, 'bar')
555        os_error = OSError(errno.EPERM, 'EPERM')
556
557        def _raise_on_user_foo(fname, attr, val, **kwargs):
558            if attr == 'user.foo':
559                raise os_error
560            else:
561                orig_setxattr(fname, attr, val, **kwargs)
562        try:
563            orig_setxattr = os.setxattr
564            os.setxattr = _raise_on_user_foo
565            shutil._copyxattr(src, dst)
566            self.assertIn('user.bar', os.listxattr(dst))
567        finally:
568            os.setxattr = orig_setxattr
569        # the source filesystem not supporting xattrs should be ok, too.
570        def _raise_on_src(fname, *, follow_symlinks=True):
571            if fname == src:
572                raise OSError(errno.ENOTSUP, 'Operation not supported')
573            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
574        try:
575            orig_listxattr = os.listxattr
576            os.listxattr = _raise_on_src
577            shutil._copyxattr(src, dst)
578        finally:
579            os.listxattr = orig_listxattr
580
581        # test that shutil.copystat copies xattrs
582        src = os.path.join(tmp_dir, 'the_original')
583        srcro = os.path.join(tmp_dir, 'the_original_ro')
584        write_file(src, src)
585        write_file(srcro, srcro)
586        os.setxattr(src, 'user.the_value', b'fiddly')
587        os.setxattr(srcro, 'user.the_value', b'fiddly')
588        os.chmod(srcro, 0o444)
589        dst = os.path.join(tmp_dir, 'the_copy')
590        dstro = os.path.join(tmp_dir, 'the_copy_ro')
591        write_file(dst, dst)
592        write_file(dstro, dstro)
593        shutil.copystat(src, dst)
594        shutil.copystat(srcro, dstro)
595        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
596        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
597
598    @support.skip_unless_symlink
599    @support.skip_unless_xattr
600    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
601                         'root privileges required')
602    def test_copyxattr_symlinks(self):
603        # On Linux, it's only possible to access non-user xattr for symlinks;
604        # which in turn require root privileges. This test should be expanded
605        # as soon as other platforms gain support for extended attributes.
606        tmp_dir = self.mkdtemp()
607        src = os.path.join(tmp_dir, 'foo')
608        src_link = os.path.join(tmp_dir, 'baz')
609        write_file(src, 'foo')
610        os.symlink(src, src_link)
611        os.setxattr(src, 'trusted.foo', b'42')
612        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
613        dst = os.path.join(tmp_dir, 'bar')
614        dst_link = os.path.join(tmp_dir, 'qux')
615        write_file(dst, 'bar')
616        os.symlink(dst, dst_link)
617        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
618        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
619        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
620        shutil._copyxattr(src_link, dst, follow_symlinks=False)
621        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
622
623    @support.skip_unless_symlink
624    def test_copy_symlinks(self):
625        tmp_dir = self.mkdtemp()
626        src = os.path.join(tmp_dir, 'foo')
627        dst = os.path.join(tmp_dir, 'bar')
628        src_link = os.path.join(tmp_dir, 'baz')
629        write_file(src, 'foo')
630        os.symlink(src, src_link)
631        if hasattr(os, 'lchmod'):
632            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
633        # don't follow
634        shutil.copy(src_link, dst, follow_symlinks=True)
635        self.assertFalse(os.path.islink(dst))
636        self.assertEqual(read_file(src), read_file(dst))
637        os.remove(dst)
638        # follow
639        shutil.copy(src_link, dst, follow_symlinks=False)
640        self.assertTrue(os.path.islink(dst))
641        self.assertEqual(os.readlink(dst), os.readlink(src_link))
642        if hasattr(os, 'lchmod'):
643            self.assertEqual(os.lstat(src_link).st_mode,
644                             os.lstat(dst).st_mode)
645
646    @support.skip_unless_symlink
647    def test_copy2_symlinks(self):
648        tmp_dir = self.mkdtemp()
649        src = os.path.join(tmp_dir, 'foo')
650        dst = os.path.join(tmp_dir, 'bar')
651        src_link = os.path.join(tmp_dir, 'baz')
652        write_file(src, 'foo')
653        os.symlink(src, src_link)
654        if hasattr(os, 'lchmod'):
655            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
656        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
657            os.lchflags(src_link, stat.UF_NODUMP)
658        src_stat = os.stat(src)
659        src_link_stat = os.lstat(src_link)
660        # follow
661        shutil.copy2(src_link, dst, follow_symlinks=True)
662        self.assertFalse(os.path.islink(dst))
663        self.assertEqual(read_file(src), read_file(dst))
664        os.remove(dst)
665        # don't follow
666        shutil.copy2(src_link, dst, follow_symlinks=False)
667        self.assertTrue(os.path.islink(dst))
668        self.assertEqual(os.readlink(dst), os.readlink(src_link))
669        dst_stat = os.lstat(dst)
670        if os.utime in os.supports_follow_symlinks:
671            for attr in 'st_atime', 'st_mtime':
672                # The modification times may be truncated in the new file.
673                self.assertLessEqual(getattr(src_link_stat, attr),
674                                     getattr(dst_stat, attr) + 1)
675        if hasattr(os, 'lchmod'):
676            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
677            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
678        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
679            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
680
681    @support.skip_unless_xattr
682    def test_copy2_xattr(self):
683        tmp_dir = self.mkdtemp()
684        src = os.path.join(tmp_dir, 'foo')
685        dst = os.path.join(tmp_dir, 'bar')
686        write_file(src, 'foo')
687        os.setxattr(src, 'user.foo', b'42')
688        shutil.copy2(src, dst)
689        self.assertEqual(
690                os.getxattr(src, 'user.foo'),
691                os.getxattr(dst, 'user.foo'))
692        os.remove(dst)
693
694    @support.skip_unless_symlink
695    def test_copyfile_symlinks(self):
696        tmp_dir = self.mkdtemp()
697        src = os.path.join(tmp_dir, 'src')
698        dst = os.path.join(tmp_dir, 'dst')
699        dst_link = os.path.join(tmp_dir, 'dst_link')
700        link = os.path.join(tmp_dir, 'link')
701        write_file(src, 'foo')
702        os.symlink(src, link)
703        # don't follow
704        shutil.copyfile(link, dst_link, follow_symlinks=False)
705        self.assertTrue(os.path.islink(dst_link))
706        self.assertEqual(os.readlink(link), os.readlink(dst_link))
707        # follow
708        shutil.copyfile(link, dst)
709        self.assertFalse(os.path.islink(dst))
710
711    def test_rmtree_uses_safe_fd_version_if_available(self):
712        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
713                             os.supports_dir_fd and
714                             os.listdir in os.supports_fd and
715                             os.stat in os.supports_follow_symlinks)
716        if _use_fd_functions:
717            self.assertTrue(shutil._use_fd_functions)
718            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
719            tmp_dir = self.mkdtemp()
720            d = os.path.join(tmp_dir, 'a')
721            os.mkdir(d)
722            try:
723                real_rmtree = shutil._rmtree_safe_fd
724                class Called(Exception): pass
725                def _raiser(*args, **kwargs):
726                    raise Called
727                shutil._rmtree_safe_fd = _raiser
728                self.assertRaises(Called, shutil.rmtree, d)
729            finally:
730                shutil._rmtree_safe_fd = real_rmtree
731        else:
732            self.assertFalse(shutil._use_fd_functions)
733            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
734
735    def test_rmtree_dont_delete_file(self):
736        # When called on a file instead of a directory, don't delete it.
737        handle, path = tempfile.mkstemp()
738        os.close(handle)
739        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
740        os.remove(path)
741
742    def test_copytree_simple(self):
743        src_dir = tempfile.mkdtemp()
744        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
745        self.addCleanup(shutil.rmtree, src_dir)
746        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
747        write_file((src_dir, 'test.txt'), '123')
748        os.mkdir(os.path.join(src_dir, 'test_dir'))
749        write_file((src_dir, 'test_dir', 'test.txt'), '456')
750
751        shutil.copytree(src_dir, dst_dir)
752        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
753        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
754        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
755                                                    'test.txt')))
756        actual = read_file((dst_dir, 'test.txt'))
757        self.assertEqual(actual, '123')
758        actual = read_file((dst_dir, 'test_dir', 'test.txt'))
759        self.assertEqual(actual, '456')
760
761    def test_copytree_dirs_exist_ok(self):
762        src_dir = tempfile.mkdtemp()
763        dst_dir = tempfile.mkdtemp()
764        self.addCleanup(shutil.rmtree, src_dir)
765        self.addCleanup(shutil.rmtree, dst_dir)
766
767        write_file((src_dir, 'nonexisting.txt'), '123')
768        os.mkdir(os.path.join(src_dir, 'existing_dir'))
769        os.mkdir(os.path.join(dst_dir, 'existing_dir'))
770        write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
771        write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
772
773        shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
774        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
775        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
776        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
777                                                    'existing.txt')))
778        actual = read_file((dst_dir, 'nonexisting.txt'))
779        self.assertEqual(actual, '123')
780        actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
781        self.assertEqual(actual, 'has been replaced')
782
783        with self.assertRaises(FileExistsError):
784            shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
785
786    @support.skip_unless_symlink
787    def test_copytree_symlinks(self):
788        tmp_dir = self.mkdtemp()
789        src_dir = os.path.join(tmp_dir, 'src')
790        dst_dir = os.path.join(tmp_dir, 'dst')
791        sub_dir = os.path.join(src_dir, 'sub')
792        os.mkdir(src_dir)
793        os.mkdir(sub_dir)
794        write_file((src_dir, 'file.txt'), 'foo')
795        src_link = os.path.join(sub_dir, 'link')
796        dst_link = os.path.join(dst_dir, 'sub/link')
797        os.symlink(os.path.join(src_dir, 'file.txt'),
798                   src_link)
799        if hasattr(os, 'lchmod'):
800            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
801        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
802            os.lchflags(src_link, stat.UF_NODUMP)
803        src_stat = os.lstat(src_link)
804        shutil.copytree(src_dir, dst_dir, symlinks=True)
805        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
806        actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
807        # Bad practice to blindly strip the prefix as it may be required to
808        # correctly refer to the file, but we're only comparing paths here.
809        if os.name == 'nt' and actual.startswith('\\\\?\\'):
810            actual = actual[4:]
811        self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
812        dst_stat = os.lstat(dst_link)
813        if hasattr(os, 'lchmod'):
814            self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
815        if hasattr(os, 'lchflags'):
816            self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
817
818    def test_copytree_with_exclude(self):
819        # creating data
820        join = os.path.join
821        exists = os.path.exists
822        src_dir = tempfile.mkdtemp()
823        try:
824            dst_dir = join(tempfile.mkdtemp(), 'destination')
825            write_file((src_dir, 'test.txt'), '123')
826            write_file((src_dir, 'test.tmp'), '123')
827            os.mkdir(join(src_dir, 'test_dir'))
828            write_file((src_dir, 'test_dir', 'test.txt'), '456')
829            os.mkdir(join(src_dir, 'test_dir2'))
830            write_file((src_dir, 'test_dir2', 'test.txt'), '456')
831            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
832            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
833            write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
834            write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
835
836            # testing glob-like patterns
837            try:
838                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
839                shutil.copytree(src_dir, dst_dir, ignore=patterns)
840                # checking the result: some elements should not be copied
841                self.assertTrue(exists(join(dst_dir, 'test.txt')))
842                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
843                self.assertFalse(exists(join(dst_dir, 'test_dir2')))
844            finally:
845                shutil.rmtree(dst_dir)
846            try:
847                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
848                shutil.copytree(src_dir, dst_dir, ignore=patterns)
849                # checking the result: some elements should not be copied
850                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
851                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
852                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
853            finally:
854                shutil.rmtree(dst_dir)
855
856            # testing callable-style
857            try:
858                def _filter(src, names):
859                    res = []
860                    for name in names:
861                        path = os.path.join(src, name)
862
863                        if (os.path.isdir(path) and
864                            path.split()[-1] == 'subdir'):
865                            res.append(name)
866                        elif os.path.splitext(path)[-1] in ('.py'):
867                            res.append(name)
868                    return res
869
870                shutil.copytree(src_dir, dst_dir, ignore=_filter)
871
872                # checking the result: some elements should not be copied
873                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
874                                             'test.py')))
875                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
876
877            finally:
878                shutil.rmtree(dst_dir)
879        finally:
880            shutil.rmtree(src_dir)
881            shutil.rmtree(os.path.dirname(dst_dir))
882
883    def test_copytree_retains_permissions(self):
884        tmp_dir = tempfile.mkdtemp()
885        src_dir = os.path.join(tmp_dir, 'source')
886        os.mkdir(src_dir)
887        dst_dir = os.path.join(tmp_dir, 'destination')
888        self.addCleanup(shutil.rmtree, tmp_dir)
889
890        os.chmod(src_dir, 0o777)
891        write_file((src_dir, 'permissive.txt'), '123')
892        os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
893        write_file((src_dir, 'restrictive.txt'), '456')
894        os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
895        restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
896        os.chmod(restrictive_subdir, 0o600)
897
898        shutil.copytree(src_dir, dst_dir)
899        self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
900        self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
901                          os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
902        self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
903                          os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
904        restrictive_subdir_dst = os.path.join(dst_dir,
905                                              os.path.split(restrictive_subdir)[1])
906        self.assertEqual(os.stat(restrictive_subdir).st_mode,
907                          os.stat(restrictive_subdir_dst).st_mode)
908
909    @unittest.mock.patch('os.chmod')
910    def test_copytree_winerror(self, mock_patch):
911        # When copying to VFAT, copystat() raises OSError. On Windows, the
912        # exception object has a meaningful 'winerror' attribute, but not
913        # on other operating systems. Do not assume 'winerror' is set.
914        src_dir = tempfile.mkdtemp()
915        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
916        self.addCleanup(shutil.rmtree, src_dir)
917        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
918
919        mock_patch.side_effect = PermissionError('ka-boom')
920        with self.assertRaises(shutil.Error):
921            shutil.copytree(src_dir, dst_dir)
922
923    def test_copytree_custom_copy_function(self):
924        # See: https://bugs.python.org/issue35648
925        def custom_cpfun(a, b):
926            flag.append(None)
927            self.assertIsInstance(a, str)
928            self.assertIsInstance(b, str)
929            self.assertEqual(a, os.path.join(src, 'foo'))
930            self.assertEqual(b, os.path.join(dst, 'foo'))
931
932        flag = []
933        src = tempfile.mkdtemp()
934        self.addCleanup(support.rmtree, src)
935        dst = tempfile.mktemp()
936        self.addCleanup(support.rmtree, dst)
937        with open(os.path.join(src, 'foo'), 'w') as f:
938            f.close()
939        shutil.copytree(src, dst, copy_function=custom_cpfun)
940        self.assertEqual(len(flag), 1)
941
942    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
943    def test_dont_copy_file_onto_link_to_itself(self):
944        # bug 851123.
945        os.mkdir(TESTFN)
946        src = os.path.join(TESTFN, 'cheese')
947        dst = os.path.join(TESTFN, 'shop')
948        try:
949            with open(src, 'w') as f:
950                f.write('cheddar')
951            try:
952                os.link(src, dst)
953            except PermissionError as e:
954                self.skipTest('os.link(): %s' % e)
955            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
956            with open(src, 'r') as f:
957                self.assertEqual(f.read(), 'cheddar')
958            os.remove(dst)
959        finally:
960            shutil.rmtree(TESTFN, ignore_errors=True)
961
962    @support.skip_unless_symlink
963    def test_dont_copy_file_onto_symlink_to_itself(self):
964        # bug 851123.
965        os.mkdir(TESTFN)
966        src = os.path.join(TESTFN, 'cheese')
967        dst = os.path.join(TESTFN, 'shop')
968        try:
969            with open(src, 'w') as f:
970                f.write('cheddar')
971            # Using `src` here would mean we end up with a symlink pointing
972            # to TESTFN/TESTFN/cheese, while it should point at
973            # TESTFN/cheese.
974            os.symlink('cheese', dst)
975            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
976            with open(src, 'r') as f:
977                self.assertEqual(f.read(), 'cheddar')
978            os.remove(dst)
979        finally:
980            shutil.rmtree(TESTFN, ignore_errors=True)
981
982    @support.skip_unless_symlink
983    def test_rmtree_on_symlink(self):
984        # bug 1669.
985        os.mkdir(TESTFN)
986        try:
987            src = os.path.join(TESTFN, 'cheese')
988            dst = os.path.join(TESTFN, 'shop')
989            os.mkdir(src)
990            os.symlink(src, dst)
991            self.assertRaises(OSError, shutil.rmtree, dst)
992            shutil.rmtree(dst, ignore_errors=True)
993        finally:
994            shutil.rmtree(TESTFN, ignore_errors=True)
995
996    @unittest.skipUnless(_winapi, 'only relevant on Windows')
997    def test_rmtree_on_junction(self):
998        os.mkdir(TESTFN)
999        try:
1000            src = os.path.join(TESTFN, 'cheese')
1001            dst = os.path.join(TESTFN, 'shop')
1002            os.mkdir(src)
1003            open(os.path.join(src, 'spam'), 'wb').close()
1004            _winapi.CreateJunction(src, dst)
1005            self.assertRaises(OSError, shutil.rmtree, dst)
1006            shutil.rmtree(dst, ignore_errors=True)
1007        finally:
1008            shutil.rmtree(TESTFN, ignore_errors=True)
1009
1010    # Issue #3002: copyfile and copytree block indefinitely on named pipes
1011    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1012    def test_copyfile_named_pipe(self):
1013        try:
1014            os.mkfifo(TESTFN)
1015        except PermissionError as e:
1016            self.skipTest('os.mkfifo(): %s' % e)
1017        try:
1018            self.assertRaises(shutil.SpecialFileError,
1019                                shutil.copyfile, TESTFN, TESTFN2)
1020            self.assertRaises(shutil.SpecialFileError,
1021                                shutil.copyfile, __file__, TESTFN)
1022        finally:
1023            os.remove(TESTFN)
1024
1025    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1026    @support.skip_unless_symlink
1027    def test_copytree_named_pipe(self):
1028        os.mkdir(TESTFN)
1029        try:
1030            subdir = os.path.join(TESTFN, "subdir")
1031            os.mkdir(subdir)
1032            pipe = os.path.join(subdir, "mypipe")
1033            try:
1034                os.mkfifo(pipe)
1035            except PermissionError as e:
1036                self.skipTest('os.mkfifo(): %s' % e)
1037            try:
1038                shutil.copytree(TESTFN, TESTFN2)
1039            except shutil.Error as e:
1040                errors = e.args[0]
1041                self.assertEqual(len(errors), 1)
1042                src, dst, error_msg = errors[0]
1043                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
1044            else:
1045                self.fail("shutil.Error should have been raised")
1046        finally:
1047            shutil.rmtree(TESTFN, ignore_errors=True)
1048            shutil.rmtree(TESTFN2, ignore_errors=True)
1049
1050    def test_copytree_special_func(self):
1051
1052        src_dir = self.mkdtemp()
1053        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1054        write_file((src_dir, 'test.txt'), '123')
1055        os.mkdir(os.path.join(src_dir, 'test_dir'))
1056        write_file((src_dir, 'test_dir', 'test.txt'), '456')
1057
1058        copied = []
1059        def _copy(src, dst):
1060            copied.append((src, dst))
1061
1062        shutil.copytree(src_dir, dst_dir, copy_function=_copy)
1063        self.assertEqual(len(copied), 2)
1064
1065    @support.skip_unless_symlink
1066    def test_copytree_dangling_symlinks(self):
1067
1068        # a dangling symlink raises an error at the end
1069        src_dir = self.mkdtemp()
1070        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1071        os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
1072        os.mkdir(os.path.join(src_dir, 'test_dir'))
1073        write_file((src_dir, 'test_dir', 'test.txt'), '456')
1074        self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
1075
1076        # a dangling symlink is ignored with the proper flag
1077        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1078        shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
1079        self.assertNotIn('test.txt', os.listdir(dst_dir))
1080
1081        # a dangling symlink is copied if symlinks=True
1082        dst_dir = os.path.join(self.mkdtemp(), 'destination3')
1083        shutil.copytree(src_dir, dst_dir, symlinks=True)
1084        self.assertIn('test.txt', os.listdir(dst_dir))
1085
1086    @support.skip_unless_symlink
1087    def test_copytree_symlink_dir(self):
1088        src_dir = self.mkdtemp()
1089        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1090        os.mkdir(os.path.join(src_dir, 'real_dir'))
1091        with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
1092            pass
1093        os.symlink(os.path.join(src_dir, 'real_dir'),
1094                   os.path.join(src_dir, 'link_to_dir'),
1095                   target_is_directory=True)
1096
1097        shutil.copytree(src_dir, dst_dir, symlinks=False)
1098        self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1099        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1100
1101        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1102        shutil.copytree(src_dir, dst_dir, symlinks=True)
1103        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1104        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1105
1106    def _copy_file(self, method):
1107        fname = 'test.txt'
1108        tmpdir = self.mkdtemp()
1109        write_file((tmpdir, fname), 'xxx')
1110        file1 = os.path.join(tmpdir, fname)
1111        tmpdir2 = self.mkdtemp()
1112        method(file1, tmpdir2)
1113        file2 = os.path.join(tmpdir2, fname)
1114        return (file1, file2)
1115
1116    def test_copy(self):
1117        # Ensure that the copied file exists and has the same mode bits.
1118        file1, file2 = self._copy_file(shutil.copy)
1119        self.assertTrue(os.path.exists(file2))
1120        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1121
1122    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1123    def test_copy2(self):
1124        # Ensure that the copied file exists and has the same mode and
1125        # modification time bits.
1126        file1, file2 = self._copy_file(shutil.copy2)
1127        self.assertTrue(os.path.exists(file2))
1128        file1_stat = os.stat(file1)
1129        file2_stat = os.stat(file2)
1130        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1131        for attr in 'st_atime', 'st_mtime':
1132            # The modification times may be truncated in the new file.
1133            self.assertLessEqual(getattr(file1_stat, attr),
1134                                 getattr(file2_stat, attr) + 1)
1135        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1136            self.assertEqual(getattr(file1_stat, 'st_flags'),
1137                             getattr(file2_stat, 'st_flags'))
1138
1139    @support.requires_zlib
1140    def test_make_tarball(self):
1141        # creating something to tar
1142        root_dir, base_dir = self._create_files('')
1143
1144        tmpdir2 = self.mkdtemp()
1145        # force shutil to create the directory
1146        os.rmdir(tmpdir2)
1147        # working with relative paths
1148        work_dir = os.path.dirname(tmpdir2)
1149        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1150
1151        with support.change_cwd(work_dir):
1152            base_name = os.path.abspath(rel_base_name)
1153            tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
1154
1155        # check if the compressed tarball was created
1156        self.assertEqual(tarball, base_name + '.tar.gz')
1157        self.assertTrue(os.path.isfile(tarball))
1158        self.assertTrue(tarfile.is_tarfile(tarball))
1159        with tarfile.open(tarball, 'r:gz') as tf:
1160            self.assertCountEqual(tf.getnames(),
1161                                  ['.', './sub', './sub2',
1162                                   './file1', './file2', './sub/file3'])
1163
1164        # trying an uncompressed one
1165        with support.change_cwd(work_dir):
1166            tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
1167        self.assertEqual(tarball, base_name + '.tar')
1168        self.assertTrue(os.path.isfile(tarball))
1169        self.assertTrue(tarfile.is_tarfile(tarball))
1170        with tarfile.open(tarball, 'r') as tf:
1171            self.assertCountEqual(tf.getnames(),
1172                                  ['.', './sub', './sub2',
1173                                  './file1', './file2', './sub/file3'])
1174
1175    def _tarinfo(self, path):
1176        with tarfile.open(path) as tar:
1177            names = tar.getnames()
1178            names.sort()
1179            return tuple(names)
1180
1181    def _create_files(self, base_dir='dist'):
1182        # creating something to tar
1183        root_dir = self.mkdtemp()
1184        dist = os.path.join(root_dir, base_dir)
1185        os.makedirs(dist, exist_ok=True)
1186        write_file((dist, 'file1'), 'xxx')
1187        write_file((dist, 'file2'), 'xxx')
1188        os.mkdir(os.path.join(dist, 'sub'))
1189        write_file((dist, 'sub', 'file3'), 'xxx')
1190        os.mkdir(os.path.join(dist, 'sub2'))
1191        if base_dir:
1192            write_file((root_dir, 'outer'), 'xxx')
1193        return root_dir, base_dir
1194
1195    @support.requires_zlib
1196    @unittest.skipUnless(shutil.which('tar'),
1197                         'Need the tar command to run')
1198    def test_tarfile_vs_tar(self):
1199        root_dir, base_dir = self._create_files()
1200        base_name = os.path.join(self.mkdtemp(), 'archive')
1201        tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
1202
1203        # check if the compressed tarball was created
1204        self.assertEqual(tarball, base_name + '.tar.gz')
1205        self.assertTrue(os.path.isfile(tarball))
1206
1207        # now create another tarball using `tar`
1208        tarball2 = os.path.join(root_dir, 'archive2.tar')
1209        tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
1210        subprocess.check_call(tar_cmd, cwd=root_dir,
1211                              stdout=subprocess.DEVNULL)
1212
1213        self.assertTrue(os.path.isfile(tarball2))
1214        # let's compare both tarballs
1215        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
1216
1217        # trying an uncompressed one
1218        tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1219        self.assertEqual(tarball, base_name + '.tar')
1220        self.assertTrue(os.path.isfile(tarball))
1221
1222        # now for a dry_run
1223        tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1224                               dry_run=True)
1225        self.assertEqual(tarball, base_name + '.tar')
1226        self.assertTrue(os.path.isfile(tarball))
1227
1228    @support.requires_zlib
1229    def test_make_zipfile(self):
1230        # creating something to zip
1231        root_dir, base_dir = self._create_files()
1232
1233        tmpdir2 = self.mkdtemp()
1234        # force shutil to create the directory
1235        os.rmdir(tmpdir2)
1236        # working with relative paths
1237        work_dir = os.path.dirname(tmpdir2)
1238        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1239
1240        with support.change_cwd(work_dir):
1241            base_name = os.path.abspath(rel_base_name)
1242            res = make_archive(rel_base_name, 'zip', root_dir)
1243
1244        self.assertEqual(res, base_name + '.zip')
1245        self.assertTrue(os.path.isfile(res))
1246        self.assertTrue(zipfile.is_zipfile(res))
1247        with zipfile.ZipFile(res) as zf:
1248            self.assertCountEqual(zf.namelist(),
1249                    ['dist/', 'dist/sub/', 'dist/sub2/',
1250                     'dist/file1', 'dist/file2', 'dist/sub/file3',
1251                     'outer'])
1252
1253        with support.change_cwd(work_dir):
1254            base_name = os.path.abspath(rel_base_name)
1255            res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
1256
1257        self.assertEqual(res, base_name + '.zip')
1258        self.assertTrue(os.path.isfile(res))
1259        self.assertTrue(zipfile.is_zipfile(res))
1260        with zipfile.ZipFile(res) as zf:
1261            self.assertCountEqual(zf.namelist(),
1262                    ['dist/', 'dist/sub/', 'dist/sub2/',
1263                     'dist/file1', 'dist/file2', 'dist/sub/file3'])
1264
1265    @support.requires_zlib
1266    @unittest.skipUnless(shutil.which('zip'),
1267                         'Need the zip command to run')
1268    def test_zipfile_vs_zip(self):
1269        root_dir, base_dir = self._create_files()
1270        base_name = os.path.join(self.mkdtemp(), 'archive')
1271        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1272
1273        # check if ZIP file  was created
1274        self.assertEqual(archive, base_name + '.zip')
1275        self.assertTrue(os.path.isfile(archive))
1276
1277        # now create another ZIP file using `zip`
1278        archive2 = os.path.join(root_dir, 'archive2.zip')
1279        zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
1280        subprocess.check_call(zip_cmd, cwd=root_dir,
1281                              stdout=subprocess.DEVNULL)
1282
1283        self.assertTrue(os.path.isfile(archive2))
1284        # let's compare both ZIP files
1285        with zipfile.ZipFile(archive) as zf:
1286            names = zf.namelist()
1287        with zipfile.ZipFile(archive2) as zf:
1288            names2 = zf.namelist()
1289        self.assertEqual(sorted(names), sorted(names2))
1290
1291    @support.requires_zlib
1292    @unittest.skipUnless(shutil.which('unzip'),
1293                         'Need the unzip command to run')
1294    def test_unzip_zipfile(self):
1295        root_dir, base_dir = self._create_files()
1296        base_name = os.path.join(self.mkdtemp(), 'archive')
1297        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1298
1299        # check if ZIP file  was created
1300        self.assertEqual(archive, base_name + '.zip')
1301        self.assertTrue(os.path.isfile(archive))
1302
1303        # now check the ZIP file using `unzip -t`
1304        zip_cmd = ['unzip', '-t', archive]
1305        with support.change_cwd(root_dir):
1306            try:
1307                subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1308            except subprocess.CalledProcessError as exc:
1309                details = exc.output.decode(errors="replace")
1310                if 'unrecognized option: t' in details:
1311                    self.skipTest("unzip doesn't support -t")
1312                msg = "{}\n\n**Unzip Output**\n{}"
1313                self.fail(msg.format(exc, details))
1314
1315    def test_make_archive(self):
1316        tmpdir = self.mkdtemp()
1317        base_name = os.path.join(tmpdir, 'archive')
1318        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1319
1320    @support.requires_zlib
1321    def test_make_archive_owner_group(self):
1322        # testing make_archive with owner and group, with various combinations
1323        # this works even if there's not gid/uid support
1324        if UID_GID_SUPPORT:
1325            group = grp.getgrgid(0)[0]
1326            owner = pwd.getpwuid(0)[0]
1327        else:
1328            group = owner = 'root'
1329
1330        root_dir, base_dir = self._create_files()
1331        base_name = os.path.join(self.mkdtemp(), 'archive')
1332        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1333                           group=group)
1334        self.assertTrue(os.path.isfile(res))
1335
1336        res = make_archive(base_name, 'zip', root_dir, base_dir)
1337        self.assertTrue(os.path.isfile(res))
1338
1339        res = make_archive(base_name, 'tar', root_dir, base_dir,
1340                           owner=owner, group=group)
1341        self.assertTrue(os.path.isfile(res))
1342
1343        res = make_archive(base_name, 'tar', root_dir, base_dir,
1344                           owner='kjhkjhkjg', group='oihohoh')
1345        self.assertTrue(os.path.isfile(res))
1346
1347
1348    @support.requires_zlib
1349    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1350    def test_tarfile_root_owner(self):
1351        root_dir, base_dir = self._create_files()
1352        base_name = os.path.join(self.mkdtemp(), 'archive')
1353        group = grp.getgrgid(0)[0]
1354        owner = pwd.getpwuid(0)[0]
1355        with support.change_cwd(root_dir):
1356            archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1357                                        owner=owner, group=group)
1358
1359        # check if the compressed tarball was created
1360        self.assertTrue(os.path.isfile(archive_name))
1361
1362        # now checks the rights
1363        archive = tarfile.open(archive_name)
1364        try:
1365            for member in archive.getmembers():
1366                self.assertEqual(member.uid, 0)
1367                self.assertEqual(member.gid, 0)
1368        finally:
1369            archive.close()
1370
1371    def test_make_archive_cwd(self):
1372        current_dir = os.getcwd()
1373        def _breaks(*args, **kw):
1374            raise RuntimeError()
1375
1376        register_archive_format('xxx', _breaks, [], 'xxx file')
1377        try:
1378            try:
1379                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1380            except Exception:
1381                pass
1382            self.assertEqual(os.getcwd(), current_dir)
1383        finally:
1384            unregister_archive_format('xxx')
1385
1386    def test_make_tarfile_in_curdir(self):
1387        # Issue #21280
1388        root_dir = self.mkdtemp()
1389        with support.change_cwd(root_dir):
1390            self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1391            self.assertTrue(os.path.isfile('test.tar'))
1392
1393    @support.requires_zlib
1394    def test_make_zipfile_in_curdir(self):
1395        # Issue #21280
1396        root_dir = self.mkdtemp()
1397        with support.change_cwd(root_dir):
1398            self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1399            self.assertTrue(os.path.isfile('test.zip'))
1400
1401    def test_register_archive_format(self):
1402
1403        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1404        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1405                          1)
1406        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1407                          [(1, 2), (1, 2, 3)])
1408
1409        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1410        formats = [name for name, params in get_archive_formats()]
1411        self.assertIn('xxx', formats)
1412
1413        unregister_archive_format('xxx')
1414        formats = [name for name, params in get_archive_formats()]
1415        self.assertNotIn('xxx', formats)
1416
1417    def check_unpack_archive(self, format):
1418        self.check_unpack_archive_with_converter(format, lambda path: path)
1419        self.check_unpack_archive_with_converter(format, pathlib.Path)
1420        self.check_unpack_archive_with_converter(format, FakePath)
1421
1422    def check_unpack_archive_with_converter(self, format, converter):
1423        root_dir, base_dir = self._create_files()
1424        expected = rlistdir(root_dir)
1425        expected.remove('outer')
1426
1427        base_name = os.path.join(self.mkdtemp(), 'archive')
1428        filename = make_archive(base_name, format, root_dir, base_dir)
1429
1430        # let's try to unpack it now
1431        tmpdir2 = self.mkdtemp()
1432        unpack_archive(converter(filename), converter(tmpdir2))
1433        self.assertEqual(rlistdir(tmpdir2), expected)
1434
1435        # and again, this time with the format specified
1436        tmpdir3 = self.mkdtemp()
1437        unpack_archive(converter(filename), converter(tmpdir3), format=format)
1438        self.assertEqual(rlistdir(tmpdir3), expected)
1439
1440        self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1441        self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
1442
1443    def test_unpack_archive_tar(self):
1444        self.check_unpack_archive('tar')
1445
1446    @support.requires_zlib
1447    def test_unpack_archive_gztar(self):
1448        self.check_unpack_archive('gztar')
1449
1450    @support.requires_bz2
1451    def test_unpack_archive_bztar(self):
1452        self.check_unpack_archive('bztar')
1453
1454    @support.requires_lzma
1455    @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
1456    def test_unpack_archive_xztar(self):
1457        self.check_unpack_archive('xztar')
1458
1459    @support.requires_zlib
1460    def test_unpack_archive_zip(self):
1461        self.check_unpack_archive('zip')
1462
1463    def test_unpack_registry(self):
1464
1465        formats = get_unpack_formats()
1466
1467        def _boo(filename, extract_dir, extra):
1468            self.assertEqual(extra, 1)
1469            self.assertEqual(filename, 'stuff.boo')
1470            self.assertEqual(extract_dir, 'xx')
1471
1472        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1473        unpack_archive('stuff.boo', 'xx')
1474
1475        # trying to register a .boo unpacker again
1476        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1477                          ['.boo'], _boo)
1478
1479        # should work now
1480        unregister_unpack_format('Boo')
1481        register_unpack_format('Boo2', ['.boo'], _boo)
1482        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1483        self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1484
1485        # let's leave a clean state
1486        unregister_unpack_format('Boo2')
1487        self.assertEqual(get_unpack_formats(), formats)
1488
1489    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1490                         "disk_usage not available on this platform")
1491    def test_disk_usage(self):
1492        usage = shutil.disk_usage(os.path.dirname(__file__))
1493        for attr in ('total', 'used', 'free'):
1494            self.assertIsInstance(getattr(usage, attr), int)
1495        self.assertGreater(usage.total, 0)
1496        self.assertGreater(usage.used, 0)
1497        self.assertGreaterEqual(usage.free, 0)
1498        self.assertGreaterEqual(usage.total, usage.used)
1499        self.assertGreater(usage.total, usage.free)
1500
1501        # bpo-32557: Check that disk_usage() also accepts a filename
1502        shutil.disk_usage(__file__)
1503
1504    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1505    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1506    def test_chown(self):
1507
1508        # cleaned-up automatically by TestShutil.tearDown method
1509        dirname = self.mkdtemp()
1510        filename = tempfile.mktemp(dir=dirname)
1511        write_file(filename, 'testing chown function')
1512
1513        with self.assertRaises(ValueError):
1514            shutil.chown(filename)
1515
1516        with self.assertRaises(LookupError):
1517            shutil.chown(filename, user='non-existing username')
1518
1519        with self.assertRaises(LookupError):
1520            shutil.chown(filename, group='non-existing groupname')
1521
1522        with self.assertRaises(TypeError):
1523            shutil.chown(filename, b'spam')
1524
1525        with self.assertRaises(TypeError):
1526            shutil.chown(filename, 3.14)
1527
1528        uid = os.getuid()
1529        gid = os.getgid()
1530
1531        def check_chown(path, uid=None, gid=None):
1532            s = os.stat(filename)
1533            if uid is not None:
1534                self.assertEqual(uid, s.st_uid)
1535            if gid is not None:
1536                self.assertEqual(gid, s.st_gid)
1537
1538        shutil.chown(filename, uid, gid)
1539        check_chown(filename, uid, gid)
1540        shutil.chown(filename, uid)
1541        check_chown(filename, uid)
1542        shutil.chown(filename, user=uid)
1543        check_chown(filename, uid)
1544        shutil.chown(filename, group=gid)
1545        check_chown(filename, gid=gid)
1546
1547        shutil.chown(dirname, uid, gid)
1548        check_chown(dirname, uid, gid)
1549        shutil.chown(dirname, uid)
1550        check_chown(dirname, uid)
1551        shutil.chown(dirname, user=uid)
1552        check_chown(dirname, uid)
1553        shutil.chown(dirname, group=gid)
1554        check_chown(dirname, gid=gid)
1555
1556        user = pwd.getpwuid(uid)[0]
1557        group = grp.getgrgid(gid)[0]
1558        shutil.chown(filename, user, group)
1559        check_chown(filename, uid, gid)
1560        shutil.chown(dirname, user, group)
1561        check_chown(dirname, uid, gid)
1562
1563    def test_copy_return_value(self):
1564        # copy and copy2 both return their destination path.
1565        for fn in (shutil.copy, shutil.copy2):
1566            src_dir = self.mkdtemp()
1567            dst_dir = self.mkdtemp()
1568            src = os.path.join(src_dir, 'foo')
1569            write_file(src, 'foo')
1570            rv = fn(src, dst_dir)
1571            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1572            rv = fn(src, os.path.join(dst_dir, 'bar'))
1573            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1574
1575    def test_copyfile_return_value(self):
1576        # copytree returns its destination path.
1577        src_dir = self.mkdtemp()
1578        dst_dir = self.mkdtemp()
1579        dst_file = os.path.join(dst_dir, 'bar')
1580        src_file = os.path.join(src_dir, 'foo')
1581        write_file(src_file, 'foo')
1582        rv = shutil.copyfile(src_file, dst_file)
1583        self.assertTrue(os.path.exists(rv))
1584        self.assertEqual(read_file(src_file), read_file(dst_file))
1585
1586    def test_copyfile_same_file(self):
1587        # copyfile() should raise SameFileError if the source and destination
1588        # are the same.
1589        src_dir = self.mkdtemp()
1590        src_file = os.path.join(src_dir, 'foo')
1591        write_file(src_file, 'foo')
1592        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1593        # But Error should work too, to stay backward compatible.
1594        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1595        # Make sure file is not corrupted.
1596        self.assertEqual(read_file(src_file), 'foo')
1597
1598    def test_copytree_return_value(self):
1599        # copytree returns its destination path.
1600        src_dir = self.mkdtemp()
1601        dst_dir = src_dir + "dest"
1602        self.addCleanup(shutil.rmtree, dst_dir, True)
1603        src = os.path.join(src_dir, 'foo')
1604        write_file(src, 'foo')
1605        rv = shutil.copytree(src_dir, dst_dir)
1606        self.assertEqual(['foo'], os.listdir(rv))
1607
1608    def test_copytree_subdirectory(self):
1609        # copytree where dst is a subdirectory of src, see Issue 38688
1610        base_dir = self.mkdtemp()
1611        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
1612        src_dir = os.path.join(base_dir, "t", "pg")
1613        dst_dir = os.path.join(src_dir, "somevendor", "1.0")
1614        os.makedirs(src_dir)
1615        src = os.path.join(src_dir, 'pol')
1616        write_file(src, 'pol')
1617        rv = shutil.copytree(src_dir, dst_dir)
1618        self.assertEqual(['pol'], os.listdir(rv))
1619
1620
1621class TestWhich(unittest.TestCase):
1622
1623    def setUp(self):
1624        self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
1625        self.addCleanup(shutil.rmtree, self.temp_dir, True)
1626        # Give the temp_file an ".exe" suffix for all.
1627        # It's needed on Windows and not harmful on other platforms.
1628        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1629                                                     prefix="Tmp",
1630                                                     suffix=".Exe")
1631        os.chmod(self.temp_file.name, stat.S_IXUSR)
1632        self.addCleanup(self.temp_file.close)
1633        self.dir, self.file = os.path.split(self.temp_file.name)
1634        self.env_path = self.dir
1635        self.curdir = os.curdir
1636        self.ext = ".EXE"
1637
1638    def test_basic(self):
1639        # Given an EXE in a directory, it should be returned.
1640        rv = shutil.which(self.file, path=self.dir)
1641        self.assertEqual(rv, self.temp_file.name)
1642
1643    def test_absolute_cmd(self):
1644        # When given the fully qualified path to an executable that exists,
1645        # it should be returned.
1646        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
1647        self.assertEqual(rv, self.temp_file.name)
1648
1649    def test_relative_cmd(self):
1650        # When given the relative path with a directory part to an executable
1651        # that exists, it should be returned.
1652        base_dir, tail_dir = os.path.split(self.dir)
1653        relpath = os.path.join(tail_dir, self.file)
1654        with support.change_cwd(path=base_dir):
1655            rv = shutil.which(relpath, path=self.temp_dir)
1656            self.assertEqual(rv, relpath)
1657        # But it shouldn't be searched in PATH directories (issue #16957).
1658        with support.change_cwd(path=self.dir):
1659            rv = shutil.which(relpath, path=base_dir)
1660            self.assertIsNone(rv)
1661
1662    def test_cwd(self):
1663        # Issue #16957
1664        base_dir = os.path.dirname(self.dir)
1665        with support.change_cwd(path=self.dir):
1666            rv = shutil.which(self.file, path=base_dir)
1667            if sys.platform == "win32":
1668                # Windows: current directory implicitly on PATH
1669                self.assertEqual(rv, os.path.join(self.curdir, self.file))
1670            else:
1671                # Other platforms: shouldn't match in the current directory.
1672                self.assertIsNone(rv)
1673
1674    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1675                     'non-root user required')
1676    def test_non_matching_mode(self):
1677        # Set the file read-only and ask for writeable files.
1678        os.chmod(self.temp_file.name, stat.S_IREAD)
1679        if os.access(self.temp_file.name, os.W_OK):
1680            self.skipTest("can't set the file read-only")
1681        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1682        self.assertIsNone(rv)
1683
1684    def test_relative_path(self):
1685        base_dir, tail_dir = os.path.split(self.dir)
1686        with support.change_cwd(path=base_dir):
1687            rv = shutil.which(self.file, path=tail_dir)
1688            self.assertEqual(rv, os.path.join(tail_dir, self.file))
1689
1690    def test_nonexistent_file(self):
1691        # Return None when no matching executable file is found on the path.
1692        rv = shutil.which("foo.exe", path=self.dir)
1693        self.assertIsNone(rv)
1694
1695    @unittest.skipUnless(sys.platform == "win32",
1696                         "pathext check is Windows-only")
1697    def test_pathext_checking(self):
1698        # Ask for the file without the ".exe" extension, then ensure that
1699        # it gets found properly with the extension.
1700        rv = shutil.which(self.file[:-4], path=self.dir)
1701        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
1702
1703    def test_environ_path(self):
1704        with support.EnvironmentVarGuard() as env:
1705            env['PATH'] = self.env_path
1706            rv = shutil.which(self.file)
1707            self.assertEqual(rv, self.temp_file.name)
1708
1709    def test_environ_path_empty(self):
1710        # PATH='': no match
1711        with support.EnvironmentVarGuard() as env:
1712            env['PATH'] = ''
1713            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1714                                     create=True), \
1715                 support.swap_attr(os, 'defpath', self.dir), \
1716                 support.change_cwd(self.dir):
1717                rv = shutil.which(self.file)
1718                self.assertIsNone(rv)
1719
1720    def test_environ_path_cwd(self):
1721        expected_cwd = os.path.basename(self.temp_file.name)
1722        if sys.platform == "win32":
1723            curdir = os.curdir
1724            if isinstance(expected_cwd, bytes):
1725                curdir = os.fsencode(curdir)
1726            expected_cwd = os.path.join(curdir, expected_cwd)
1727
1728        # PATH=':': explicitly looks in the current directory
1729        with support.EnvironmentVarGuard() as env:
1730            env['PATH'] = os.pathsep
1731            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1732                                     create=True), \
1733                 support.swap_attr(os, 'defpath', self.dir):
1734                rv = shutil.which(self.file)
1735                self.assertIsNone(rv)
1736
1737                # look in current directory
1738                with support.change_cwd(self.dir):
1739                    rv = shutil.which(self.file)
1740                    self.assertEqual(rv, expected_cwd)
1741
1742    def test_environ_path_missing(self):
1743        with support.EnvironmentVarGuard() as env:
1744            env.pop('PATH', None)
1745
1746            # without confstr
1747            with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1748                                     create=True), \
1749                 support.swap_attr(os, 'defpath', self.dir):
1750                rv = shutil.which(self.file)
1751            self.assertEqual(rv, self.temp_file.name)
1752
1753            # with confstr
1754            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1755                                     create=True), \
1756                 support.swap_attr(os, 'defpath', ''):
1757                rv = shutil.which(self.file)
1758            self.assertEqual(rv, self.temp_file.name)
1759
1760    def test_empty_path(self):
1761        base_dir = os.path.dirname(self.dir)
1762        with support.change_cwd(path=self.dir), \
1763             support.EnvironmentVarGuard() as env:
1764            env['PATH'] = self.env_path
1765            rv = shutil.which(self.file, path='')
1766            self.assertIsNone(rv)
1767
1768    def test_empty_path_no_PATH(self):
1769        with support.EnvironmentVarGuard() as env:
1770            env.pop('PATH', None)
1771            rv = shutil.which(self.file)
1772            self.assertIsNone(rv)
1773
1774    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1775    def test_pathext(self):
1776        ext = ".xyz"
1777        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1778                                                   prefix="Tmp2", suffix=ext)
1779        os.chmod(temp_filexyz.name, stat.S_IXUSR)
1780        self.addCleanup(temp_filexyz.close)
1781
1782        # strip path and extension
1783        program = os.path.basename(temp_filexyz.name)
1784        program = os.path.splitext(program)[0]
1785
1786        with support.EnvironmentVarGuard() as env:
1787            env['PATHEXT'] = ext
1788            rv = shutil.which(program, path=self.temp_dir)
1789            self.assertEqual(rv, temp_filexyz.name)
1790
1791
1792class TestWhichBytes(TestWhich):
1793    def setUp(self):
1794        TestWhich.setUp(self)
1795        self.dir = os.fsencode(self.dir)
1796        self.file = os.fsencode(self.file)
1797        self.temp_file.name = os.fsencode(self.temp_file.name)
1798        self.curdir = os.fsencode(self.curdir)
1799        self.ext = os.fsencode(self.ext)
1800
1801
1802class TestMove(unittest.TestCase):
1803
1804    def setUp(self):
1805        filename = "foo"
1806        basedir = None
1807        if sys.platform == "win32":
1808            basedir = os.path.realpath(os.getcwd())
1809        self.src_dir = tempfile.mkdtemp(dir=basedir)
1810        self.dst_dir = tempfile.mkdtemp(dir=basedir)
1811        self.src_file = os.path.join(self.src_dir, filename)
1812        self.dst_file = os.path.join(self.dst_dir, filename)
1813        with open(self.src_file, "wb") as f:
1814            f.write(b"spam")
1815
1816    def tearDown(self):
1817        for d in (self.src_dir, self.dst_dir):
1818            try:
1819                if d:
1820                    shutil.rmtree(d)
1821            except:
1822                pass
1823
1824    def _check_move_file(self, src, dst, real_dst):
1825        with open(src, "rb") as f:
1826            contents = f.read()
1827        shutil.move(src, dst)
1828        with open(real_dst, "rb") as f:
1829            self.assertEqual(contents, f.read())
1830        self.assertFalse(os.path.exists(src))
1831
1832    def _check_move_dir(self, src, dst, real_dst):
1833        contents = sorted(os.listdir(src))
1834        shutil.move(src, dst)
1835        self.assertEqual(contents, sorted(os.listdir(real_dst)))
1836        self.assertFalse(os.path.exists(src))
1837
1838    def test_move_file(self):
1839        # Move a file to another location on the same filesystem.
1840        self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1841
1842    def test_move_file_to_dir(self):
1843        # Move a file inside an existing dir on the same filesystem.
1844        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1845
1846    @mock_rename
1847    def test_move_file_other_fs(self):
1848        # Move a file to an existing dir on another filesystem.
1849        self.test_move_file()
1850
1851    @mock_rename
1852    def test_move_file_to_dir_other_fs(self):
1853        # Move a file to another location on another filesystem.
1854        self.test_move_file_to_dir()
1855
1856    def test_move_dir(self):
1857        # Move a dir to another location on the same filesystem.
1858        dst_dir = tempfile.mktemp()
1859        try:
1860            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1861        finally:
1862            try:
1863                shutil.rmtree(dst_dir)
1864            except:
1865                pass
1866
1867    @mock_rename
1868    def test_move_dir_other_fs(self):
1869        # Move a dir to another location on another filesystem.
1870        self.test_move_dir()
1871
1872    def test_move_dir_to_dir(self):
1873        # Move a dir inside an existing dir on the same filesystem.
1874        self._check_move_dir(self.src_dir, self.dst_dir,
1875            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1876
1877    @mock_rename
1878    def test_move_dir_to_dir_other_fs(self):
1879        # Move a dir inside an existing dir on another filesystem.
1880        self.test_move_dir_to_dir()
1881
1882    def test_move_dir_sep_to_dir(self):
1883        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1884            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1885
1886    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1887    def test_move_dir_altsep_to_dir(self):
1888        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1889            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1890
1891    def test_existing_file_inside_dest_dir(self):
1892        # A file with the same name inside the destination dir already exists.
1893        with open(self.dst_file, "wb"):
1894            pass
1895        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1896
1897    def test_dont_move_dir_in_itself(self):
1898        # Moving a dir inside itself raises an Error.
1899        dst = os.path.join(self.src_dir, "bar")
1900        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1901
1902    def test_destinsrc_false_negative(self):
1903        os.mkdir(TESTFN)
1904        try:
1905            for src, dst in [('srcdir', 'srcdir/dest')]:
1906                src = os.path.join(TESTFN, src)
1907                dst = os.path.join(TESTFN, dst)
1908                self.assertTrue(shutil._destinsrc(src, dst),
1909                             msg='_destinsrc() wrongly concluded that '
1910                             'dst (%s) is not in src (%s)' % (dst, src))
1911        finally:
1912            shutil.rmtree(TESTFN, ignore_errors=True)
1913
1914    def test_destinsrc_false_positive(self):
1915        os.mkdir(TESTFN)
1916        try:
1917            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1918                src = os.path.join(TESTFN, src)
1919                dst = os.path.join(TESTFN, dst)
1920                self.assertFalse(shutil._destinsrc(src, dst),
1921                            msg='_destinsrc() wrongly concluded that '
1922                            'dst (%s) is in src (%s)' % (dst, src))
1923        finally:
1924            shutil.rmtree(TESTFN, ignore_errors=True)
1925
1926    @support.skip_unless_symlink
1927    @mock_rename
1928    def test_move_file_symlink(self):
1929        dst = os.path.join(self.src_dir, 'bar')
1930        os.symlink(self.src_file, dst)
1931        shutil.move(dst, self.dst_file)
1932        self.assertTrue(os.path.islink(self.dst_file))
1933        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1934
1935    @support.skip_unless_symlink
1936    @mock_rename
1937    def test_move_file_symlink_to_dir(self):
1938        filename = "bar"
1939        dst = os.path.join(self.src_dir, filename)
1940        os.symlink(self.src_file, dst)
1941        shutil.move(dst, self.dst_dir)
1942        final_link = os.path.join(self.dst_dir, filename)
1943        self.assertTrue(os.path.islink(final_link))
1944        self.assertTrue(os.path.samefile(self.src_file, final_link))
1945
1946    @support.skip_unless_symlink
1947    @mock_rename
1948    def test_move_dangling_symlink(self):
1949        src = os.path.join(self.src_dir, 'baz')
1950        dst = os.path.join(self.src_dir, 'bar')
1951        os.symlink(src, dst)
1952        dst_link = os.path.join(self.dst_dir, 'quux')
1953        shutil.move(dst, dst_link)
1954        self.assertTrue(os.path.islink(dst_link))
1955        self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
1956
1957    @support.skip_unless_symlink
1958    @mock_rename
1959    def test_move_dir_symlink(self):
1960        src = os.path.join(self.src_dir, 'baz')
1961        dst = os.path.join(self.src_dir, 'bar')
1962        os.mkdir(src)
1963        os.symlink(src, dst)
1964        dst_link = os.path.join(self.dst_dir, 'quux')
1965        shutil.move(dst, dst_link)
1966        self.assertTrue(os.path.islink(dst_link))
1967        self.assertTrue(os.path.samefile(src, dst_link))
1968
1969    def test_move_return_value(self):
1970        rv = shutil.move(self.src_file, self.dst_dir)
1971        self.assertEqual(rv,
1972                os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1973
1974    def test_move_as_rename_return_value(self):
1975        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1976        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1977
1978    @mock_rename
1979    def test_move_file_special_function(self):
1980        moved = []
1981        def _copy(src, dst):
1982            moved.append((src, dst))
1983        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1984        self.assertEqual(len(moved), 1)
1985
1986    @mock_rename
1987    def test_move_dir_special_function(self):
1988        moved = []
1989        def _copy(src, dst):
1990            moved.append((src, dst))
1991        support.create_empty_file(os.path.join(self.src_dir, 'child'))
1992        support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1993        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1994        self.assertEqual(len(moved), 3)
1995
1996
1997class TestCopyFile(unittest.TestCase):
1998
1999    _delete = False
2000
2001    class Faux(object):
2002        _entered = False
2003        _exited_with = None
2004        _raised = False
2005        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2006            self._raise_in_exit = raise_in_exit
2007            self._suppress_at_exit = suppress_at_exit
2008        def read(self, *args):
2009            return ''
2010        def __enter__(self):
2011            self._entered = True
2012        def __exit__(self, exc_type, exc_val, exc_tb):
2013            self._exited_with = exc_type, exc_val, exc_tb
2014            if self._raise_in_exit:
2015                self._raised = True
2016                raise OSError("Cannot close")
2017            return self._suppress_at_exit
2018
2019    def tearDown(self):
2020        if self._delete:
2021            del shutil.open
2022
2023    def _set_shutil_open(self, func):
2024        shutil.open = func
2025        self._delete = True
2026
2027    def test_w_source_open_fails(self):
2028        def _open(filename, mode='r'):
2029            if filename == 'srcfile':
2030                raise OSError('Cannot open "srcfile"')
2031            assert 0  # shouldn't reach here.
2032
2033        self._set_shutil_open(_open)
2034
2035        self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
2036
2037    @unittest.skipIf(MACOS, "skipped on macOS")
2038    def test_w_dest_open_fails(self):
2039
2040        srcfile = self.Faux()
2041
2042        def _open(filename, mode='r'):
2043            if filename == 'srcfile':
2044                return srcfile
2045            if filename == 'destfile':
2046                raise OSError('Cannot open "destfile"')
2047            assert 0  # shouldn't reach here.
2048
2049        self._set_shutil_open(_open)
2050
2051        shutil.copyfile('srcfile', 'destfile')
2052        self.assertTrue(srcfile._entered)
2053        self.assertTrue(srcfile._exited_with[0] is OSError)
2054        self.assertEqual(srcfile._exited_with[1].args,
2055                         ('Cannot open "destfile"',))
2056
2057    @unittest.skipIf(MACOS, "skipped on macOS")
2058    def test_w_dest_close_fails(self):
2059
2060        srcfile = self.Faux()
2061        destfile = self.Faux(True)
2062
2063        def _open(filename, mode='r'):
2064            if filename == 'srcfile':
2065                return srcfile
2066            if filename == 'destfile':
2067                return destfile
2068            assert 0  # shouldn't reach here.
2069
2070        self._set_shutil_open(_open)
2071
2072        shutil.copyfile('srcfile', 'destfile')
2073        self.assertTrue(srcfile._entered)
2074        self.assertTrue(destfile._entered)
2075        self.assertTrue(destfile._raised)
2076        self.assertTrue(srcfile._exited_with[0] is OSError)
2077        self.assertEqual(srcfile._exited_with[1].args,
2078                         ('Cannot close',))
2079
2080    @unittest.skipIf(MACOS, "skipped on macOS")
2081    def test_w_source_close_fails(self):
2082
2083        srcfile = self.Faux(True)
2084        destfile = self.Faux()
2085
2086        def _open(filename, mode='r'):
2087            if filename == 'srcfile':
2088                return srcfile
2089            if filename == 'destfile':
2090                return destfile
2091            assert 0  # shouldn't reach here.
2092
2093        self._set_shutil_open(_open)
2094
2095        self.assertRaises(OSError,
2096                          shutil.copyfile, 'srcfile', 'destfile')
2097        self.assertTrue(srcfile._entered)
2098        self.assertTrue(destfile._entered)
2099        self.assertFalse(destfile._raised)
2100        self.assertTrue(srcfile._exited_with[0] is None)
2101        self.assertTrue(srcfile._raised)
2102
2103    def test_move_dir_caseinsensitive(self):
2104        # Renames a folder to the same name
2105        # but a different case.
2106
2107        self.src_dir = tempfile.mkdtemp()
2108        self.addCleanup(shutil.rmtree, self.src_dir, True)
2109        dst_dir = os.path.join(
2110                os.path.dirname(self.src_dir),
2111                os.path.basename(self.src_dir).upper())
2112        self.assertNotEqual(self.src_dir, dst_dir)
2113
2114        try:
2115            shutil.move(self.src_dir, dst_dir)
2116            self.assertTrue(os.path.isdir(dst_dir))
2117        finally:
2118            os.rmdir(dst_dir)
2119
2120
2121class TestCopyFileObj(unittest.TestCase):
2122    FILESIZE = 2 * 1024 * 1024
2123
2124    @classmethod
2125    def setUpClass(cls):
2126        write_test_file(TESTFN, cls.FILESIZE)
2127
2128    @classmethod
2129    def tearDownClass(cls):
2130        support.unlink(TESTFN)
2131        support.unlink(TESTFN2)
2132
2133    def tearDown(self):
2134        support.unlink(TESTFN2)
2135
2136    @contextlib.contextmanager
2137    def get_files(self):
2138        with open(TESTFN, "rb") as src:
2139            with open(TESTFN2, "wb") as dst:
2140                yield (src, dst)
2141
2142    def assert_files_eq(self, src, dst):
2143        with open(src, 'rb') as fsrc:
2144            with open(dst, 'rb') as fdst:
2145                self.assertEqual(fsrc.read(), fdst.read())
2146
2147    def test_content(self):
2148        with self.get_files() as (src, dst):
2149            shutil.copyfileobj(src, dst)
2150        self.assert_files_eq(TESTFN, TESTFN2)
2151
2152    def test_file_not_closed(self):
2153        with self.get_files() as (src, dst):
2154            shutil.copyfileobj(src, dst)
2155            assert not src.closed
2156            assert not dst.closed
2157
2158    def test_file_offset(self):
2159        with self.get_files() as (src, dst):
2160            shutil.copyfileobj(src, dst)
2161            self.assertEqual(src.tell(), self.FILESIZE)
2162            self.assertEqual(dst.tell(), self.FILESIZE)
2163
2164    @unittest.skipIf(os.name != 'nt', "Windows only")
2165    def test_win_impl(self):
2166        # Make sure alternate Windows implementation is called.
2167        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2168            shutil.copyfile(TESTFN, TESTFN2)
2169        assert m.called
2170
2171        # File size is 2 MiB but max buf size should be 1 MiB.
2172        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2173
2174        # If file size < 1 MiB memoryview() length must be equal to
2175        # the actual file size.
2176        with tempfile.NamedTemporaryFile(delete=False) as f:
2177            f.write(b'foo')
2178        fname = f.name
2179        self.addCleanup(support.unlink, fname)
2180        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2181            shutil.copyfile(fname, TESTFN2)
2182        self.assertEqual(m.call_args[0][2], 3)
2183
2184        # Empty files should not rely on readinto() variant.
2185        with tempfile.NamedTemporaryFile(delete=False) as f:
2186            pass
2187        fname = f.name
2188        self.addCleanup(support.unlink, fname)
2189        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2190            shutil.copyfile(fname, TESTFN2)
2191        assert not m.called
2192        self.assert_files_eq(fname, TESTFN2)
2193
2194
2195class _ZeroCopyFileTest(object):
2196    """Tests common to all zero-copy APIs."""
2197    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
2198    FILEDATA = b""
2199    PATCHPOINT = ""
2200
2201    @classmethod
2202    def setUpClass(cls):
2203        write_test_file(TESTFN, cls.FILESIZE)
2204        with open(TESTFN, 'rb') as f:
2205            cls.FILEDATA = f.read()
2206            assert len(cls.FILEDATA) == cls.FILESIZE
2207
2208    @classmethod
2209    def tearDownClass(cls):
2210        support.unlink(TESTFN)
2211
2212    def tearDown(self):
2213        support.unlink(TESTFN2)
2214
2215    @contextlib.contextmanager
2216    def get_files(self):
2217        with open(TESTFN, "rb") as src:
2218            with open(TESTFN2, "wb") as dst:
2219                yield (src, dst)
2220
2221    def zerocopy_fun(self, *args, **kwargs):
2222        raise NotImplementedError("must be implemented in subclass")
2223
2224    def reset(self):
2225        self.tearDown()
2226        self.tearDownClass()
2227        self.setUpClass()
2228        self.setUp()
2229
2230    # ---
2231
2232    def test_regular_copy(self):
2233        with self.get_files() as (src, dst):
2234            self.zerocopy_fun(src, dst)
2235        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2236        # Make sure the fallback function is not called.
2237        with self.get_files() as (src, dst):
2238            with unittest.mock.patch('shutil.copyfileobj') as m:
2239                shutil.copyfile(TESTFN, TESTFN2)
2240            assert not m.called
2241
2242    def test_same_file(self):
2243        self.addCleanup(self.reset)
2244        with self.get_files() as (src, dst):
2245            with self.assertRaises(Exception):
2246                self.zerocopy_fun(src, src)
2247        # Make sure src file is not corrupted.
2248        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2249
2250    def test_non_existent_src(self):
2251        name = tempfile.mktemp()
2252        with self.assertRaises(FileNotFoundError) as cm:
2253            shutil.copyfile(name, "new")
2254        self.assertEqual(cm.exception.filename, name)
2255
2256    def test_empty_file(self):
2257        srcname = TESTFN + 'src'
2258        dstname = TESTFN + 'dst'
2259        self.addCleanup(lambda: support.unlink(srcname))
2260        self.addCleanup(lambda: support.unlink(dstname))
2261        with open(srcname, "wb"):
2262            pass
2263
2264        with open(srcname, "rb") as src:
2265            with open(dstname, "wb") as dst:
2266                self.zerocopy_fun(src, dst)
2267
2268        self.assertEqual(read_file(dstname, binary=True), b"")
2269
2270    def test_unhandled_exception(self):
2271        with unittest.mock.patch(self.PATCHPOINT,
2272                                 side_effect=ZeroDivisionError):
2273            self.assertRaises(ZeroDivisionError,
2274                              shutil.copyfile, TESTFN, TESTFN2)
2275
2276    def test_exception_on_first_call(self):
2277        # Emulate a case where the first call to the zero-copy
2278        # function raises an exception in which case the function is
2279        # supposed to give up immediately.
2280        with unittest.mock.patch(self.PATCHPOINT,
2281                                 side_effect=OSError(errno.EINVAL, "yo")):
2282            with self.get_files() as (src, dst):
2283                with self.assertRaises(_GiveupOnFastCopy):
2284                    self.zerocopy_fun(src, dst)
2285
2286    def test_filesystem_full(self):
2287        # Emulate a case where filesystem is full and sendfile() fails
2288        # on first call.
2289        with unittest.mock.patch(self.PATCHPOINT,
2290                                 side_effect=OSError(errno.ENOSPC, "yo")):
2291            with self.get_files() as (src, dst):
2292                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2293
2294
2295@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2296class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2297    PATCHPOINT = "os.sendfile"
2298
2299    def zerocopy_fun(self, fsrc, fdst):
2300        return shutil._fastcopy_sendfile(fsrc, fdst)
2301
2302    def test_non_regular_file_src(self):
2303        with io.BytesIO(self.FILEDATA) as src:
2304            with open(TESTFN2, "wb") as dst:
2305                with self.assertRaises(_GiveupOnFastCopy):
2306                    self.zerocopy_fun(src, dst)
2307                shutil.copyfileobj(src, dst)
2308
2309        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2310
2311    def test_non_regular_file_dst(self):
2312        with open(TESTFN, "rb") as src:
2313            with io.BytesIO() as dst:
2314                with self.assertRaises(_GiveupOnFastCopy):
2315                    self.zerocopy_fun(src, dst)
2316                shutil.copyfileobj(src, dst)
2317                dst.seek(0)
2318                self.assertEqual(dst.read(), self.FILEDATA)
2319
2320    def test_exception_on_second_call(self):
2321        def sendfile(*args, **kwargs):
2322            if not flag:
2323                flag.append(None)
2324                return orig_sendfile(*args, **kwargs)
2325            else:
2326                raise OSError(errno.EBADF, "yo")
2327
2328        flag = []
2329        orig_sendfile = os.sendfile
2330        with unittest.mock.patch('os.sendfile', create=True,
2331                                 side_effect=sendfile):
2332            with self.get_files() as (src, dst):
2333                with self.assertRaises(OSError) as cm:
2334                    shutil._fastcopy_sendfile(src, dst)
2335        assert flag
2336        self.assertEqual(cm.exception.errno, errno.EBADF)
2337
2338    def test_cant_get_size(self):
2339        # Emulate a case where src file size cannot be determined.
2340        # Internally bufsize will be set to a small value and
2341        # sendfile() will be called repeatedly.
2342        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2343            with self.get_files() as (src, dst):
2344                shutil._fastcopy_sendfile(src, dst)
2345                assert m.called
2346        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2347
2348    def test_small_chunks(self):
2349        # Force internal file size detection to be smaller than the
2350        # actual file size. We want to force sendfile() to be called
2351        # multiple times, also in order to emulate a src fd which gets
2352        # bigger while it is being copied.
2353        mock = unittest.mock.Mock()
2354        mock.st_size = 65536 + 1
2355        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2356            with self.get_files() as (src, dst):
2357                shutil._fastcopy_sendfile(src, dst)
2358                assert m.called
2359        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2360
2361    def test_big_chunk(self):
2362        # Force internal file size detection to be +100MB bigger than
2363        # the actual file size. Make sure sendfile() does not rely on
2364        # file size value except for (maybe) a better throughput /
2365        # performance.
2366        mock = unittest.mock.Mock()
2367        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2368        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2369            with self.get_files() as (src, dst):
2370                shutil._fastcopy_sendfile(src, dst)
2371                assert m.called
2372        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2373
2374    def test_blocksize_arg(self):
2375        with unittest.mock.patch('os.sendfile',
2376                                 side_effect=ZeroDivisionError) as m:
2377            self.assertRaises(ZeroDivisionError,
2378                              shutil.copyfile, TESTFN, TESTFN2)
2379            blocksize = m.call_args[0][3]
2380            # Make sure file size and the block size arg passed to
2381            # sendfile() are the same.
2382            self.assertEqual(blocksize, os.path.getsize(TESTFN))
2383            # ...unless we're dealing with a small file.
2384            support.unlink(TESTFN2)
2385            write_file(TESTFN2, b"hello", binary=True)
2386            self.addCleanup(support.unlink, TESTFN2 + '3')
2387            self.assertRaises(ZeroDivisionError,
2388                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
2389            blocksize = m.call_args[0][3]
2390            self.assertEqual(blocksize, 2 ** 23)
2391
2392    def test_file2file_not_supported(self):
2393        # Emulate a case where sendfile() only support file->socket
2394        # fds. In such a case copyfile() is supposed to skip the
2395        # fast-copy attempt from then on.
2396        assert shutil._USE_CP_SENDFILE
2397        try:
2398            with unittest.mock.patch(
2399                    self.PATCHPOINT,
2400                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2401                with self.get_files() as (src, dst):
2402                    with self.assertRaises(_GiveupOnFastCopy):
2403                        shutil._fastcopy_sendfile(src, dst)
2404                assert m.called
2405            assert not shutil._USE_CP_SENDFILE
2406
2407            with unittest.mock.patch(self.PATCHPOINT) as m:
2408                shutil.copyfile(TESTFN, TESTFN2)
2409                assert not m.called
2410        finally:
2411            shutil._USE_CP_SENDFILE = True
2412
2413
2414@unittest.skipIf(not MACOS, 'macOS only')
2415class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
2416    PATCHPOINT = "posix._fcopyfile"
2417
2418    def zerocopy_fun(self, src, dst):
2419        return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
2420
2421
2422class TermsizeTests(unittest.TestCase):
2423    def test_does_not_crash(self):
2424        """Check if get_terminal_size() returns a meaningful value.
2425
2426        There's no easy portable way to actually check the size of the
2427        terminal, so let's check if it returns something sensible instead.
2428        """
2429        size = shutil.get_terminal_size()
2430        self.assertGreaterEqual(size.columns, 0)
2431        self.assertGreaterEqual(size.lines, 0)
2432
2433    def test_os_environ_first(self):
2434        "Check if environment variables have precedence"
2435
2436        with support.EnvironmentVarGuard() as env:
2437            env['COLUMNS'] = '777'
2438            del env['LINES']
2439            size = shutil.get_terminal_size()
2440        self.assertEqual(size.columns, 777)
2441
2442        with support.EnvironmentVarGuard() as env:
2443            del env['COLUMNS']
2444            env['LINES'] = '888'
2445            size = shutil.get_terminal_size()
2446        self.assertEqual(size.lines, 888)
2447
2448    def test_bad_environ(self):
2449        with support.EnvironmentVarGuard() as env:
2450            env['COLUMNS'] = 'xxx'
2451            env['LINES'] = 'yyy'
2452            size = shutil.get_terminal_size()
2453        self.assertGreaterEqual(size.columns, 0)
2454        self.assertGreaterEqual(size.lines, 0)
2455
2456    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
2457    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2458                         'need os.get_terminal_size()')
2459    def test_stty_match(self):
2460        """Check if stty returns the same results ignoring env
2461
2462        This test will fail if stdin and stdout are connected to
2463        different terminals with different sizes. Nevertheless, such
2464        situations should be pretty rare.
2465        """
2466        try:
2467            size = subprocess.check_output(['stty', 'size']).decode().split()
2468        except (FileNotFoundError, PermissionError,
2469                subprocess.CalledProcessError):
2470            self.skipTest("stty invocation failed")
2471        expected = (int(size[1]), int(size[0])) # reversed order
2472
2473        with support.EnvironmentVarGuard() as env:
2474            del env['LINES']
2475            del env['COLUMNS']
2476            actual = shutil.get_terminal_size()
2477
2478        self.assertEqual(expected, actual)
2479
2480    def test_fallback(self):
2481        with support.EnvironmentVarGuard() as env:
2482            del env['LINES']
2483            del env['COLUMNS']
2484
2485            # sys.__stdout__ has no fileno()
2486            with support.swap_attr(sys, '__stdout__', None):
2487                size = shutil.get_terminal_size(fallback=(10, 20))
2488            self.assertEqual(size.columns, 10)
2489            self.assertEqual(size.lines, 20)
2490
2491            # sys.__stdout__ is not a terminal on Unix
2492            # or fileno() not in (0, 1, 2) on Windows
2493            with open(os.devnull, 'w') as f, \
2494                 support.swap_attr(sys, '__stdout__', f):
2495                size = shutil.get_terminal_size(fallback=(30, 40))
2496            self.assertEqual(size.columns, 30)
2497            self.assertEqual(size.lines, 40)
2498
2499
2500class PublicAPITests(unittest.TestCase):
2501    """Ensures that the correct values are exposed in the public API."""
2502
2503    def test_module_all_attribute(self):
2504        self.assertTrue(hasattr(shutil, '__all__'))
2505        target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2506                      'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2507                      'SpecialFileError', 'ExecError', 'make_archive',
2508                      'get_archive_formats', 'register_archive_format',
2509                      'unregister_archive_format', 'get_unpack_formats',
2510                      'register_unpack_format', 'unregister_unpack_format',
2511                      'unpack_archive', 'ignore_patterns', 'chown', 'which',
2512                      'get_terminal_size', 'SameFileError']
2513        if hasattr(os, 'statvfs') or os.name == 'nt':
2514            target_api.append('disk_usage')
2515        self.assertEqual(set(shutil.__all__), set(target_api))
2516
2517
2518if __name__ == '__main__':
2519    unittest.main()
2520