• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import unittest.mock
5import shutil
6import tempfile
7import sys
8import stat
9import os
10import os.path
11import errno
12import functools
13import pathlib
14import subprocess
15import random
16import string
17import contextlib
18import io
19from shutil import (make_archive,
20                    register_archive_format, unregister_archive_format,
21                    get_archive_formats, Error, unpack_archive,
22                    register_unpack_format, RegistryError,
23                    unregister_unpack_format, get_unpack_formats,
24                    SameFileError, _GiveupOnFastCopy)
25import tarfile
26import zipfile
27try:
28    import posix
29except ImportError:
30    posix = None
31
32from test import support
33from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
35
# Additional scratch path names derived from the shared TESTFN base name.
TESTFN2 = TESTFN + "2"
TESTFN_SRC = TESTFN + "_SRC"
TESTFN_DST = TESTFN + "_DST"
# Platform flags derived from sys.platform.
MACOS = sys.platform.startswith("darwin")
SOLARIS = sys.platform.startswith("sunos")
AIX = sys.platform[:3] == 'aix'
# UID_GID_SUPPORT is True only when both the grp and pwd modules import.
try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False

# _winapi is None when the module cannot be imported; the junction tests
# use its truthiness in their skipUnless decorators.
try:
    import _winapi
except ImportError:
    _winapi = None
53
54def _fake_rename(*args, **kwargs):
55    # Pretend the destination path is on a different filesystem.
56    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
57
def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    The os.rename that was in place before the call is put back when
    *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrap
68
def write_file(path, content, binary=False):
    """Create (or overwrite) the file at *path* with *content*.

    *path* may be a tuple of components, which are combined with
    os.path.join.  With a true *binary* the file is opened in binary
    mode; otherwise it is written as UTF-8 text.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    if binary:
        fp = open(path, 'wb', encoding=None)
    else:
        fp = open(path, 'w', encoding="utf-8")
    with fp:
        fp.write(content)
82
def write_test_file(path, size):
    """Create a test file of exactly *size* bytes of random ASCII letters.

    The data is written in pieces of at most 8192 bytes; the same random
    buffer is reused for every piece.  An AssertionError is raised if the
    resulting file does not have the requested size.
    """
    def chunks(total, step):
        # Yield piece sizes, each at most *step*, that add up to *total*.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = "".join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # The last piece may be shorter than the buffer: write only
            # csize bytes so the file ends up exactly *size* bytes long.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
100
def read_file(path, binary=False):
    """Return the full contents of the file at *path*.

    *path* may be a tuple of components, which are combined with
    os.path.join.  With a true *binary* the file is read in binary mode
    and bytes are returned; otherwise it is decoded as UTF-8 text.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    if binary:
        open_kwargs = {'mode': 'rb', 'encoding': None}
    else:
        open_kwargs = {'mode': 'r', 'encoding': "utf-8"}
    with open(path, **open_kwargs) as fp:
        return fp.read()
114
def rlistdir(path):
    """Return a recursive listing of *path* as a flat list of names.

    Each directory's entries are visited in sorted order.  A real
    directory is listed as 'name/' followed by its own entries prefixed
    with 'name/'.  Anything else, including a symlink to a directory, is
    listed by its plain name and not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
126
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy between two regular files.

    Returns False when os.sendfile does not exist or when a two-byte
    transfer between two temporary files in the current directory raises
    OSError.  The temporary files are removed before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    created = []
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
            created.append(f.name)
            f.write(b"0123456789")

        with open(created[0], "rb") as src, \
             tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
            created.append(dst.name)
            try:
                os.sendfile(dst.fileno(), src.fileno(), 0, 2)
            except OSError:
                return False
            return True
    finally:
        # Source file first, then destination.
        for name in created:
            os_helper.unlink(name)
154
155
# Evaluated once at import time: True when os.sendfile() exists and a
# transfer between two temporary files succeeded.
SUPPORTS_SENDFILE = supports_file2file_sendfile()
157
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test.
# The AIX command 'dump -o program' gives XCOFF header information.
# The second word of the last line is the maxdata value.
# In 32-bit mode, maxdata must be at least 0x20000000 for the xz test to succeed.
def _maxdataOK():
    """Return False only on 32-bit AIX when maxdata is below 0x20000000.

    On every other platform, and on AIX builds where sys.maxsize is not
    2147483647, this returns True without running anything.  Otherwise the
    maxdata value is read from the output of '/usr/bin/dump -o' run on
    sys.executable.
    """
    if not (AIX and sys.maxsize == 2147483647):
        return True
    hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    # maxdata is taken as the second word of the last output line.
    last_line = hdrs.split("\n")[-1]
    maxdata = last_line.split()[1]
    return int(maxdata, 16) >= 0x20000000
169
170
class BaseTest:
    """Mixin providing temporary directories that clean themselves up."""

    def mkdtemp(self, prefix=None):
        """Return the path of a new temporary directory under the cwd.

        Removal of the directory is registered with addCleanup.
        """
        path = tempfile.mkdtemp(dir=os.getcwd(), prefix=prefix)
        self.addCleanup(os_helper.rmtree, path)
        return path
181
182
183class TestRmTree(BaseTest, unittest.TestCase):
184
185    def test_rmtree_works_on_bytes(self):
186        tmp = self.mkdtemp()
187        victim = os.path.join(tmp, 'killme')
188        os.mkdir(victim)
189        write_file(os.path.join(victim, 'somefile'), 'foo')
190        victim = os.fsencode(victim)
191        self.assertIsInstance(victim, bytes)
192        shutil.rmtree(victim)
193
194    @os_helper.skip_unless_symlink
195    def test_rmtree_fails_on_symlink(self):
196        tmp = self.mkdtemp()
197        dir_ = os.path.join(tmp, 'dir')
198        os.mkdir(dir_)
199        link = os.path.join(tmp, 'link')
200        os.symlink(dir_, link)
201        self.assertRaises(OSError, shutil.rmtree, link)
202        self.assertTrue(os.path.exists(dir_))
203        self.assertTrue(os.path.lexists(link))
204        errors = []
205        def onerror(*args):
206            errors.append(args)
207        shutil.rmtree(link, onerror=onerror)
208        self.assertEqual(len(errors), 1)
209        self.assertIs(errors[0][0], os.path.islink)
210        self.assertEqual(errors[0][1], link)
211        self.assertIsInstance(errors[0][2][1], OSError)
212
213    @os_helper.skip_unless_symlink
214    def test_rmtree_works_on_symlinks(self):
215        tmp = self.mkdtemp()
216        dir1 = os.path.join(tmp, 'dir1')
217        dir2 = os.path.join(dir1, 'dir2')
218        dir3 = os.path.join(tmp, 'dir3')
219        for d in dir1, dir2, dir3:
220            os.mkdir(d)
221        file1 = os.path.join(tmp, 'file1')
222        write_file(file1, 'foo')
223        link1 = os.path.join(dir1, 'link1')
224        os.symlink(dir2, link1)
225        link2 = os.path.join(dir1, 'link2')
226        os.symlink(dir3, link2)
227        link3 = os.path.join(dir1, 'link3')
228        os.symlink(file1, link3)
229        # make sure symlinks are removed but not followed
230        shutil.rmtree(dir1)
231        self.assertFalse(os.path.exists(dir1))
232        self.assertTrue(os.path.exists(dir3))
233        self.assertTrue(os.path.exists(file1))
234
235    @unittest.skipUnless(_winapi, 'only relevant on Windows')
236    def test_rmtree_fails_on_junctions(self):
237        tmp = self.mkdtemp()
238        dir_ = os.path.join(tmp, 'dir')
239        os.mkdir(dir_)
240        link = os.path.join(tmp, 'link')
241        _winapi.CreateJunction(dir_, link)
242        self.addCleanup(os_helper.unlink, link)
243        self.assertRaises(OSError, shutil.rmtree, link)
244        self.assertTrue(os.path.exists(dir_))
245        self.assertTrue(os.path.lexists(link))
246        errors = []
247        def onerror(*args):
248            errors.append(args)
249        shutil.rmtree(link, onerror=onerror)
250        self.assertEqual(len(errors), 1)
251        self.assertIs(errors[0][0], os.path.islink)
252        self.assertEqual(errors[0][1], link)
253        self.assertIsInstance(errors[0][2][1], OSError)
254
255    @unittest.skipUnless(_winapi, 'only relevant on Windows')
256    def test_rmtree_works_on_junctions(self):
257        tmp = self.mkdtemp()
258        dir1 = os.path.join(tmp, 'dir1')
259        dir2 = os.path.join(dir1, 'dir2')
260        dir3 = os.path.join(tmp, 'dir3')
261        for d in dir1, dir2, dir3:
262            os.mkdir(d)
263        file1 = os.path.join(tmp, 'file1')
264        write_file(file1, 'foo')
265        link1 = os.path.join(dir1, 'link1')
266        _winapi.CreateJunction(dir2, link1)
267        link2 = os.path.join(dir1, 'link2')
268        _winapi.CreateJunction(dir3, link2)
269        link3 = os.path.join(dir1, 'link3')
270        _winapi.CreateJunction(file1, link3)
271        # make sure junctions are removed but not followed
272        shutil.rmtree(dir1)
273        self.assertFalse(os.path.exists(dir1))
274        self.assertTrue(os.path.exists(dir3))
275        self.assertTrue(os.path.exists(file1))
276
277    def test_rmtree_errors(self):
278        # filename is guaranteed not to exist
279        filename = tempfile.mktemp(dir=self.mkdtemp())
280        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
281        # test that ignore_errors option is honored
282        shutil.rmtree(filename, ignore_errors=True)
283
284        # existing file
285        tmpdir = self.mkdtemp()
286        write_file((tmpdir, "tstfile"), "")
287        filename = os.path.join(tmpdir, "tstfile")
288        with self.assertRaises(NotADirectoryError) as cm:
289            shutil.rmtree(filename)
290        self.assertEqual(cm.exception.filename, filename)
291        self.assertTrue(os.path.exists(filename))
292        # test that ignore_errors option is honored
293        shutil.rmtree(filename, ignore_errors=True)
294        self.assertTrue(os.path.exists(filename))
295        errors = []
296        def onerror(*args):
297            errors.append(args)
298        shutil.rmtree(filename, onerror=onerror)
299        self.assertEqual(len(errors), 2)
300        self.assertIs(errors[0][0], os.scandir)
301        self.assertEqual(errors[0][1], filename)
302        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
303        self.assertEqual(errors[0][2][1].filename, filename)
304        self.assertIs(errors[1][0], os.rmdir)
305        self.assertEqual(errors[1][1], filename)
306        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
307        self.assertEqual(errors[1][2][1].filename, filename)
308
309
310    @unittest.skipIf(sys.platform[:6] == 'cygwin',
311                     "This test can't be run on Cygwin (issue #1071513).")
312    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
313                     "This test can't be run reliably as root (issue #1076467).")
314    def test_on_error(self):
315        self.errorState = 0
316        os.mkdir(TESTFN)
317        self.addCleanup(shutil.rmtree, TESTFN)
318
319        self.child_file_path = os.path.join(TESTFN, 'a')
320        self.child_dir_path = os.path.join(TESTFN, 'b')
321        os_helper.create_empty_file(self.child_file_path)
322        os.mkdir(self.child_dir_path)
323        old_dir_mode = os.stat(TESTFN).st_mode
324        old_child_file_mode = os.stat(self.child_file_path).st_mode
325        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
326        # Make unwritable.
327        new_mode = stat.S_IREAD|stat.S_IEXEC
328        os.chmod(self.child_file_path, new_mode)
329        os.chmod(self.child_dir_path, new_mode)
330        os.chmod(TESTFN, new_mode)
331
332        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
333        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
334        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
335
336        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
337        # Test whether onerror has actually been called.
338        self.assertEqual(self.errorState, 3,
339                         "Expected call to onerror function did not happen.")
340
341    def check_args_to_onerror(self, func, arg, exc):
342        # test_rmtree_errors deliberately runs rmtree
343        # on a directory that is chmod 500, which will fail.
344        # This function is run when shutil.rmtree fails.
345        # 99.9% of the time it initially fails to remove
346        # a file in the directory, so the first time through
347        # func is os.remove.
348        # However, some Linux machines running ZFS on
349        # FUSE experienced a failure earlier in the process
350        # at os.listdir.  The first failure may legally
351        # be either.
352        if self.errorState < 2:
353            if func is os.unlink:
354                self.assertEqual(arg, self.child_file_path)
355            elif func is os.rmdir:
356                self.assertEqual(arg, self.child_dir_path)
357            else:
358                self.assertIs(func, os.listdir)
359                self.assertIn(arg, [TESTFN, self.child_dir_path])
360            self.assertTrue(issubclass(exc[0], OSError))
361            self.errorState += 1
362        else:
363            self.assertEqual(func, os.rmdir)
364            self.assertEqual(arg, TESTFN)
365            self.assertTrue(issubclass(exc[0], OSError))
366            self.errorState = 3
367
368    def test_rmtree_does_not_choke_on_failing_lstat(self):
369        try:
370            orig_lstat = os.lstat
371            def raiser(fn, *args, **kwargs):
372                if fn != TESTFN:
373                    raise OSError()
374                else:
375                    return orig_lstat(fn)
376            os.lstat = raiser
377
378            os.mkdir(TESTFN)
379            write_file((TESTFN, 'foo'), 'foo')
380            shutil.rmtree(TESTFN)
381        finally:
382            os.lstat = orig_lstat
383
384    def test_rmtree_uses_safe_fd_version_if_available(self):
385        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
386                             os.supports_dir_fd and
387                             os.listdir in os.supports_fd and
388                             os.stat in os.supports_follow_symlinks)
389        if _use_fd_functions:
390            self.assertTrue(shutil._use_fd_functions)
391            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
392            tmp_dir = self.mkdtemp()
393            d = os.path.join(tmp_dir, 'a')
394            os.mkdir(d)
395            try:
396                real_rmtree = shutil._rmtree_safe_fd
397                class Called(Exception): pass
398                def _raiser(*args, **kwargs):
399                    raise Called
400                shutil._rmtree_safe_fd = _raiser
401                self.assertRaises(Called, shutil.rmtree, d)
402            finally:
403                shutil._rmtree_safe_fd = real_rmtree
404        else:
405            self.assertFalse(shutil._use_fd_functions)
406            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
407
408    def test_rmtree_dont_delete_file(self):
409        # When called on a file instead of a directory, don't delete it.
410        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
411        os.close(handle)
412        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
413        os.remove(path)
414
415    @os_helper.skip_unless_symlink
416    def test_rmtree_on_symlink(self):
417        # bug 1669.
418        os.mkdir(TESTFN)
419        try:
420            src = os.path.join(TESTFN, 'cheese')
421            dst = os.path.join(TESTFN, 'shop')
422            os.mkdir(src)
423            os.symlink(src, dst)
424            self.assertRaises(OSError, shutil.rmtree, dst)
425            shutil.rmtree(dst, ignore_errors=True)
426        finally:
427            shutil.rmtree(TESTFN, ignore_errors=True)
428
429    @unittest.skipUnless(_winapi, 'only relevant on Windows')
430    def test_rmtree_on_junction(self):
431        os.mkdir(TESTFN)
432        try:
433            src = os.path.join(TESTFN, 'cheese')
434            dst = os.path.join(TESTFN, 'shop')
435            os.mkdir(src)
436            open(os.path.join(src, 'spam'), 'wb').close()
437            _winapi.CreateJunction(src, dst)
438            self.assertRaises(OSError, shutil.rmtree, dst)
439            shutil.rmtree(dst, ignore_errors=True)
440        finally:
441            shutil.rmtree(TESTFN, ignore_errors=True)
442
443
444class TestCopyTree(BaseTest, unittest.TestCase):
445
446    def test_copytree_simple(self):
447        src_dir = self.mkdtemp()
448        dst_dir = os.path.join(self.mkdtemp(), 'destination')
449        self.addCleanup(shutil.rmtree, src_dir)
450        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
451        write_file((src_dir, 'test.txt'), '123')
452        os.mkdir(os.path.join(src_dir, 'test_dir'))
453        write_file((src_dir, 'test_dir', 'test.txt'), '456')
454
455        shutil.copytree(src_dir, dst_dir)
456        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
457        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
458        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
459                                                    'test.txt')))
460        actual = read_file((dst_dir, 'test.txt'))
461        self.assertEqual(actual, '123')
462        actual = read_file((dst_dir, 'test_dir', 'test.txt'))
463        self.assertEqual(actual, '456')
464
465    def test_copytree_dirs_exist_ok(self):
466        src_dir = self.mkdtemp()
467        dst_dir = self.mkdtemp()
468        self.addCleanup(shutil.rmtree, src_dir)
469        self.addCleanup(shutil.rmtree, dst_dir)
470
471        write_file((src_dir, 'nonexisting.txt'), '123')
472        os.mkdir(os.path.join(src_dir, 'existing_dir'))
473        os.mkdir(os.path.join(dst_dir, 'existing_dir'))
474        write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
475        write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
476
477        shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
478        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
479        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
480        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
481                                                    'existing.txt')))
482        actual = read_file((dst_dir, 'nonexisting.txt'))
483        self.assertEqual(actual, '123')
484        actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
485        self.assertEqual(actual, 'has been replaced')
486
487        with self.assertRaises(FileExistsError):
488            shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
489
490    @os_helper.skip_unless_symlink
491    def test_copytree_symlinks(self):
492        tmp_dir = self.mkdtemp()
493        src_dir = os.path.join(tmp_dir, 'src')
494        dst_dir = os.path.join(tmp_dir, 'dst')
495        sub_dir = os.path.join(src_dir, 'sub')
496        os.mkdir(src_dir)
497        os.mkdir(sub_dir)
498        write_file((src_dir, 'file.txt'), 'foo')
499        src_link = os.path.join(sub_dir, 'link')
500        dst_link = os.path.join(dst_dir, 'sub/link')
501        os.symlink(os.path.join(src_dir, 'file.txt'),
502                   src_link)
503        if hasattr(os, 'lchmod'):
504            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
505        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
506            os.lchflags(src_link, stat.UF_NODUMP)
507        src_stat = os.lstat(src_link)
508        shutil.copytree(src_dir, dst_dir, symlinks=True)
509        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
510        actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
511        # Bad practice to blindly strip the prefix as it may be required to
512        # correctly refer to the file, but we're only comparing paths here.
513        if os.name == 'nt' and actual.startswith('\\\\?\\'):
514            actual = actual[4:]
515        self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
516        dst_stat = os.lstat(dst_link)
517        if hasattr(os, 'lchmod'):
518            self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
519        if hasattr(os, 'lchflags'):
520            self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
521
522    def test_copytree_with_exclude(self):
523        # creating data
524        join = os.path.join
525        exists = os.path.exists
526        src_dir = self.mkdtemp()
527        try:
528            dst_dir = join(self.mkdtemp(), 'destination')
529            write_file((src_dir, 'test.txt'), '123')
530            write_file((src_dir, 'test.tmp'), '123')
531            os.mkdir(join(src_dir, 'test_dir'))
532            write_file((src_dir, 'test_dir', 'test.txt'), '456')
533            os.mkdir(join(src_dir, 'test_dir2'))
534            write_file((src_dir, 'test_dir2', 'test.txt'), '456')
535            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
536            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
537            write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
538            write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
539
540            # testing glob-like patterns
541            try:
542                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
543                shutil.copytree(src_dir, dst_dir, ignore=patterns)
544                # checking the result: some elements should not be copied
545                self.assertTrue(exists(join(dst_dir, 'test.txt')))
546                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
547                self.assertFalse(exists(join(dst_dir, 'test_dir2')))
548            finally:
549                shutil.rmtree(dst_dir)
550            try:
551                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
552                shutil.copytree(src_dir, dst_dir, ignore=patterns)
553                # checking the result: some elements should not be copied
554                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
555                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
556                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
557            finally:
558                shutil.rmtree(dst_dir)
559
560            # testing callable-style
561            try:
562                def _filter(src, names):
563                    res = []
564                    for name in names:
565                        path = os.path.join(src, name)
566
567                        if (os.path.isdir(path) and
568                            path.split()[-1] == 'subdir'):
569                            res.append(name)
570                        elif os.path.splitext(path)[-1] in ('.py'):
571                            res.append(name)
572                    return res
573
574                shutil.copytree(src_dir, dst_dir, ignore=_filter)
575
576                # checking the result: some elements should not be copied
577                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
578                                             'test.py')))
579                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
580
581            finally:
582                shutil.rmtree(dst_dir)
583        finally:
584            shutil.rmtree(src_dir)
585            shutil.rmtree(os.path.dirname(dst_dir))
586
587    def test_copytree_arg_types_of_ignore(self):
588        join = os.path.join
589        exists = os.path.exists
590
591        tmp_dir = self.mkdtemp()
592        src_dir = join(tmp_dir, "source")
593
594        os.mkdir(join(src_dir))
595        os.mkdir(join(src_dir, 'test_dir'))
596        os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
597        write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
598
599        invokations = []
600
601        def _ignore(src, names):
602            invokations.append(src)
603            self.assertIsInstance(src, str)
604            self.assertIsInstance(names, list)
605            self.assertEqual(len(names), len(set(names)))
606            for name in names:
607                self.assertIsInstance(name, str)
608            return []
609
610        dst_dir = join(self.mkdtemp(), 'destination')
611        shutil.copytree(src_dir, dst_dir, ignore=_ignore)
612        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
613                                    'test.txt')))
614
615        dst_dir = join(self.mkdtemp(), 'destination')
616        shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
617        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
618                                    'test.txt')))
619
620        dst_dir = join(self.mkdtemp(), 'destination')
621        src_dir_entry = list(os.scandir(tmp_dir))[0]
622        self.assertIsInstance(src_dir_entry, os.DirEntry)
623        shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
624        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
625                                    'test.txt')))
626
627        self.assertEqual(len(invokations), 9)
628
629    def test_copytree_retains_permissions(self):
630        tmp_dir = self.mkdtemp()
631        src_dir = os.path.join(tmp_dir, 'source')
632        os.mkdir(src_dir)
633        dst_dir = os.path.join(tmp_dir, 'destination')
634        self.addCleanup(shutil.rmtree, tmp_dir)
635
636        os.chmod(src_dir, 0o777)
637        write_file((src_dir, 'permissive.txt'), '123')
638        os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
639        write_file((src_dir, 'restrictive.txt'), '456')
640        os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
641        restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
642        self.addCleanup(os_helper.rmtree, restrictive_subdir)
643        os.chmod(restrictive_subdir, 0o600)
644
645        shutil.copytree(src_dir, dst_dir)
646        self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
647        self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
648                          os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
649        self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
650                          os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
651        restrictive_subdir_dst = os.path.join(dst_dir,
652                                              os.path.split(restrictive_subdir)[1])
653        self.assertEqual(os.stat(restrictive_subdir).st_mode,
654                          os.stat(restrictive_subdir_dst).st_mode)
655
656    @unittest.mock.patch('os.chmod')
657    def test_copytree_winerror(self, mock_patch):
658        # When copying to VFAT, copystat() raises OSError. On Windows, the
659        # exception object has a meaningful 'winerror' attribute, but not
660        # on other operating systems. Do not assume 'winerror' is set.
661        src_dir = self.mkdtemp()
662        dst_dir = os.path.join(self.mkdtemp(), 'destination')
663        self.addCleanup(shutil.rmtree, src_dir)
664        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
665
666        mock_patch.side_effect = PermissionError('ka-boom')
667        with self.assertRaises(shutil.Error):
668            shutil.copytree(src_dir, dst_dir)
669
670    def test_copytree_custom_copy_function(self):
671        # See: https://bugs.python.org/issue35648
672        def custom_cpfun(a, b):
673            flag.append(None)
674            self.assertIsInstance(a, str)
675            self.assertIsInstance(b, str)
676            self.assertEqual(a, os.path.join(src, 'foo'))
677            self.assertEqual(b, os.path.join(dst, 'foo'))
678
679        flag = []
680        src = self.mkdtemp()
681        dst = tempfile.mktemp(dir=self.mkdtemp())
682        with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f:
683            f.close()
684        shutil.copytree(src, dst, copy_function=custom_cpfun)
685        self.assertEqual(len(flag), 1)
686
687    # Issue #3002: copyfile and copytree block indefinitely on named pipes
688    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
689    @os_helper.skip_unless_symlink
690    @unittest.skipIf(sys.platform == "vxworks",
691                    "fifo requires special path on VxWorks")
692    def test_copytree_named_pipe(self):
693        os.mkdir(TESTFN)
694        try:
695            subdir = os.path.join(TESTFN, "subdir")
696            os.mkdir(subdir)
697            pipe = os.path.join(subdir, "mypipe")
698            try:
699                os.mkfifo(pipe)
700            except PermissionError as e:
701                self.skipTest('os.mkfifo(): %s' % e)
702            try:
703                shutil.copytree(TESTFN, TESTFN2)
704            except shutil.Error as e:
705                errors = e.args[0]
706                self.assertEqual(len(errors), 1)
707                src, dst, error_msg = errors[0]
708                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
709            else:
710                self.fail("shutil.Error should have been raised")
711        finally:
712            shutil.rmtree(TESTFN, ignore_errors=True)
713            shutil.rmtree(TESTFN2, ignore_errors=True)
714
715    def test_copytree_special_func(self):
716        src_dir = self.mkdtemp()
717        dst_dir = os.path.join(self.mkdtemp(), 'destination')
718        write_file((src_dir, 'test.txt'), '123')
719        os.mkdir(os.path.join(src_dir, 'test_dir'))
720        write_file((src_dir, 'test_dir', 'test.txt'), '456')
721
722        copied = []
723        def _copy(src, dst):
724            copied.append((src, dst))
725
726        shutil.copytree(src_dir, dst_dir, copy_function=_copy)
727        self.assertEqual(len(copied), 2)
728
729    @os_helper.skip_unless_symlink
730    def test_copytree_dangling_symlinks(self):
731        # a dangling symlink raises an error at the end
732        src_dir = self.mkdtemp()
733        dst_dir = os.path.join(self.mkdtemp(), 'destination')
734        os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
735        os.mkdir(os.path.join(src_dir, 'test_dir'))
736        write_file((src_dir, 'test_dir', 'test.txt'), '456')
737        self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
738
739        # a dangling symlink is ignored with the proper flag
740        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
741        shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
742        self.assertNotIn('test.txt', os.listdir(dst_dir))
743
744        # a dangling symlink is copied if symlinks=True
745        dst_dir = os.path.join(self.mkdtemp(), 'destination3')
746        shutil.copytree(src_dir, dst_dir, symlinks=True)
747        self.assertIn('test.txt', os.listdir(dst_dir))
748
749    @os_helper.skip_unless_symlink
750    def test_copytree_symlink_dir(self):
751        src_dir = self.mkdtemp()
752        dst_dir = os.path.join(self.mkdtemp(), 'destination')
753        os.mkdir(os.path.join(src_dir, 'real_dir'))
754        with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'):
755            pass
756        os.symlink(os.path.join(src_dir, 'real_dir'),
757                   os.path.join(src_dir, 'link_to_dir'),
758                   target_is_directory=True)
759
760        shutil.copytree(src_dir, dst_dir, symlinks=False)
761        self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
762        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
763
764        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
765        shutil.copytree(src_dir, dst_dir, symlinks=True)
766        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
767        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
768
769    def test_copytree_return_value(self):
770        # copytree returns its destination path.
771        src_dir = self.mkdtemp()
772        dst_dir = src_dir + "dest"
773        self.addCleanup(shutil.rmtree, dst_dir, True)
774        src = os.path.join(src_dir, 'foo')
775        write_file(src, 'foo')
776        rv = shutil.copytree(src_dir, dst_dir)
777        self.assertEqual(['foo'], os.listdir(rv))
778
779    def test_copytree_subdirectory(self):
780        # copytree where dst is a subdirectory of src, see Issue 38688
781        base_dir = self.mkdtemp()
782        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
783        src_dir = os.path.join(base_dir, "t", "pg")
784        dst_dir = os.path.join(src_dir, "somevendor", "1.0")
785        os.makedirs(src_dir)
786        src = os.path.join(src_dir, 'pol')
787        write_file(src, 'pol')
788        rv = shutil.copytree(src_dir, dst_dir)
789        self.assertEqual(['pol'], os.listdir(rv))
790
791class TestCopy(BaseTest, unittest.TestCase):
792
793    ### shutil.copymode
794
795    @os_helper.skip_unless_symlink
796    def test_copymode_follow_symlinks(self):
797        tmp_dir = self.mkdtemp()
798        src = os.path.join(tmp_dir, 'foo')
799        dst = os.path.join(tmp_dir, 'bar')
800        src_link = os.path.join(tmp_dir, 'baz')
801        dst_link = os.path.join(tmp_dir, 'quux')
802        write_file(src, 'foo')
803        write_file(dst, 'foo')
804        os.symlink(src, src_link)
805        os.symlink(dst, dst_link)
806        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
807        # file to file
808        os.chmod(dst, stat.S_IRWXO)
809        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
810        shutil.copymode(src, dst)
811        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
812        # On Windows, os.chmod does not follow symlinks (issue #15411)
813        if os.name != 'nt':
814            # follow src link
815            os.chmod(dst, stat.S_IRWXO)
816            shutil.copymode(src_link, dst)
817            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
818            # follow dst link
819            os.chmod(dst, stat.S_IRWXO)
820            shutil.copymode(src, dst_link)
821            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
822            # follow both links
823            os.chmod(dst, stat.S_IRWXO)
824            shutil.copymode(src_link, dst_link)
825            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
826
827    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
828    @os_helper.skip_unless_symlink
829    def test_copymode_symlink_to_symlink(self):
830        tmp_dir = self.mkdtemp()
831        src = os.path.join(tmp_dir, 'foo')
832        dst = os.path.join(tmp_dir, 'bar')
833        src_link = os.path.join(tmp_dir, 'baz')
834        dst_link = os.path.join(tmp_dir, 'quux')
835        write_file(src, 'foo')
836        write_file(dst, 'foo')
837        os.symlink(src, src_link)
838        os.symlink(dst, dst_link)
839        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
840        os.chmod(dst, stat.S_IRWXU)
841        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
842        # link to link
843        os.lchmod(dst_link, stat.S_IRWXO)
844        shutil.copymode(src_link, dst_link, follow_symlinks=False)
845        self.assertEqual(os.lstat(src_link).st_mode,
846                         os.lstat(dst_link).st_mode)
847        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
848        # src link - use chmod
849        os.lchmod(dst_link, stat.S_IRWXO)
850        shutil.copymode(src_link, dst, follow_symlinks=False)
851        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
852        # dst link - use chmod
853        os.lchmod(dst_link, stat.S_IRWXO)
854        shutil.copymode(src, dst_link, follow_symlinks=False)
855        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
856
857    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
858    @os_helper.skip_unless_symlink
859    def test_copymode_symlink_to_symlink_wo_lchmod(self):
860        tmp_dir = self.mkdtemp()
861        src = os.path.join(tmp_dir, 'foo')
862        dst = os.path.join(tmp_dir, 'bar')
863        src_link = os.path.join(tmp_dir, 'baz')
864        dst_link = os.path.join(tmp_dir, 'quux')
865        write_file(src, 'foo')
866        write_file(dst, 'foo')
867        os.symlink(src, src_link)
868        os.symlink(dst, dst_link)
869        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
870
871    ### shutil.copystat
872
873    @os_helper.skip_unless_symlink
874    def test_copystat_symlinks(self):
875        tmp_dir = self.mkdtemp()
876        src = os.path.join(tmp_dir, 'foo')
877        dst = os.path.join(tmp_dir, 'bar')
878        src_link = os.path.join(tmp_dir, 'baz')
879        dst_link = os.path.join(tmp_dir, 'qux')
880        write_file(src, 'foo')
881        src_stat = os.stat(src)
882        os.utime(src, (src_stat.st_atime,
883                       src_stat.st_mtime - 42.0))  # ensure different mtimes
884        write_file(dst, 'bar')
885        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
886        os.symlink(src, src_link)
887        os.symlink(dst, dst_link)
888        if hasattr(os, 'lchmod'):
889            os.lchmod(src_link, stat.S_IRWXO)
890        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
891            os.lchflags(src_link, stat.UF_NODUMP)
892        src_link_stat = os.lstat(src_link)
893        # follow
894        if hasattr(os, 'lchmod'):
895            shutil.copystat(src_link, dst_link, follow_symlinks=True)
896            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
897        # don't follow
898        shutil.copystat(src_link, dst_link, follow_symlinks=False)
899        dst_link_stat = os.lstat(dst_link)
900        if os.utime in os.supports_follow_symlinks:
901            for attr in 'st_atime', 'st_mtime':
902                # The modification times may be truncated in the new file.
903                self.assertLessEqual(getattr(src_link_stat, attr),
904                                     getattr(dst_link_stat, attr) + 1)
905        if hasattr(os, 'lchmod'):
906            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
907        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
908            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
909        # tell to follow but dst is not a link
910        shutil.copystat(src_link, dst, follow_symlinks=False)
911        self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
912                        00000.1)
913
914    @unittest.skipUnless(hasattr(os, 'chflags') and
915                         hasattr(errno, 'EOPNOTSUPP') and
916                         hasattr(errno, 'ENOTSUP'),
917                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
918    def test_copystat_handles_harmless_chflags_errors(self):
919        tmpdir = self.mkdtemp()
920        file1 = os.path.join(tmpdir, 'file1')
921        file2 = os.path.join(tmpdir, 'file2')
922        write_file(file1, 'xxx')
923        write_file(file2, 'xxx')
924
925        def make_chflags_raiser(err):
926            ex = OSError()
927
928            def _chflags_raiser(path, flags, *, follow_symlinks=True):
929                ex.errno = err
930                raise ex
931            return _chflags_raiser
932        old_chflags = os.chflags
933        try:
934            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
935                os.chflags = make_chflags_raiser(err)
936                shutil.copystat(file1, file2)
937            # assert others errors break it
938            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
939            self.assertRaises(OSError, shutil.copystat, file1, file2)
940        finally:
941            os.chflags = old_chflags
942
943    ### shutil.copyxattr
944
945    @os_helper.skip_unless_xattr
946    def test_copyxattr(self):
947        tmp_dir = self.mkdtemp()
948        src = os.path.join(tmp_dir, 'foo')
949        write_file(src, 'foo')
950        dst = os.path.join(tmp_dir, 'bar')
951        write_file(dst, 'bar')
952
953        # no xattr == no problem
954        shutil._copyxattr(src, dst)
955        # common case
956        os.setxattr(src, 'user.foo', b'42')
957        os.setxattr(src, 'user.bar', b'43')
958        shutil._copyxattr(src, dst)
959        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
960        self.assertEqual(
961                os.getxattr(src, 'user.foo'),
962                os.getxattr(dst, 'user.foo'))
963        # check errors don't affect other attrs
964        os.remove(dst)
965        write_file(dst, 'bar')
966        os_error = OSError(errno.EPERM, 'EPERM')
967
968        def _raise_on_user_foo(fname, attr, val, **kwargs):
969            if attr == 'user.foo':
970                raise os_error
971            else:
972                orig_setxattr(fname, attr, val, **kwargs)
973        try:
974            orig_setxattr = os.setxattr
975            os.setxattr = _raise_on_user_foo
976            shutil._copyxattr(src, dst)
977            self.assertIn('user.bar', os.listxattr(dst))
978        finally:
979            os.setxattr = orig_setxattr
980        # the source filesystem not supporting xattrs should be ok, too.
981        def _raise_on_src(fname, *, follow_symlinks=True):
982            if fname == src:
983                raise OSError(errno.ENOTSUP, 'Operation not supported')
984            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
985        try:
986            orig_listxattr = os.listxattr
987            os.listxattr = _raise_on_src
988            shutil._copyxattr(src, dst)
989        finally:
990            os.listxattr = orig_listxattr
991
992        # test that shutil.copystat copies xattrs
993        src = os.path.join(tmp_dir, 'the_original')
994        srcro = os.path.join(tmp_dir, 'the_original_ro')
995        write_file(src, src)
996        write_file(srcro, srcro)
997        os.setxattr(src, 'user.the_value', b'fiddly')
998        os.setxattr(srcro, 'user.the_value', b'fiddly')
999        os.chmod(srcro, 0o444)
1000        dst = os.path.join(tmp_dir, 'the_copy')
1001        dstro = os.path.join(tmp_dir, 'the_copy_ro')
1002        write_file(dst, dst)
1003        write_file(dstro, dstro)
1004        shutil.copystat(src, dst)
1005        shutil.copystat(srcro, dstro)
1006        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
1007        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
1008
1009    @os_helper.skip_unless_symlink
1010    @os_helper.skip_unless_xattr
1011    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
1012                         'root privileges required')
1013    def test_copyxattr_symlinks(self):
1014        # On Linux, it's only possible to access non-user xattr for symlinks;
1015        # which in turn require root privileges. This test should be expanded
1016        # as soon as other platforms gain support for extended attributes.
1017        tmp_dir = self.mkdtemp()
1018        src = os.path.join(tmp_dir, 'foo')
1019        src_link = os.path.join(tmp_dir, 'baz')
1020        write_file(src, 'foo')
1021        os.symlink(src, src_link)
1022        os.setxattr(src, 'trusted.foo', b'42')
1023        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
1024        dst = os.path.join(tmp_dir, 'bar')
1025        dst_link = os.path.join(tmp_dir, 'qux')
1026        write_file(dst, 'bar')
1027        os.symlink(dst, dst_link)
1028        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
1029        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
1030        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
1031        shutil._copyxattr(src_link, dst, follow_symlinks=False)
1032        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1033
1034    ### shutil.copy
1035
1036    def _copy_file(self, method):
1037        fname = 'test.txt'
1038        tmpdir = self.mkdtemp()
1039        write_file((tmpdir, fname), 'xxx')
1040        file1 = os.path.join(tmpdir, fname)
1041        tmpdir2 = self.mkdtemp()
1042        method(file1, tmpdir2)
1043        file2 = os.path.join(tmpdir2, fname)
1044        return (file1, file2)
1045
1046    def test_copy(self):
1047        # Ensure that the copied file exists and has the same mode bits.
1048        file1, file2 = self._copy_file(shutil.copy)
1049        self.assertTrue(os.path.exists(file2))
1050        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1051
1052    @os_helper.skip_unless_symlink
1053    def test_copy_symlinks(self):
1054        tmp_dir = self.mkdtemp()
1055        src = os.path.join(tmp_dir, 'foo')
1056        dst = os.path.join(tmp_dir, 'bar')
1057        src_link = os.path.join(tmp_dir, 'baz')
1058        write_file(src, 'foo')
1059        os.symlink(src, src_link)
1060        if hasattr(os, 'lchmod'):
1061            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1062        # don't follow
1063        shutil.copy(src_link, dst, follow_symlinks=True)
1064        self.assertFalse(os.path.islink(dst))
1065        self.assertEqual(read_file(src), read_file(dst))
1066        os.remove(dst)
1067        # follow
1068        shutil.copy(src_link, dst, follow_symlinks=False)
1069        self.assertTrue(os.path.islink(dst))
1070        self.assertEqual(os.readlink(dst), os.readlink(src_link))
1071        if hasattr(os, 'lchmod'):
1072            self.assertEqual(os.lstat(src_link).st_mode,
1073                             os.lstat(dst).st_mode)
1074
1075    ### shutil.copy2
1076
1077    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1078    def test_copy2(self):
1079        # Ensure that the copied file exists and has the same mode and
1080        # modification time bits.
1081        file1, file2 = self._copy_file(shutil.copy2)
1082        self.assertTrue(os.path.exists(file2))
1083        file1_stat = os.stat(file1)
1084        file2_stat = os.stat(file2)
1085        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1086        for attr in 'st_atime', 'st_mtime':
1087            # The modification times may be truncated in the new file.
1088            self.assertLessEqual(getattr(file1_stat, attr),
1089                                 getattr(file2_stat, attr) + 1)
1090        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1091            self.assertEqual(getattr(file1_stat, 'st_flags'),
1092                             getattr(file2_stat, 'st_flags'))
1093
1094    @os_helper.skip_unless_symlink
1095    def test_copy2_symlinks(self):
1096        tmp_dir = self.mkdtemp()
1097        src = os.path.join(tmp_dir, 'foo')
1098        dst = os.path.join(tmp_dir, 'bar')
1099        src_link = os.path.join(tmp_dir, 'baz')
1100        write_file(src, 'foo')
1101        os.symlink(src, src_link)
1102        if hasattr(os, 'lchmod'):
1103            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1104        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
1105            os.lchflags(src_link, stat.UF_NODUMP)
1106        src_stat = os.stat(src)
1107        src_link_stat = os.lstat(src_link)
1108        # follow
1109        shutil.copy2(src_link, dst, follow_symlinks=True)
1110        self.assertFalse(os.path.islink(dst))
1111        self.assertEqual(read_file(src), read_file(dst))
1112        os.remove(dst)
1113        # don't follow
1114        shutil.copy2(src_link, dst, follow_symlinks=False)
1115        self.assertTrue(os.path.islink(dst))
1116        self.assertEqual(os.readlink(dst), os.readlink(src_link))
1117        dst_stat = os.lstat(dst)
1118        if os.utime in os.supports_follow_symlinks:
1119            for attr in 'st_atime', 'st_mtime':
1120                # The modification times may be truncated in the new file.
1121                self.assertLessEqual(getattr(src_link_stat, attr),
1122                                     getattr(dst_stat, attr) + 1)
1123        if hasattr(os, 'lchmod'):
1124            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
1125            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
1126        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
1127            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1128
1129    @os_helper.skip_unless_xattr
1130    def test_copy2_xattr(self):
1131        tmp_dir = self.mkdtemp()
1132        src = os.path.join(tmp_dir, 'foo')
1133        dst = os.path.join(tmp_dir, 'bar')
1134        write_file(src, 'foo')
1135        os.setxattr(src, 'user.foo', b'42')
1136        shutil.copy2(src, dst)
1137        self.assertEqual(
1138                os.getxattr(src, 'user.foo'),
1139                os.getxattr(dst, 'user.foo'))
1140        os.remove(dst)
1141
1142    def test_copy_return_value(self):
1143        # copy and copy2 both return their destination path.
1144        for fn in (shutil.copy, shutil.copy2):
1145            src_dir = self.mkdtemp()
1146            dst_dir = self.mkdtemp()
1147            src = os.path.join(src_dir, 'foo')
1148            write_file(src, 'foo')
1149            rv = fn(src, dst_dir)
1150            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1151            rv = fn(src, os.path.join(dst_dir, 'bar'))
1152            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1153
1154    def test_copy_dir(self):
1155        self._test_copy_dir(shutil.copy)
1156
1157    def test_copy2_dir(self):
1158        self._test_copy_dir(shutil.copy2)
1159
1160    def _test_copy_dir(self, copy_func):
1161        src_dir = self.mkdtemp()
1162        src_file = os.path.join(src_dir, 'foo')
1163        dir2 = self.mkdtemp()
1164        dst = os.path.join(src_dir, 'does_not_exist/')
1165        write_file(src_file, 'foo')
1166        if sys.platform == "win32":
1167            err = PermissionError
1168        else:
1169            err = IsADirectoryError
1170        self.assertRaises(err, copy_func, dir2, src_dir)
1171
1172        # raise *err* because of src rather than FileNotFoundError because of dst
1173        self.assertRaises(err, copy_func, dir2, dst)
1174        copy_func(src_file, dir2)     # should not raise exceptions
1175
1176    ### shutil.copyfile
1177
1178    @os_helper.skip_unless_symlink
1179    def test_copyfile_symlinks(self):
1180        tmp_dir = self.mkdtemp()
1181        src = os.path.join(tmp_dir, 'src')
1182        dst = os.path.join(tmp_dir, 'dst')
1183        dst_link = os.path.join(tmp_dir, 'dst_link')
1184        link = os.path.join(tmp_dir, 'link')
1185        write_file(src, 'foo')
1186        os.symlink(src, link)
1187        # don't follow
1188        shutil.copyfile(link, dst_link, follow_symlinks=False)
1189        self.assertTrue(os.path.islink(dst_link))
1190        self.assertEqual(os.readlink(link), os.readlink(dst_link))
1191        # follow
1192        shutil.copyfile(link, dst)
1193        self.assertFalse(os.path.islink(dst))
1194
1195    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
1196    def test_dont_copy_file_onto_link_to_itself(self):
1197        # bug 851123.
1198        os.mkdir(TESTFN)
1199        src = os.path.join(TESTFN, 'cheese')
1200        dst = os.path.join(TESTFN, 'shop')
1201        try:
1202            with open(src, 'w', encoding='utf-8') as f:
1203                f.write('cheddar')
1204            try:
1205                os.link(src, dst)
1206            except PermissionError as e:
1207                self.skipTest('os.link(): %s' % e)
1208            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
1209            with open(src, 'r', encoding='utf-8') as f:
1210                self.assertEqual(f.read(), 'cheddar')
1211            os.remove(dst)
1212        finally:
1213            shutil.rmtree(TESTFN, ignore_errors=True)
1214
1215    @os_helper.skip_unless_symlink
1216    def test_dont_copy_file_onto_symlink_to_itself(self):
1217        # bug 851123.
1218        os.mkdir(TESTFN)
1219        src = os.path.join(TESTFN, 'cheese')
1220        dst = os.path.join(TESTFN, 'shop')
1221        try:
1222            with open(src, 'w', encoding='utf-8') as f:
1223                f.write('cheddar')
1224            # Using `src` here would mean we end up with a symlink pointing
1225            # to TESTFN/TESTFN/cheese, while it should point at
1226            # TESTFN/cheese.
1227            os.symlink('cheese', dst)
1228            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
1229            with open(src, 'r', encoding='utf-8') as f:
1230                self.assertEqual(f.read(), 'cheddar')
1231            os.remove(dst)
1232        finally:
1233            shutil.rmtree(TESTFN, ignore_errors=True)
1234
1235    # Issue #3002: copyfile and copytree block indefinitely on named pipes
1236    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1237    @unittest.skipIf(sys.platform == "vxworks",
1238                    "fifo requires special path on VxWorks")
1239    def test_copyfile_named_pipe(self):
1240        try:
1241            os.mkfifo(TESTFN)
1242        except PermissionError as e:
1243            self.skipTest('os.mkfifo(): %s' % e)
1244        try:
1245            self.assertRaises(shutil.SpecialFileError,
1246                                shutil.copyfile, TESTFN, TESTFN2)
1247            self.assertRaises(shutil.SpecialFileError,
1248                                shutil.copyfile, __file__, TESTFN)
1249        finally:
1250            os.remove(TESTFN)
1251
1252    def test_copyfile_return_value(self):
1253        # copytree returns its destination path.
1254        src_dir = self.mkdtemp()
1255        dst_dir = self.mkdtemp()
1256        dst_file = os.path.join(dst_dir, 'bar')
1257        src_file = os.path.join(src_dir, 'foo')
1258        write_file(src_file, 'foo')
1259        rv = shutil.copyfile(src_file, dst_file)
1260        self.assertTrue(os.path.exists(rv))
1261        self.assertEqual(read_file(src_file), read_file(dst_file))
1262
1263    def test_copyfile_same_file(self):
1264        # copyfile() should raise SameFileError if the source and destination
1265        # are the same.
1266        src_dir = self.mkdtemp()
1267        src_file = os.path.join(src_dir, 'foo')
1268        write_file(src_file, 'foo')
1269        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1270        # But Error should work too, to stay backward compatible.
1271        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1272        # Make sure file is not corrupted.
1273        self.assertEqual(read_file(src_file), 'foo')
1274
1275    @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
1276    def test_copyfile_nonexistent_dir(self):
1277        # Issue 43219
1278        src_dir = self.mkdtemp()
1279        src_file = os.path.join(src_dir, 'foo')
1280        dst = os.path.join(src_dir, 'does_not_exist/')
1281        write_file(src_file, 'foo')
1282        self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst)
1283
1284    def test_copyfile_copy_dir(self):
1285        # Issue 45234
1286        # test copy() and copyfile() raising proper exceptions when src and/or
1287        # dst are directories
1288        src_dir = self.mkdtemp()
1289        src_file = os.path.join(src_dir, 'foo')
1290        dir2 = self.mkdtemp()
1291        dst = os.path.join(src_dir, 'does_not_exist/')
1292        write_file(src_file, 'foo')
1293        if sys.platform == "win32":
1294            err = PermissionError
1295        else:
1296            err = IsADirectoryError
1297
1298        self.assertRaises(err, shutil.copyfile, src_dir, dst)
1299        self.assertRaises(err, shutil.copyfile, src_file, src_dir)
1300        self.assertRaises(err, shutil.copyfile, dir2, src_dir)
1301
1302
1303class TestArchives(BaseTest, unittest.TestCase):
1304
1305    ### shutil.make_archive
1306
1307    @support.requires_zlib()
1308    def test_make_tarball(self):
1309        # creating something to tar
1310        root_dir, base_dir = self._create_files('')
1311
1312        tmpdir2 = self.mkdtemp()
1313        # force shutil to create the directory
1314        os.rmdir(tmpdir2)
1315        # working with relative paths
1316        work_dir = os.path.dirname(tmpdir2)
1317        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1318
1319        with os_helper.change_cwd(work_dir):
1320            base_name = os.path.abspath(rel_base_name)
1321            tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
1322
1323        # check if the compressed tarball was created
1324        self.assertEqual(tarball, base_name + '.tar.gz')
1325        self.assertTrue(os.path.isfile(tarball))
1326        self.assertTrue(tarfile.is_tarfile(tarball))
1327        with tarfile.open(tarball, 'r:gz') as tf:
1328            self.assertCountEqual(tf.getnames(),
1329                                  ['.', './sub', './sub2',
1330                                   './file1', './file2', './sub/file3'])
1331
1332        # trying an uncompressed one
1333        with os_helper.change_cwd(work_dir):
1334            tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
1335        self.assertEqual(tarball, base_name + '.tar')
1336        self.assertTrue(os.path.isfile(tarball))
1337        self.assertTrue(tarfile.is_tarfile(tarball))
1338        with tarfile.open(tarball, 'r') as tf:
1339            self.assertCountEqual(tf.getnames(),
1340                                  ['.', './sub', './sub2',
1341                                  './file1', './file2', './sub/file3'])
1342
1343    def _tarinfo(self, path):
1344        with tarfile.open(path) as tar:
1345            names = tar.getnames()
1346            names.sort()
1347            return tuple(names)
1348
1349    def _create_files(self, base_dir='dist'):
1350        # creating something to tar
1351        root_dir = self.mkdtemp()
1352        dist = os.path.join(root_dir, base_dir)
1353        os.makedirs(dist, exist_ok=True)
1354        write_file((dist, 'file1'), 'xxx')
1355        write_file((dist, 'file2'), 'xxx')
1356        os.mkdir(os.path.join(dist, 'sub'))
1357        write_file((dist, 'sub', 'file3'), 'xxx')
1358        os.mkdir(os.path.join(dist, 'sub2'))
1359        if base_dir:
1360            write_file((root_dir, 'outer'), 'xxx')
1361        return root_dir, base_dir
1362
1363    @support.requires_zlib()
1364    @unittest.skipUnless(shutil.which('tar'),
1365                         'Need the tar command to run')
1366    def test_tarfile_vs_tar(self):
1367        root_dir, base_dir = self._create_files()
1368        base_name = os.path.join(self.mkdtemp(), 'archive')
1369        tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
1370
1371        # check if the compressed tarball was created
1372        self.assertEqual(tarball, base_name + '.tar.gz')
1373        self.assertTrue(os.path.isfile(tarball))
1374
1375        # now create another tarball using `tar`
1376        tarball2 = os.path.join(root_dir, 'archive2.tar')
1377        tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
1378        subprocess.check_call(tar_cmd, cwd=root_dir,
1379                              stdout=subprocess.DEVNULL)
1380
1381        self.assertTrue(os.path.isfile(tarball2))
1382        # let's compare both tarballs
1383        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
1384
1385        # trying an uncompressed one
1386        tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1387        self.assertEqual(tarball, base_name + '.tar')
1388        self.assertTrue(os.path.isfile(tarball))
1389
1390        # now for a dry_run
1391        tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1392                               dry_run=True)
1393        self.assertEqual(tarball, base_name + '.tar')
1394        self.assertTrue(os.path.isfile(tarball))
1395
1396    @support.requires_zlib()
1397    def test_make_zipfile(self):
1398        # creating something to zip
1399        root_dir, base_dir = self._create_files()
1400
1401        tmpdir2 = self.mkdtemp()
1402        # force shutil to create the directory
1403        os.rmdir(tmpdir2)
1404        # working with relative paths
1405        work_dir = os.path.dirname(tmpdir2)
1406        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1407
1408        with os_helper.change_cwd(work_dir):
1409            base_name = os.path.abspath(rel_base_name)
1410            res = make_archive(rel_base_name, 'zip', root_dir)
1411
1412        self.assertEqual(res, base_name + '.zip')
1413        self.assertTrue(os.path.isfile(res))
1414        self.assertTrue(zipfile.is_zipfile(res))
1415        with zipfile.ZipFile(res) as zf:
1416            self.assertCountEqual(zf.namelist(),
1417                    ['dist/', 'dist/sub/', 'dist/sub2/',
1418                     'dist/file1', 'dist/file2', 'dist/sub/file3',
1419                     'outer'])
1420
1421        with os_helper.change_cwd(work_dir):
1422            base_name = os.path.abspath(rel_base_name)
1423            res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
1424
1425        self.assertEqual(res, base_name + '.zip')
1426        self.assertTrue(os.path.isfile(res))
1427        self.assertTrue(zipfile.is_zipfile(res))
1428        with zipfile.ZipFile(res) as zf:
1429            self.assertCountEqual(zf.namelist(),
1430                    ['dist/', 'dist/sub/', 'dist/sub2/',
1431                     'dist/file1', 'dist/file2', 'dist/sub/file3'])
1432
1433    @support.requires_zlib()
1434    @unittest.skipUnless(shutil.which('zip'),
1435                         'Need the zip command to run')
1436    def test_zipfile_vs_zip(self):
1437        root_dir, base_dir = self._create_files()
1438        base_name = os.path.join(self.mkdtemp(), 'archive')
1439        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1440
1441        # check if ZIP file  was created
1442        self.assertEqual(archive, base_name + '.zip')
1443        self.assertTrue(os.path.isfile(archive))
1444
1445        # now create another ZIP file using `zip`
1446        archive2 = os.path.join(root_dir, 'archive2.zip')
1447        zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
1448        subprocess.check_call(zip_cmd, cwd=root_dir,
1449                              stdout=subprocess.DEVNULL)
1450
1451        self.assertTrue(os.path.isfile(archive2))
1452        # let's compare both ZIP files
1453        with zipfile.ZipFile(archive) as zf:
1454            names = zf.namelist()
1455        with zipfile.ZipFile(archive2) as zf:
1456            names2 = zf.namelist()
1457        self.assertEqual(sorted(names), sorted(names2))
1458
1459    @support.requires_zlib()
1460    @unittest.skipUnless(shutil.which('unzip'),
1461                         'Need the unzip command to run')
1462    def test_unzip_zipfile(self):
1463        root_dir, base_dir = self._create_files()
1464        base_name = os.path.join(self.mkdtemp(), 'archive')
1465        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1466
1467        # check if ZIP file  was created
1468        self.assertEqual(archive, base_name + '.zip')
1469        self.assertTrue(os.path.isfile(archive))
1470
1471        # now check the ZIP file using `unzip -t`
1472        zip_cmd = ['unzip', '-t', archive]
1473        with os_helper.change_cwd(root_dir):
1474            try:
1475                subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1476            except subprocess.CalledProcessError as exc:
1477                details = exc.output.decode(errors="replace")
1478                if 'unrecognized option: t' in details:
1479                    self.skipTest("unzip doesn't support -t")
1480                msg = "{}\n\n**Unzip Output**\n{}"
1481                self.fail(msg.format(exc, details))
1482
1483    def test_make_archive(self):
1484        tmpdir = self.mkdtemp()
1485        base_name = os.path.join(tmpdir, 'archive')
1486        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1487
1488    @support.requires_zlib()
1489    def test_make_archive_owner_group(self):
1490        # testing make_archive with owner and group, with various combinations
1491        # this works even if there's not gid/uid support
1492        if UID_GID_SUPPORT:
1493            group = grp.getgrgid(0)[0]
1494            owner = pwd.getpwuid(0)[0]
1495        else:
1496            group = owner = 'root'
1497
1498        root_dir, base_dir = self._create_files()
1499        base_name = os.path.join(self.mkdtemp(), 'archive')
1500        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1501                           group=group)
1502        self.assertTrue(os.path.isfile(res))
1503
1504        res = make_archive(base_name, 'zip', root_dir, base_dir)
1505        self.assertTrue(os.path.isfile(res))
1506
1507        res = make_archive(base_name, 'tar', root_dir, base_dir,
1508                           owner=owner, group=group)
1509        self.assertTrue(os.path.isfile(res))
1510
1511        res = make_archive(base_name, 'tar', root_dir, base_dir,
1512                           owner='kjhkjhkjg', group='oihohoh')
1513        self.assertTrue(os.path.isfile(res))
1514
1515
1516    @support.requires_zlib()
1517    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1518    def test_tarfile_root_owner(self):
1519        root_dir, base_dir = self._create_files()
1520        base_name = os.path.join(self.mkdtemp(), 'archive')
1521        group = grp.getgrgid(0)[0]
1522        owner = pwd.getpwuid(0)[0]
1523        with os_helper.change_cwd(root_dir):
1524            archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1525                                        owner=owner, group=group)
1526
1527        # check if the compressed tarball was created
1528        self.assertTrue(os.path.isfile(archive_name))
1529
1530        # now checks the rights
1531        archive = tarfile.open(archive_name)
1532        try:
1533            for member in archive.getmembers():
1534                self.assertEqual(member.uid, 0)
1535                self.assertEqual(member.gid, 0)
1536        finally:
1537            archive.close()
1538
1539    def test_make_archive_cwd(self):
1540        current_dir = os.getcwd()
1541        def _breaks(*args, **kw):
1542            raise RuntimeError()
1543
1544        register_archive_format('xxx', _breaks, [], 'xxx file')
1545        try:
1546            try:
1547                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1548            except Exception:
1549                pass
1550            self.assertEqual(os.getcwd(), current_dir)
1551        finally:
1552            unregister_archive_format('xxx')
1553
1554    def test_make_tarfile_in_curdir(self):
1555        # Issue #21280
1556        root_dir = self.mkdtemp()
1557        with os_helper.change_cwd(root_dir):
1558            self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1559            self.assertTrue(os.path.isfile('test.tar'))
1560
1561    @support.requires_zlib()
1562    def test_make_zipfile_in_curdir(self):
1563        # Issue #21280
1564        root_dir = self.mkdtemp()
1565        with os_helper.change_cwd(root_dir):
1566            self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1567            self.assertTrue(os.path.isfile('test.zip'))
1568
1569    def test_register_archive_format(self):
1570
1571        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1572        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1573                          1)
1574        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1575                          [(1, 2), (1, 2, 3)])
1576
1577        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1578        formats = [name for name, params in get_archive_formats()]
1579        self.assertIn('xxx', formats)
1580
1581        unregister_archive_format('xxx')
1582        formats = [name for name, params in get_archive_formats()]
1583        self.assertNotIn('xxx', formats)
1584
1585    ### shutil.unpack_archive
1586
1587    def check_unpack_archive(self, format):
1588        self.check_unpack_archive_with_converter(format, lambda path: path)
1589        self.check_unpack_archive_with_converter(format, pathlib.Path)
1590        self.check_unpack_archive_with_converter(format, FakePath)
1591
1592    def check_unpack_archive_with_converter(self, format, converter):
1593        root_dir, base_dir = self._create_files()
1594        expected = rlistdir(root_dir)
1595        expected.remove('outer')
1596
1597        base_name = os.path.join(self.mkdtemp(), 'archive')
1598        filename = make_archive(base_name, format, root_dir, base_dir)
1599
1600        # let's try to unpack it now
1601        tmpdir2 = self.mkdtemp()
1602        unpack_archive(converter(filename), converter(tmpdir2))
1603        self.assertEqual(rlistdir(tmpdir2), expected)
1604
1605        # and again, this time with the format specified
1606        tmpdir3 = self.mkdtemp()
1607        unpack_archive(converter(filename), converter(tmpdir3), format=format)
1608        self.assertEqual(rlistdir(tmpdir3), expected)
1609
1610        self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1611        self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
1612
1613    def test_unpack_archive_tar(self):
1614        self.check_unpack_archive('tar')
1615
1616    @support.requires_zlib()
1617    def test_unpack_archive_gztar(self):
1618        self.check_unpack_archive('gztar')
1619
1620    @support.requires_bz2()
1621    def test_unpack_archive_bztar(self):
1622        self.check_unpack_archive('bztar')
1623
1624    @support.requires_lzma()
1625    @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
1626    def test_unpack_archive_xztar(self):
1627        self.check_unpack_archive('xztar')
1628
1629    @support.requires_zlib()
1630    def test_unpack_archive_zip(self):
1631        self.check_unpack_archive('zip')
1632
1633    def test_unpack_registry(self):
1634
1635        formats = get_unpack_formats()
1636
1637        def _boo(filename, extract_dir, extra):
1638            self.assertEqual(extra, 1)
1639            self.assertEqual(filename, 'stuff.boo')
1640            self.assertEqual(extract_dir, 'xx')
1641
1642        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1643        unpack_archive('stuff.boo', 'xx')
1644
1645        # trying to register a .boo unpacker again
1646        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1647                          ['.boo'], _boo)
1648
1649        # should work now
1650        unregister_unpack_format('Boo')
1651        register_unpack_format('Boo2', ['.boo'], _boo)
1652        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1653        self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1654
1655        # let's leave a clean state
1656        unregister_unpack_format('Boo2')
1657        self.assertEqual(get_unpack_formats(), formats)
1658
1659
class TestMisc(BaseTest, unittest.TestCase):
    # Tests for shutil.disk_usage() and shutil.chown().

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        # Query the directory holding this test file and sanity-check the
        # reported numbers against each other.
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # neither user nor group given
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        # names that cannot be looked up
        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        # user given as bytes or float
        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was passed in, so that the directory
            # checks below inspect the directory and not the file.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # numeric ids, positional and by keyword: first on the file, then
        # on the directory
        for target in (filename, dirname):
            shutil.chown(target, uid, gid)
            check_chown(target, uid, gid)
            shutil.chown(target, uid)
            check_chown(target, uid)
            shutil.chown(target, user=uid)
            check_chown(target, uid)
            shutil.chown(target, group=gid)
            check_chown(target, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            # same again using the names instead of the numeric ids
            for target in (filename, dirname):
                shutil.chown(target, user, group)
                check_chown(target, uid, gid)
1738
1739
class TestWhich(BaseTest, unittest.TestCase):
    # Tests for shutil.which().  The attributes assigned in setUp() are
    # what the tests pass to which(); TestWhichBytes re-encodes them to
    # run the same tests with bytes.

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        # Register the cleanup right away so that the file is closed even
        # if a later statement in setUp() fails.
        self.addCleanup(self.temp_file.close)
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search directory yields a relative result.
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no path argument, the PATH environment variable is searched.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        # With PATH unset, the file is still found when either os.defpath
        # or the patched os.confstr() names its directory.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path argument finds nothing, even though both
        # the current directory and PATH contain the file.
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        # With PATH unset and no path argument, nothing is found.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        # Register the cleanup before any statement that could fail.
        self.addCleanup(temp_filexyz.close)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1925
1926
class TestWhichBytes(TestWhich):
    # Runs the inherited TestWhich tests with the path attributes
    # converted by os.fsencode().
    def setUp(self):
        super().setUp()
        self.temp_file.name = os.fsencode(self.temp_file.name)
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
1935
1936
class TestMove(BaseTest, unittest.TestCase):
    # Tests for shutil.move().  Tests decorated with @mock_rename run with
    # os.rename replaced by a function that raises EXDEV, i.e. as if the
    # destination were on another filesystem.

    def setUp(self):
        # One source dir holding a single file "foo", and an empty
        # destination dir.
        filename = "foo"
        self.src_dir = self.mkdtemp()
        self.dst_dir = self.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def _check_move_file(self, src, dst, real_dst):
        # Move file src to dst; the content must end up at real_dst and
        # src must be gone.
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        # Move directory src to dst; its listing must end up at real_dst
        # and src must be gone.
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    def test_move_file_to_dir_pathlike_src(self):
        # Move a pathlike file to another location on the same filesystem.
        src = pathlib.Path(self.src_file)
        self._check_move_file(src, self.dst_dir, self.dst_file)

    def test_move_file_to_dir_pathlike_dst(self):
        # Move a file to another pathlike location on the same filesystem.
        dst = pathlib.Path(self.dst_dir)
        self._check_move_file(self.src_file, dst, self.dst_file)

    @mock_rename
    def test_move_file_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self.test_move_file()

    @mock_rename
    def test_move_file_to_dir_other_fs(self):
        # Move a file to another location on another filesystem.
        self.test_move_file_to_dir()

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        dst_dir = tempfile.mktemp(dir=self.mkdtemp())
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            os_helper.rmtree(dst_dir)

    @mock_rename
    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self.test_move_dir()

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @mock_rename
    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self.test_move_dir_to_dir()

    def test_move_dir_sep_to_dir(self):
        # Same as test_move_dir_to_dir, with a trailing separator on src.
        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
    def test_move_dir_altsep_to_dir(self):
        # Same again, with the alternative separator appended to src.
        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        # _destinsrc() must report a destination below the source.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                             msg='_destinsrc() wrongly concluded that '
                             'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            os_helper.rmtree(TESTFN)

    def test_destinsrc_false_positive(self):
        # _destinsrc() must not be fooled by names that merely share a
        # prefix with the source.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                            msg='_destinsrc() wrongly concluded that '
                            'dst (%s) is in src (%s)' % (dst, src))
        finally:
            os_helper.rmtree(TESTFN)

    @os_helper.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink(self):
        # A moved symlink to a file is still a link to the same file.
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_file)
        self.assertTrue(os.path.islink(self.dst_file))
        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))

    @os_helper.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink_to_dir(self):
        # Same, with an existing directory given as the destination.
        filename = "bar"
        dst = os.path.join(self.src_dir, filename)
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_dir)
        final_link = os.path.join(self.dst_dir, filename)
        self.assertTrue(os.path.islink(final_link))
        self.assertTrue(os.path.samefile(self.src_file, final_link))

    @os_helper.skip_unless_symlink
    @mock_rename
    def test_move_dangling_symlink(self):
        # 'baz' is never created, so the link being moved is dangling.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))

    @os_helper.skip_unless_symlink
    @mock_rename
    def test_move_dir_symlink(self):
        # A moved symlink to a directory is still a link to that directory.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.mkdir(src)
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertTrue(os.path.samefile(src, dst_link))

    def test_move_return_value(self):
        # Moving into a directory returns the path of the moved file.
        rv = shutil.move(self.src_file, self.dst_dir)
        self.assertEqual(rv,
                os.path.join(self.dst_dir, os.path.basename(self.src_file)))

    def test_move_as_rename_return_value(self):
        # Moving to a new file name returns that name.
        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))

    @mock_rename
    def test_move_file_special_function(self):
        # The supplied copy_function is called once for a single file.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 1)

    @mock_rename
    def test_move_dir_special_function(self):
        # The source dir holds "foo" from setUp() plus the two files
        # created here, hence three calls of the copy function.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        os_helper.create_empty_file(os.path.join(self.src_dir, 'child'))
        os_helper.create_empty_file(os.path.join(self.src_dir, 'child1'))
        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 3)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = self.mkdtemp()
        dst_dir = os.path.join(
                os.path.dirname(self.src_dir),
                os.path.basename(self.src_dir).upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            # Use the same helper as test_move_dir: it is expected to
            # tolerate a missing directory, so a failed move shows up as
            # such instead of as an error raised by the cleanup.
            os_helper.rmtree(dst_dir)


    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0
                         and hasattr(os, 'lchflags')
                         and hasattr(stat, 'SF_IMMUTABLE')
                         and hasattr(stat, 'UF_OPAQUE'),
                         'root privileges required')
    def test_move_dir_permission_denied(self):
        # bpo-42782: shutil.move should not create destination directories
        # if the source directory cannot be removed.
        try:
            os.mkdir(TESTFN_SRC)
            os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

            # Testing on an empty immutable directory
            # TESTFN_DST should not exist if shutil.move failed
            self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
            self.assertNotIn(TESTFN_DST, os.listdir())

            # Create a file and keep the directory immutable
            os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
            os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
            os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

            # Testing on a non-empty immutable directory
            # TESTFN_DST should not exist if shutil.move failed
            self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
            self.assertNotIn(TESTFN_DST, os.listdir())
        finally:
            # Replace the flags first so that the trees can be removed.
            if os.path.exists(TESTFN_SRC):
                os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
                os_helper.rmtree(TESTFN_SRC)
            if os.path.exists(TESTFN_DST):
                os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
                os_helper.rmtree(TESTFN_DST)
2177
2178
class TestCopyFile(unittest.TestCase):
    """Exercise shutil.copyfile() when opening or closing a file fails.

    Each test swaps ``shutil.open`` for a stub that either raises OSError
    or hands back a ``Faux`` object, so the stubbed opens never touch a
    real file.
    """

    class Faux(object):
        """File stand-in that records how it is used as a context manager.

        raise_in_exit: when true, __exit__() raises OSError("Cannot close").
        suppress_at_exit: value returned by __exit__() when it does not
            raise; a true value swallows the exception being propagated.
        """
        # Becomes True once __enter__() has run.
        _entered = False
        # (exc_type, exc_val, exc_tb) triple received by __exit__().
        _exited_with = None
        # Becomes True when __exit__() raised its own OSError.
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # Nothing is returned, so ``with ... as f`` binds f to None.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        # An OSError raised while opening the source must reach the caller.
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Raised explicitly: a bare ``assert`` is stripped under -O.
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        # The source's __exit__() must see the OSError raised while opening
        # the destination; Faux suppresses it, so copyfile() returns.
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            # Raised explicitly: a bare ``assert`` is stripped under -O.
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        # The destination raises in __exit__(); the source's __exit__()
        # must receive that "Cannot close" error (and suppresses it).
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Raised explicitly: a bare ``assert`` is stripped under -O.
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        # The source raises in __exit__(); that OSError must propagate out
        # of copyfile() while the destination is left untouched.
        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Raised explicitly: a bare ``assert`` is stripped under -O.
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2269
2270
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() and, on Windows, the readinto() path
    taken by shutil.copyfile().

    TESTFN is created once per class by write_test_file() and copied to
    TESTFN2, which is removed after every test.
    """
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        """Yield ``(src, dst)``: TESTFN opened 'rb' and TESTFN2 opened 'wb'."""
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        """Fail unless the files at paths *src* and *dst* hold equal bytes."""
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open for the caller.
        # unittest assertions are used so the checks survive ``python -O``.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2343
2344
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs.

    Subclasses set PATCHPOINT to the dotted name of the low-level function
    to mock and implement zerocopy_fun() to call the fast-copy helper
    under test.
    """
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    # Bytes read back from TESTFN in setUpClass(); used as expected content.
    FILEDATA = b""
    # Target handed to unittest.mock.patch(); overridden by subclasses.
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Explicit check rather than ``assert`` so it still runs under -O.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                "expected %d bytes in test file, got %d"
                % (cls.FILESIZE, len(cls.FILEDATA)))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        """Yield ``(src, dst)``: TESTFN opened 'rb' and TESTFN2 opened 'wb'."""
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        """Tear down and rebuild both the per-test and per-class fixtures."""
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        # Rebuild the fixtures afterwards in case the source got damaged.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # mktemp() only produces a name; no file is created at that path.
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        # Create an empty source file.
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        # An unexpected exception type must propagate out of copyfile().
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2443
2444
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests, plus sendfile()-specific ones,
    against shutil._fastcopy_sendfile().
    """
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # A BytesIO source must make the fast path give up; the regular
        # copyfileobj() fallback then copies the data.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above, with a BytesIO destination.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # The first call goes to the real os.sendfile(); every later call
        # fails with EBADF, which must propagate as an OSError.
        def sendfile(*args, **kwargs):
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # unittest assertion (not ``assert``) so it still runs under -O.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        mock = unittest.mock.Mock()
        mock.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        mock = unittest.mock.Mock()
        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            # Fourth positional argument of the mocked os.sendfile() call.
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the remaining tests.
            shutil._USE_CP_SENDFILE = True
2562
2563
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        # Call the helper with the _COPYFILE_DATA flag from the posix module.
        copy_flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, copy_flags)
2570
2571
class TestGetTerminalSize(unittest.TestCase):
    """Checks for shutil.get_terminal_size()."""

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be verified in a portable way, so
        only require that both reported dimensions are non-negative.
        """
        dims = shutil.get_terminal_size()
        for value in (dims.columns, dims.lines):
            self.assertGreaterEqual(value, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES absent.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            with_columns = shutil.get_terminal_size()
        self.assertEqual(with_columns.columns, 777)

        # LINES set, COLUMNS absent.
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            with_lines = shutil.get_terminal_size()
        self.assertEqual(with_lines.lines, 888)

    def test_bad_environ(self):
        # Non-numeric values must not make get_terminal_size() fail.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            dims = shutil.get_terminal_size()
        for value in (dims.columns, dims.lines):
            self.assertGreaterEqual(value, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        The comparison fails when stdin and stdout are attached to
        different terminals of different sizes; that setup should be
        rare enough not to matter.
        """
        stty_failures = (FileNotFoundError, PermissionError,
                         subprocess.CalledProcessError)
        try:
            raw = subprocess.check_output(['stty', 'size'])
        except stty_failures:
            self.skipTest("stty invocation failed")
        fields = raw.decode().split()
        # The two fields are swapped to match get_terminal_size()'s order.
        expected = (int(fields[1]), int(fields[0]))

        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                dims = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(dims.columns, 10)
            self.assertEqual(dims.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w', encoding='utf-8') as devnull:
                with support.swap_attr(sys, '__stdout__', devnull):
                    dims = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(dims.columns, 30)
            self.assertEqual(dims.lines, 40)
2648
2649
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        # shutil must declare its public names explicitly.
        self.assertTrue(hasattr(shutil, '__all__'))
        expected_names = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where os.statvfs exists or on Windows.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected_names.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected_names)
2666
2667
# Run the tests defined in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
2670