• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import unittest.mock
5import shutil
6import tempfile
7import sys
8import stat
9import os
10import os.path
11import errno
12import functools
13import pathlib
14import subprocess
15import random
16import string
17import contextlib
18import io
19from shutil import (make_archive,
20                    register_archive_format, unregister_archive_format,
21                    get_archive_formats, Error, unpack_archive,
22                    register_unpack_format, RegistryError,
23                    unregister_unpack_format, get_unpack_formats,
24                    SameFileError, _GiveupOnFastCopy)
25import tarfile
26import zipfile
27try:
28    import posix
29except ImportError:
30    posix = None
31
32from test import support
33from test.support import TESTFN, FakePath
34
35TESTFN2 = TESTFN + "2"
36MACOS = sys.platform.startswith("darwin")
37AIX = sys.platform[:3] == 'aix'
38try:
39    import grp
40    import pwd
41    UID_GID_SUPPORT = True
42except ImportError:
43    UID_GID_SUPPORT = False
44
45try:
46    import _winapi
47except ImportError:
48    _winapi = None
49
50def _fake_rename(*args, **kwargs):
51    # Pretend the destination path is on a different filesystem.
52    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
53
54def mock_rename(func):
55    @functools.wraps(func)
56    def wrap(*args, **kwargs):
57        try:
58            builtin_rename = os.rename
59            os.rename = _fake_rename
60            return func(*args, **kwargs)
61        finally:
62            os.rename = builtin_rename
63    return wrap
64
65def write_file(path, content, binary=False):
66    """Write *content* to a file located at *path*.
67
68    If *path* is a tuple instead of a string, os.path.join will be used to
69    make a path.  If *binary* is true, the file will be opened in binary
70    mode.
71    """
72    if isinstance(path, tuple):
73        path = os.path.join(*path)
74    with open(path, 'wb' if binary else 'w') as fp:
75        fp.write(content)
76
77def write_test_file(path, size):
78    """Create a test file with an arbitrary size and random text content."""
79    def chunks(total, step):
80        assert total >= step
81        while total > step:
82            yield step
83            total -= step
84        if total:
85            yield total
86
87    bufsize = min(size, 8192)
88    chunk = b"".join([random.choice(string.ascii_letters).encode()
89                      for i in range(bufsize)])
90    with open(path, 'wb') as f:
91        for csize in chunks(size, bufsize):
92            f.write(chunk)
93    assert os.path.getsize(path) == size
94
95def read_file(path, binary=False):
96    """Return contents from a file located at *path*.
97
98    If *path* is a tuple instead of a string, os.path.join will be used to
99    make a path.  If *binary* is true, the file will be opened in binary
100    mode.
101    """
102    if isinstance(path, tuple):
103        path = os.path.join(*path)
104    with open(path, 'rb' if binary else 'r') as fp:
105        return fp.read()
106
107def rlistdir(path):
108    res = []
109    for name in sorted(os.listdir(path)):
110        p = os.path.join(path, name)
111        if os.path.isdir(p) and not os.path.islink(p):
112            res.append(name + '/')
113            for n in rlistdir(p):
114                res.append(name + '/' + n)
115        else:
116            res.append(name)
117    return res
118
119def supports_file2file_sendfile():
120    # ...apparently Linux and Solaris are the only ones
121    if not hasattr(os, "sendfile"):
122        return False
123    srcname = None
124    dstname = None
125    try:
126        with tempfile.NamedTemporaryFile("wb", delete=False) as f:
127            srcname = f.name
128            f.write(b"0123456789")
129
130        with open(srcname, "rb") as src:
131            with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
132                dstname = dst.name
133                infd = src.fileno()
134                outfd = dst.fileno()
135                try:
136                    os.sendfile(outfd, infd, 0, 2)
137                except OSError:
138                    return False
139                else:
140                    return True
141    finally:
142        if srcname is not None:
143            support.unlink(srcname)
144        if dstname is not None:
145            support.unlink(dstname)
146
147
148SUPPORTS_SENDFILE = supports_file2file_sendfile()
149
150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
151# The AIX command 'dump -o program' gives XCOFF header information
152# The second word of the last line in the maxdata value
153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
154def _maxdataOK():
155    if AIX and sys.maxsize == 2147483647:
156        hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
157        maxdata=hdrs.split("\n")[-1].split()[1]
158        return int(maxdata,16) >= 0x20000000
159    else:
160        return True
161
162class TestShutil(unittest.TestCase):
163
164    def setUp(self):
165        super(TestShutil, self).setUp()
166        self.tempdirs = []
167
168    def tearDown(self):
169        super(TestShutil, self).tearDown()
170        while self.tempdirs:
171            d = self.tempdirs.pop()
172            shutil.rmtree(d, os.name in ('nt', 'cygwin'))
173
174
175    def mkdtemp(self):
176        """Create a temporary directory that will be cleaned up.
177
178        Returns the path of the directory.
179        """
180        basedir = None
181        if sys.platform == "win32":
182            basedir = os.path.realpath(os.getcwd())
183        d = tempfile.mkdtemp(dir=basedir)
184        self.tempdirs.append(d)
185        return d
186
187    def test_rmtree_works_on_bytes(self):
188        tmp = self.mkdtemp()
189        victim = os.path.join(tmp, 'killme')
190        os.mkdir(victim)
191        write_file(os.path.join(victim, 'somefile'), 'foo')
192        victim = os.fsencode(victim)
193        self.assertIsInstance(victim, bytes)
194        shutil.rmtree(victim)
195
196    @support.skip_unless_symlink
197    def test_rmtree_fails_on_symlink(self):
198        tmp = self.mkdtemp()
199        dir_ = os.path.join(tmp, 'dir')
200        os.mkdir(dir_)
201        link = os.path.join(tmp, 'link')
202        os.symlink(dir_, link)
203        self.assertRaises(OSError, shutil.rmtree, link)
204        self.assertTrue(os.path.exists(dir_))
205        self.assertTrue(os.path.lexists(link))
206        errors = []
207        def onerror(*args):
208            errors.append(args)
209        shutil.rmtree(link, onerror=onerror)
210        self.assertEqual(len(errors), 1)
211        self.assertIs(errors[0][0], os.path.islink)
212        self.assertEqual(errors[0][1], link)
213        self.assertIsInstance(errors[0][2][1], OSError)
214
215    @support.skip_unless_symlink
216    def test_rmtree_works_on_symlinks(self):
217        tmp = self.mkdtemp()
218        dir1 = os.path.join(tmp, 'dir1')
219        dir2 = os.path.join(dir1, 'dir2')
220        dir3 = os.path.join(tmp, 'dir3')
221        for d in dir1, dir2, dir3:
222            os.mkdir(d)
223        file1 = os.path.join(tmp, 'file1')
224        write_file(file1, 'foo')
225        link1 = os.path.join(dir1, 'link1')
226        os.symlink(dir2, link1)
227        link2 = os.path.join(dir1, 'link2')
228        os.symlink(dir3, link2)
229        link3 = os.path.join(dir1, 'link3')
230        os.symlink(file1, link3)
231        # make sure symlinks are removed but not followed
232        shutil.rmtree(dir1)
233        self.assertFalse(os.path.exists(dir1))
234        self.assertTrue(os.path.exists(dir3))
235        self.assertTrue(os.path.exists(file1))
236
237    @unittest.skipUnless(_winapi, 'only relevant on Windows')
238    def test_rmtree_fails_on_junctions(self):
239        tmp = self.mkdtemp()
240        dir_ = os.path.join(tmp, 'dir')
241        os.mkdir(dir_)
242        link = os.path.join(tmp, 'link')
243        _winapi.CreateJunction(dir_, link)
244        self.assertRaises(OSError, shutil.rmtree, link)
245        self.assertTrue(os.path.exists(dir_))
246        self.assertTrue(os.path.lexists(link))
247        errors = []
248        def onerror(*args):
249            errors.append(args)
250        shutil.rmtree(link, onerror=onerror)
251        self.assertEqual(len(errors), 1)
252        self.assertIs(errors[0][0], os.path.islink)
253        self.assertEqual(errors[0][1], link)
254        self.assertIsInstance(errors[0][2][1], OSError)
255
256    @unittest.skipUnless(_winapi, 'only relevant on Windows')
257    def test_rmtree_works_on_junctions(self):
258        tmp = self.mkdtemp()
259        dir1 = os.path.join(tmp, 'dir1')
260        dir2 = os.path.join(dir1, 'dir2')
261        dir3 = os.path.join(tmp, 'dir3')
262        for d in dir1, dir2, dir3:
263            os.mkdir(d)
264        file1 = os.path.join(tmp, 'file1')
265        write_file(file1, 'foo')
266        link1 = os.path.join(dir1, 'link1')
267        _winapi.CreateJunction(dir2, link1)
268        link2 = os.path.join(dir1, 'link2')
269        _winapi.CreateJunction(dir3, link2)
270        link3 = os.path.join(dir1, 'link3')
271        _winapi.CreateJunction(file1, link3)
272        # make sure junctions are removed but not followed
273        shutil.rmtree(dir1)
274        self.assertFalse(os.path.exists(dir1))
275        self.assertTrue(os.path.exists(dir3))
276        self.assertTrue(os.path.exists(file1))
277
278    def test_rmtree_errors(self):
279        # filename is guaranteed not to exist
280        filename = tempfile.mktemp()
281        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
282        # test that ignore_errors option is honored
283        shutil.rmtree(filename, ignore_errors=True)
284
285        # existing file
286        tmpdir = self.mkdtemp()
287        write_file((tmpdir, "tstfile"), "")
288        filename = os.path.join(tmpdir, "tstfile")
289        with self.assertRaises(NotADirectoryError) as cm:
290            shutil.rmtree(filename)
291        # The reason for this rather odd construct is that Windows sprinkles
292        # a \*.* at the end of file names. But only sometimes on some buildbots
293        possible_args = [filename, os.path.join(filename, '*.*')]
294        self.assertIn(cm.exception.filename, possible_args)
295        self.assertTrue(os.path.exists(filename))
296        # test that ignore_errors option is honored
297        shutil.rmtree(filename, ignore_errors=True)
298        self.assertTrue(os.path.exists(filename))
299        errors = []
300        def onerror(*args):
301            errors.append(args)
302        shutil.rmtree(filename, onerror=onerror)
303        self.assertEqual(len(errors), 2)
304        self.assertIs(errors[0][0], os.scandir)
305        self.assertEqual(errors[0][1], filename)
306        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
307        self.assertIn(errors[0][2][1].filename, possible_args)
308        self.assertIs(errors[1][0], os.rmdir)
309        self.assertEqual(errors[1][1], filename)
310        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
311        self.assertIn(errors[1][2][1].filename, possible_args)
312
313
314    @unittest.skipIf(sys.platform[:6] == 'cygwin',
315                     "This test can't be run on Cygwin (issue #1071513).")
316    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
317                     "This test can't be run reliably as root (issue #1076467).")
318    def test_on_error(self):
319        self.errorState = 0
320        os.mkdir(TESTFN)
321        self.addCleanup(shutil.rmtree, TESTFN)
322
323        self.child_file_path = os.path.join(TESTFN, 'a')
324        self.child_dir_path = os.path.join(TESTFN, 'b')
325        support.create_empty_file(self.child_file_path)
326        os.mkdir(self.child_dir_path)
327        old_dir_mode = os.stat(TESTFN).st_mode
328        old_child_file_mode = os.stat(self.child_file_path).st_mode
329        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
330        # Make unwritable.
331        new_mode = stat.S_IREAD|stat.S_IEXEC
332        os.chmod(self.child_file_path, new_mode)
333        os.chmod(self.child_dir_path, new_mode)
334        os.chmod(TESTFN, new_mode)
335
336        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
337        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
338        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
339
340        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
341        # Test whether onerror has actually been called.
342        self.assertEqual(self.errorState, 3,
343                         "Expected call to onerror function did not happen.")
344
345    def check_args_to_onerror(self, func, arg, exc):
346        # test_rmtree_errors deliberately runs rmtree
347        # on a directory that is chmod 500, which will fail.
348        # This function is run when shutil.rmtree fails.
349        # 99.9% of the time it initially fails to remove
350        # a file in the directory, so the first time through
351        # func is os.remove.
352        # However, some Linux machines running ZFS on
353        # FUSE experienced a failure earlier in the process
354        # at os.listdir.  The first failure may legally
355        # be either.
356        if self.errorState < 2:
357            if func is os.unlink:
358                self.assertEqual(arg, self.child_file_path)
359            elif func is os.rmdir:
360                self.assertEqual(arg, self.child_dir_path)
361            else:
362                self.assertIs(func, os.listdir)
363                self.assertIn(arg, [TESTFN, self.child_dir_path])
364            self.assertTrue(issubclass(exc[0], OSError))
365            self.errorState += 1
366        else:
367            self.assertEqual(func, os.rmdir)
368            self.assertEqual(arg, TESTFN)
369            self.assertTrue(issubclass(exc[0], OSError))
370            self.errorState = 3
371
372    def test_rmtree_does_not_choke_on_failing_lstat(self):
373        try:
374            orig_lstat = os.lstat
375            def raiser(fn, *args, **kwargs):
376                if fn != TESTFN:
377                    raise OSError()
378                else:
379                    return orig_lstat(fn)
380            os.lstat = raiser
381
382            os.mkdir(TESTFN)
383            write_file((TESTFN, 'foo'), 'foo')
384            shutil.rmtree(TESTFN)
385        finally:
386            os.lstat = orig_lstat
387
388    @support.skip_unless_symlink
389    def test_copymode_follow_symlinks(self):
390        tmp_dir = self.mkdtemp()
391        src = os.path.join(tmp_dir, 'foo')
392        dst = os.path.join(tmp_dir, 'bar')
393        src_link = os.path.join(tmp_dir, 'baz')
394        dst_link = os.path.join(tmp_dir, 'quux')
395        write_file(src, 'foo')
396        write_file(dst, 'foo')
397        os.symlink(src, src_link)
398        os.symlink(dst, dst_link)
399        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
400        # file to file
401        os.chmod(dst, stat.S_IRWXO)
402        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
403        shutil.copymode(src, dst)
404        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
405        # On Windows, os.chmod does not follow symlinks (issue #15411)
406        if os.name != 'nt':
407            # follow src link
408            os.chmod(dst, stat.S_IRWXO)
409            shutil.copymode(src_link, dst)
410            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
411            # follow dst link
412            os.chmod(dst, stat.S_IRWXO)
413            shutil.copymode(src, dst_link)
414            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
415            # follow both links
416            os.chmod(dst, stat.S_IRWXO)
417            shutil.copymode(src_link, dst_link)
418            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
419
420    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
421    @support.skip_unless_symlink
422    def test_copymode_symlink_to_symlink(self):
423        tmp_dir = self.mkdtemp()
424        src = os.path.join(tmp_dir, 'foo')
425        dst = os.path.join(tmp_dir, 'bar')
426        src_link = os.path.join(tmp_dir, 'baz')
427        dst_link = os.path.join(tmp_dir, 'quux')
428        write_file(src, 'foo')
429        write_file(dst, 'foo')
430        os.symlink(src, src_link)
431        os.symlink(dst, dst_link)
432        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
433        os.chmod(dst, stat.S_IRWXU)
434        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
435        # link to link
436        os.lchmod(dst_link, stat.S_IRWXO)
437        shutil.copymode(src_link, dst_link, follow_symlinks=False)
438        self.assertEqual(os.lstat(src_link).st_mode,
439                         os.lstat(dst_link).st_mode)
440        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
441        # src link - use chmod
442        os.lchmod(dst_link, stat.S_IRWXO)
443        shutil.copymode(src_link, dst, follow_symlinks=False)
444        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
445        # dst link - use chmod
446        os.lchmod(dst_link, stat.S_IRWXO)
447        shutil.copymode(src, dst_link, follow_symlinks=False)
448        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
449
450    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
451    @support.skip_unless_symlink
452    def test_copymode_symlink_to_symlink_wo_lchmod(self):
453        tmp_dir = self.mkdtemp()
454        src = os.path.join(tmp_dir, 'foo')
455        dst = os.path.join(tmp_dir, 'bar')
456        src_link = os.path.join(tmp_dir, 'baz')
457        dst_link = os.path.join(tmp_dir, 'quux')
458        write_file(src, 'foo')
459        write_file(dst, 'foo')
460        os.symlink(src, src_link)
461        os.symlink(dst, dst_link)
462        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
463
464    @support.skip_unless_symlink
465    def test_copystat_symlinks(self):
466        tmp_dir = self.mkdtemp()
467        src = os.path.join(tmp_dir, 'foo')
468        dst = os.path.join(tmp_dir, 'bar')
469        src_link = os.path.join(tmp_dir, 'baz')
470        dst_link = os.path.join(tmp_dir, 'qux')
471        write_file(src, 'foo')
472        src_stat = os.stat(src)
473        os.utime(src, (src_stat.st_atime,
474                       src_stat.st_mtime - 42.0))  # ensure different mtimes
475        write_file(dst, 'bar')
476        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
477        os.symlink(src, src_link)
478        os.symlink(dst, dst_link)
479        if hasattr(os, 'lchmod'):
480            os.lchmod(src_link, stat.S_IRWXO)
481        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
482            os.lchflags(src_link, stat.UF_NODUMP)
483        src_link_stat = os.lstat(src_link)
484        # follow
485        if hasattr(os, 'lchmod'):
486            shutil.copystat(src_link, dst_link, follow_symlinks=True)
487            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
488        # don't follow
489        shutil.copystat(src_link, dst_link, follow_symlinks=False)
490        dst_link_stat = os.lstat(dst_link)
491        if os.utime in os.supports_follow_symlinks:
492            for attr in 'st_atime', 'st_mtime':
493                # The modification times may be truncated in the new file.
494                self.assertLessEqual(getattr(src_link_stat, attr),
495                                     getattr(dst_link_stat, attr) + 1)
496        if hasattr(os, 'lchmod'):
497            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
498        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
499            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
500        # tell to follow but dst is not a link
501        shutil.copystat(src_link, dst, follow_symlinks=False)
502        self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
503                        00000.1)
504
505    @unittest.skipUnless(hasattr(os, 'chflags') and
506                         hasattr(errno, 'EOPNOTSUPP') and
507                         hasattr(errno, 'ENOTSUP'),
508                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
509    def test_copystat_handles_harmless_chflags_errors(self):
510        tmpdir = self.mkdtemp()
511        file1 = os.path.join(tmpdir, 'file1')
512        file2 = os.path.join(tmpdir, 'file2')
513        write_file(file1, 'xxx')
514        write_file(file2, 'xxx')
515
516        def make_chflags_raiser(err):
517            ex = OSError()
518
519            def _chflags_raiser(path, flags, *, follow_symlinks=True):
520                ex.errno = err
521                raise ex
522            return _chflags_raiser
523        old_chflags = os.chflags
524        try:
525            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
526                os.chflags = make_chflags_raiser(err)
527                shutil.copystat(file1, file2)
528            # assert others errors break it
529            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
530            self.assertRaises(OSError, shutil.copystat, file1, file2)
531        finally:
532            os.chflags = old_chflags
533
534    @support.skip_unless_xattr
535    def test_copyxattr(self):
536        tmp_dir = self.mkdtemp()
537        src = os.path.join(tmp_dir, 'foo')
538        write_file(src, 'foo')
539        dst = os.path.join(tmp_dir, 'bar')
540        write_file(dst, 'bar')
541
542        # no xattr == no problem
543        shutil._copyxattr(src, dst)
544        # common case
545        os.setxattr(src, 'user.foo', b'42')
546        os.setxattr(src, 'user.bar', b'43')
547        shutil._copyxattr(src, dst)
548        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
549        self.assertEqual(
550                os.getxattr(src, 'user.foo'),
551                os.getxattr(dst, 'user.foo'))
552        # check errors don't affect other attrs
553        os.remove(dst)
554        write_file(dst, 'bar')
555        os_error = OSError(errno.EPERM, 'EPERM')
556
557        def _raise_on_user_foo(fname, attr, val, **kwargs):
558            if attr == 'user.foo':
559                raise os_error
560            else:
561                orig_setxattr(fname, attr, val, **kwargs)
562        try:
563            orig_setxattr = os.setxattr
564            os.setxattr = _raise_on_user_foo
565            shutil._copyxattr(src, dst)
566            self.assertIn('user.bar', os.listxattr(dst))
567        finally:
568            os.setxattr = orig_setxattr
569        # the source filesystem not supporting xattrs should be ok, too.
570        def _raise_on_src(fname, *, follow_symlinks=True):
571            if fname == src:
572                raise OSError(errno.ENOTSUP, 'Operation not supported')
573            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
574        try:
575            orig_listxattr = os.listxattr
576            os.listxattr = _raise_on_src
577            shutil._copyxattr(src, dst)
578        finally:
579            os.listxattr = orig_listxattr
580
581        # test that shutil.copystat copies xattrs
582        src = os.path.join(tmp_dir, 'the_original')
583        srcro = os.path.join(tmp_dir, 'the_original_ro')
584        write_file(src, src)
585        write_file(srcro, srcro)
586        os.setxattr(src, 'user.the_value', b'fiddly')
587        os.setxattr(srcro, 'user.the_value', b'fiddly')
588        os.chmod(srcro, 0o444)
589        dst = os.path.join(tmp_dir, 'the_copy')
590        dstro = os.path.join(tmp_dir, 'the_copy_ro')
591        write_file(dst, dst)
592        write_file(dstro, dstro)
593        shutil.copystat(src, dst)
594        shutil.copystat(srcro, dstro)
595        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
596        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
597
598    @support.skip_unless_symlink
599    @support.skip_unless_xattr
600    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
601                         'root privileges required')
602    def test_copyxattr_symlinks(self):
603        # On Linux, it's only possible to access non-user xattr for symlinks;
604        # which in turn require root privileges. This test should be expanded
605        # as soon as other platforms gain support for extended attributes.
606        tmp_dir = self.mkdtemp()
607        src = os.path.join(tmp_dir, 'foo')
608        src_link = os.path.join(tmp_dir, 'baz')
609        write_file(src, 'foo')
610        os.symlink(src, src_link)
611        os.setxattr(src, 'trusted.foo', b'42')
612        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
613        dst = os.path.join(tmp_dir, 'bar')
614        dst_link = os.path.join(tmp_dir, 'qux')
615        write_file(dst, 'bar')
616        os.symlink(dst, dst_link)
617        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
618        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
619        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
620        shutil._copyxattr(src_link, dst, follow_symlinks=False)
621        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
622
623    @support.skip_unless_symlink
624    def test_copy_symlinks(self):
625        tmp_dir = self.mkdtemp()
626        src = os.path.join(tmp_dir, 'foo')
627        dst = os.path.join(tmp_dir, 'bar')
628        src_link = os.path.join(tmp_dir, 'baz')
629        write_file(src, 'foo')
630        os.symlink(src, src_link)
631        if hasattr(os, 'lchmod'):
632            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
633        # don't follow
634        shutil.copy(src_link, dst, follow_symlinks=True)
635        self.assertFalse(os.path.islink(dst))
636        self.assertEqual(read_file(src), read_file(dst))
637        os.remove(dst)
638        # follow
639        shutil.copy(src_link, dst, follow_symlinks=False)
640        self.assertTrue(os.path.islink(dst))
641        self.assertEqual(os.readlink(dst), os.readlink(src_link))
642        if hasattr(os, 'lchmod'):
643            self.assertEqual(os.lstat(src_link).st_mode,
644                             os.lstat(dst).st_mode)
645
646    @support.skip_unless_symlink
647    def test_copy2_symlinks(self):
648        tmp_dir = self.mkdtemp()
649        src = os.path.join(tmp_dir, 'foo')
650        dst = os.path.join(tmp_dir, 'bar')
651        src_link = os.path.join(tmp_dir, 'baz')
652        write_file(src, 'foo')
653        os.symlink(src, src_link)
654        if hasattr(os, 'lchmod'):
655            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
656        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
657            os.lchflags(src_link, stat.UF_NODUMP)
658        src_stat = os.stat(src)
659        src_link_stat = os.lstat(src_link)
660        # follow
661        shutil.copy2(src_link, dst, follow_symlinks=True)
662        self.assertFalse(os.path.islink(dst))
663        self.assertEqual(read_file(src), read_file(dst))
664        os.remove(dst)
665        # don't follow
666        shutil.copy2(src_link, dst, follow_symlinks=False)
667        self.assertTrue(os.path.islink(dst))
668        self.assertEqual(os.readlink(dst), os.readlink(src_link))
669        dst_stat = os.lstat(dst)
670        if os.utime in os.supports_follow_symlinks:
671            for attr in 'st_atime', 'st_mtime':
672                # The modification times may be truncated in the new file.
673                self.assertLessEqual(getattr(src_link_stat, attr),
674                                     getattr(dst_stat, attr) + 1)
675        if hasattr(os, 'lchmod'):
676            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
677            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
678        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
679            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
680
681    @support.skip_unless_xattr
682    def test_copy2_xattr(self):
683        tmp_dir = self.mkdtemp()
684        src = os.path.join(tmp_dir, 'foo')
685        dst = os.path.join(tmp_dir, 'bar')
686        write_file(src, 'foo')
687        os.setxattr(src, 'user.foo', b'42')
688        shutil.copy2(src, dst)
689        self.assertEqual(
690                os.getxattr(src, 'user.foo'),
691                os.getxattr(dst, 'user.foo'))
692        os.remove(dst)
693
694    @support.skip_unless_symlink
695    def test_copyfile_symlinks(self):
696        tmp_dir = self.mkdtemp()
697        src = os.path.join(tmp_dir, 'src')
698        dst = os.path.join(tmp_dir, 'dst')
699        dst_link = os.path.join(tmp_dir, 'dst_link')
700        link = os.path.join(tmp_dir, 'link')
701        write_file(src, 'foo')
702        os.symlink(src, link)
703        # don't follow
704        shutil.copyfile(link, dst_link, follow_symlinks=False)
705        self.assertTrue(os.path.islink(dst_link))
706        self.assertEqual(os.readlink(link), os.readlink(dst_link))
707        # follow
708        shutil.copyfile(link, dst)
709        self.assertFalse(os.path.islink(dst))
710
711    def test_rmtree_uses_safe_fd_version_if_available(self):
712        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
713                             os.supports_dir_fd and
714                             os.listdir in os.supports_fd and
715                             os.stat in os.supports_follow_symlinks)
716        if _use_fd_functions:
717            self.assertTrue(shutil._use_fd_functions)
718            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
719            tmp_dir = self.mkdtemp()
720            d = os.path.join(tmp_dir, 'a')
721            os.mkdir(d)
722            try:
723                real_rmtree = shutil._rmtree_safe_fd
724                class Called(Exception): pass
725                def _raiser(*args, **kwargs):
726                    raise Called
727                shutil._rmtree_safe_fd = _raiser
728                self.assertRaises(Called, shutil.rmtree, d)
729            finally:
730                shutil._rmtree_safe_fd = real_rmtree
731        else:
732            self.assertFalse(shutil._use_fd_functions)
733            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
734
735    def test_rmtree_dont_delete_file(self):
736        # When called on a file instead of a directory, don't delete it.
737        handle, path = tempfile.mkstemp()
738        os.close(handle)
739        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
740        os.remove(path)
741
742    def test_copytree_simple(self):
743        src_dir = tempfile.mkdtemp()
744        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
745        self.addCleanup(shutil.rmtree, src_dir)
746        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
747        write_file((src_dir, 'test.txt'), '123')
748        os.mkdir(os.path.join(src_dir, 'test_dir'))
749        write_file((src_dir, 'test_dir', 'test.txt'), '456')
750
751        shutil.copytree(src_dir, dst_dir)
752        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
753        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
754        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
755                                                    'test.txt')))
756        actual = read_file((dst_dir, 'test.txt'))
757        self.assertEqual(actual, '123')
758        actual = read_file((dst_dir, 'test_dir', 'test.txt'))
759        self.assertEqual(actual, '456')
760
761    def test_copytree_dirs_exist_ok(self):
762        src_dir = tempfile.mkdtemp()
763        dst_dir = tempfile.mkdtemp()
764        self.addCleanup(shutil.rmtree, src_dir)
765        self.addCleanup(shutil.rmtree, dst_dir)
766
767        write_file((src_dir, 'nonexisting.txt'), '123')
768        os.mkdir(os.path.join(src_dir, 'existing_dir'))
769        os.mkdir(os.path.join(dst_dir, 'existing_dir'))
770        write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
771        write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
772
773        shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
774        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
775        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
776        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
777                                                    'existing.txt')))
778        actual = read_file((dst_dir, 'nonexisting.txt'))
779        self.assertEqual(actual, '123')
780        actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
781        self.assertEqual(actual, 'has been replaced')
782
783        with self.assertRaises(FileExistsError):
784            shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
785
786    @support.skip_unless_symlink
787    def test_copytree_symlinks(self):
788        tmp_dir = self.mkdtemp()
789        src_dir = os.path.join(tmp_dir, 'src')
790        dst_dir = os.path.join(tmp_dir, 'dst')
791        sub_dir = os.path.join(src_dir, 'sub')
792        os.mkdir(src_dir)
793        os.mkdir(sub_dir)
794        write_file((src_dir, 'file.txt'), 'foo')
795        src_link = os.path.join(sub_dir, 'link')
796        dst_link = os.path.join(dst_dir, 'sub/link')
797        os.symlink(os.path.join(src_dir, 'file.txt'),
798                   src_link)
799        if hasattr(os, 'lchmod'):
800            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
801        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
802            os.lchflags(src_link, stat.UF_NODUMP)
803        src_stat = os.lstat(src_link)
804        shutil.copytree(src_dir, dst_dir, symlinks=True)
805        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
806        actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
807        # Bad practice to blindly strip the prefix as it may be required to
808        # correctly refer to the file, but we're only comparing paths here.
809        if os.name == 'nt' and actual.startswith('\\\\?\\'):
810            actual = actual[4:]
811        self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
812        dst_stat = os.lstat(dst_link)
813        if hasattr(os, 'lchmod'):
814            self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
815        if hasattr(os, 'lchflags'):
816            self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
817
818    def test_copytree_with_exclude(self):
819        # creating data
820        join = os.path.join
821        exists = os.path.exists
822        src_dir = tempfile.mkdtemp()
823        try:
824            dst_dir = join(tempfile.mkdtemp(), 'destination')
825            write_file((src_dir, 'test.txt'), '123')
826            write_file((src_dir, 'test.tmp'), '123')
827            os.mkdir(join(src_dir, 'test_dir'))
828            write_file((src_dir, 'test_dir', 'test.txt'), '456')
829            os.mkdir(join(src_dir, 'test_dir2'))
830            write_file((src_dir, 'test_dir2', 'test.txt'), '456')
831            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
832            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
833            write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
834            write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
835
836            # testing glob-like patterns
837            try:
838                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
839                shutil.copytree(src_dir, dst_dir, ignore=patterns)
840                # checking the result: some elements should not be copied
841                self.assertTrue(exists(join(dst_dir, 'test.txt')))
842                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
843                self.assertFalse(exists(join(dst_dir, 'test_dir2')))
844            finally:
845                shutil.rmtree(dst_dir)
846            try:
847                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
848                shutil.copytree(src_dir, dst_dir, ignore=patterns)
849                # checking the result: some elements should not be copied
850                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
851                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
852                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
853            finally:
854                shutil.rmtree(dst_dir)
855
856            # testing callable-style
857            try:
858                def _filter(src, names):
859                    res = []
860                    for name in names:
861                        path = os.path.join(src, name)
862
863                        if (os.path.isdir(path) and
864                            path.split()[-1] == 'subdir'):
865                            res.append(name)
866                        elif os.path.splitext(path)[-1] in ('.py'):
867                            res.append(name)
868                    return res
869
870                shutil.copytree(src_dir, dst_dir, ignore=_filter)
871
872                # checking the result: some elements should not be copied
873                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
874                                             'test.py')))
875                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
876
877            finally:
878                shutil.rmtree(dst_dir)
879        finally:
880            shutil.rmtree(src_dir)
881            shutil.rmtree(os.path.dirname(dst_dir))
882
883    def test_copytree_arg_types_of_ignore(self):
884        join = os.path.join
885        exists = os.path.exists
886
887        tmp_dir = self.mkdtemp()
888        src_dir = join(tmp_dir, "source")
889
890        os.mkdir(join(src_dir))
891        os.mkdir(join(src_dir, 'test_dir'))
892        os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
893        write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
894
895        invokations = []
896
897        def _ignore(src, names):
898            invokations.append(src)
899            self.assertIsInstance(src, str)
900            self.assertIsInstance(names, list)
901            self.assertEqual(len(names), len(set(names)))
902            for name in names:
903                self.assertIsInstance(name, str)
904            return []
905
906        dst_dir = join(self.mkdtemp(), 'destination')
907        shutil.copytree(src_dir, dst_dir, ignore=_ignore)
908        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
909                                    'test.txt')))
910
911        dst_dir = join(self.mkdtemp(), 'destination')
912        shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
913        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
914                                    'test.txt')))
915
916        dst_dir = join(self.mkdtemp(), 'destination')
917        src_dir_entry = list(os.scandir(tmp_dir))[0]
918        self.assertIsInstance(src_dir_entry, os.DirEntry)
919        shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
920        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
921                                    'test.txt')))
922
923        self.assertEqual(len(invokations), 9)
924
925    def test_copytree_retains_permissions(self):
926        tmp_dir = tempfile.mkdtemp()
927        src_dir = os.path.join(tmp_dir, 'source')
928        os.mkdir(src_dir)
929        dst_dir = os.path.join(tmp_dir, 'destination')
930        self.addCleanup(shutil.rmtree, tmp_dir)
931
932        os.chmod(src_dir, 0o777)
933        write_file((src_dir, 'permissive.txt'), '123')
934        os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
935        write_file((src_dir, 'restrictive.txt'), '456')
936        os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
937        restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
938        os.chmod(restrictive_subdir, 0o600)
939
940        shutil.copytree(src_dir, dst_dir)
941        self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
942        self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
943                          os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
944        self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
945                          os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
946        restrictive_subdir_dst = os.path.join(dst_dir,
947                                              os.path.split(restrictive_subdir)[1])
948        self.assertEqual(os.stat(restrictive_subdir).st_mode,
949                          os.stat(restrictive_subdir_dst).st_mode)
950
951    @unittest.mock.patch('os.chmod')
952    def test_copytree_winerror(self, mock_patch):
953        # When copying to VFAT, copystat() raises OSError. On Windows, the
954        # exception object has a meaningful 'winerror' attribute, but not
955        # on other operating systems. Do not assume 'winerror' is set.
956        src_dir = tempfile.mkdtemp()
957        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
958        self.addCleanup(shutil.rmtree, src_dir)
959        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
960
961        mock_patch.side_effect = PermissionError('ka-boom')
962        with self.assertRaises(shutil.Error):
963            shutil.copytree(src_dir, dst_dir)
964
965    def test_copytree_custom_copy_function(self):
966        # See: https://bugs.python.org/issue35648
967        def custom_cpfun(a, b):
968            flag.append(None)
969            self.assertIsInstance(a, str)
970            self.assertIsInstance(b, str)
971            self.assertEqual(a, os.path.join(src, 'foo'))
972            self.assertEqual(b, os.path.join(dst, 'foo'))
973
974        flag = []
975        src = tempfile.mkdtemp()
976        self.addCleanup(support.rmtree, src)
977        dst = tempfile.mktemp()
978        self.addCleanup(support.rmtree, dst)
979        with open(os.path.join(src, 'foo'), 'w') as f:
980            f.close()
981        shutil.copytree(src, dst, copy_function=custom_cpfun)
982        self.assertEqual(len(flag), 1)
983
984    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
985    def test_dont_copy_file_onto_link_to_itself(self):
986        # bug 851123.
987        os.mkdir(TESTFN)
988        src = os.path.join(TESTFN, 'cheese')
989        dst = os.path.join(TESTFN, 'shop')
990        try:
991            with open(src, 'w') as f:
992                f.write('cheddar')
993            try:
994                os.link(src, dst)
995            except PermissionError as e:
996                self.skipTest('os.link(): %s' % e)
997            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
998            with open(src, 'r') as f:
999                self.assertEqual(f.read(), 'cheddar')
1000            os.remove(dst)
1001        finally:
1002            shutil.rmtree(TESTFN, ignore_errors=True)
1003
1004    @support.skip_unless_symlink
1005    def test_dont_copy_file_onto_symlink_to_itself(self):
1006        # bug 851123.
1007        os.mkdir(TESTFN)
1008        src = os.path.join(TESTFN, 'cheese')
1009        dst = os.path.join(TESTFN, 'shop')
1010        try:
1011            with open(src, 'w') as f:
1012                f.write('cheddar')
1013            # Using `src` here would mean we end up with a symlink pointing
1014            # to TESTFN/TESTFN/cheese, while it should point at
1015            # TESTFN/cheese.
1016            os.symlink('cheese', dst)
1017            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
1018            with open(src, 'r') as f:
1019                self.assertEqual(f.read(), 'cheddar')
1020            os.remove(dst)
1021        finally:
1022            shutil.rmtree(TESTFN, ignore_errors=True)
1023
1024    @support.skip_unless_symlink
1025    def test_rmtree_on_symlink(self):
1026        # bug 1669.
1027        os.mkdir(TESTFN)
1028        try:
1029            src = os.path.join(TESTFN, 'cheese')
1030            dst = os.path.join(TESTFN, 'shop')
1031            os.mkdir(src)
1032            os.symlink(src, dst)
1033            self.assertRaises(OSError, shutil.rmtree, dst)
1034            shutil.rmtree(dst, ignore_errors=True)
1035        finally:
1036            shutil.rmtree(TESTFN, ignore_errors=True)
1037
1038    @unittest.skipUnless(_winapi, 'only relevant on Windows')
1039    def test_rmtree_on_junction(self):
1040        os.mkdir(TESTFN)
1041        try:
1042            src = os.path.join(TESTFN, 'cheese')
1043            dst = os.path.join(TESTFN, 'shop')
1044            os.mkdir(src)
1045            open(os.path.join(src, 'spam'), 'wb').close()
1046            _winapi.CreateJunction(src, dst)
1047            self.assertRaises(OSError, shutil.rmtree, dst)
1048            shutil.rmtree(dst, ignore_errors=True)
1049        finally:
1050            shutil.rmtree(TESTFN, ignore_errors=True)
1051
1052    # Issue #3002: copyfile and copytree block indefinitely on named pipes
1053    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1054    def test_copyfile_named_pipe(self):
1055        try:
1056            os.mkfifo(TESTFN)
1057        except PermissionError as e:
1058            self.skipTest('os.mkfifo(): %s' % e)
1059        try:
1060            self.assertRaises(shutil.SpecialFileError,
1061                                shutil.copyfile, TESTFN, TESTFN2)
1062            self.assertRaises(shutil.SpecialFileError,
1063                                shutil.copyfile, __file__, TESTFN)
1064        finally:
1065            os.remove(TESTFN)
1066
1067    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1068    @support.skip_unless_symlink
1069    def test_copytree_named_pipe(self):
1070        os.mkdir(TESTFN)
1071        try:
1072            subdir = os.path.join(TESTFN, "subdir")
1073            os.mkdir(subdir)
1074            pipe = os.path.join(subdir, "mypipe")
1075            try:
1076                os.mkfifo(pipe)
1077            except PermissionError as e:
1078                self.skipTest('os.mkfifo(): %s' % e)
1079            try:
1080                shutil.copytree(TESTFN, TESTFN2)
1081            except shutil.Error as e:
1082                errors = e.args[0]
1083                self.assertEqual(len(errors), 1)
1084                src, dst, error_msg = errors[0]
1085                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
1086            else:
1087                self.fail("shutil.Error should have been raised")
1088        finally:
1089            shutil.rmtree(TESTFN, ignore_errors=True)
1090            shutil.rmtree(TESTFN2, ignore_errors=True)
1091
1092    def test_copytree_special_func(self):
1093
1094        src_dir = self.mkdtemp()
1095        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1096        write_file((src_dir, 'test.txt'), '123')
1097        os.mkdir(os.path.join(src_dir, 'test_dir'))
1098        write_file((src_dir, 'test_dir', 'test.txt'), '456')
1099
1100        copied = []
1101        def _copy(src, dst):
1102            copied.append((src, dst))
1103
1104        shutil.copytree(src_dir, dst_dir, copy_function=_copy)
1105        self.assertEqual(len(copied), 2)
1106
1107    @support.skip_unless_symlink
1108    def test_copytree_dangling_symlinks(self):
1109
1110        # a dangling symlink raises an error at the end
1111        src_dir = self.mkdtemp()
1112        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1113        os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
1114        os.mkdir(os.path.join(src_dir, 'test_dir'))
1115        write_file((src_dir, 'test_dir', 'test.txt'), '456')
1116        self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
1117
1118        # a dangling symlink is ignored with the proper flag
1119        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1120        shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
1121        self.assertNotIn('test.txt', os.listdir(dst_dir))
1122
1123        # a dangling symlink is copied if symlinks=True
1124        dst_dir = os.path.join(self.mkdtemp(), 'destination3')
1125        shutil.copytree(src_dir, dst_dir, symlinks=True)
1126        self.assertIn('test.txt', os.listdir(dst_dir))
1127
1128    @support.skip_unless_symlink
1129    def test_copytree_symlink_dir(self):
1130        src_dir = self.mkdtemp()
1131        dst_dir = os.path.join(self.mkdtemp(), 'destination')
1132        os.mkdir(os.path.join(src_dir, 'real_dir'))
1133        with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
1134            pass
1135        os.symlink(os.path.join(src_dir, 'real_dir'),
1136                   os.path.join(src_dir, 'link_to_dir'),
1137                   target_is_directory=True)
1138
1139        shutil.copytree(src_dir, dst_dir, symlinks=False)
1140        self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1141        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1142
1143        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1144        shutil.copytree(src_dir, dst_dir, symlinks=True)
1145        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1146        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1147
1148    def _copy_file(self, method):
1149        fname = 'test.txt'
1150        tmpdir = self.mkdtemp()
1151        write_file((tmpdir, fname), 'xxx')
1152        file1 = os.path.join(tmpdir, fname)
1153        tmpdir2 = self.mkdtemp()
1154        method(file1, tmpdir2)
1155        file2 = os.path.join(tmpdir2, fname)
1156        return (file1, file2)
1157
1158    def test_copy(self):
1159        # Ensure that the copied file exists and has the same mode bits.
1160        file1, file2 = self._copy_file(shutil.copy)
1161        self.assertTrue(os.path.exists(file2))
1162        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1163
1164    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1165    def test_copy2(self):
1166        # Ensure that the copied file exists and has the same mode and
1167        # modification time bits.
1168        file1, file2 = self._copy_file(shutil.copy2)
1169        self.assertTrue(os.path.exists(file2))
1170        file1_stat = os.stat(file1)
1171        file2_stat = os.stat(file2)
1172        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1173        for attr in 'st_atime', 'st_mtime':
1174            # The modification times may be truncated in the new file.
1175            self.assertLessEqual(getattr(file1_stat, attr),
1176                                 getattr(file2_stat, attr) + 1)
1177        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1178            self.assertEqual(getattr(file1_stat, 'st_flags'),
1179                             getattr(file2_stat, 'st_flags'))
1180
1181    @support.requires_zlib
1182    def test_make_tarball(self):
1183        # creating something to tar
1184        root_dir, base_dir = self._create_files('')
1185
1186        tmpdir2 = self.mkdtemp()
1187        # force shutil to create the directory
1188        os.rmdir(tmpdir2)
1189        # working with relative paths
1190        work_dir = os.path.dirname(tmpdir2)
1191        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1192
1193        with support.change_cwd(work_dir):
1194            base_name = os.path.abspath(rel_base_name)
1195            tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
1196
1197        # check if the compressed tarball was created
1198        self.assertEqual(tarball, base_name + '.tar.gz')
1199        self.assertTrue(os.path.isfile(tarball))
1200        self.assertTrue(tarfile.is_tarfile(tarball))
1201        with tarfile.open(tarball, 'r:gz') as tf:
1202            self.assertCountEqual(tf.getnames(),
1203                                  ['.', './sub', './sub2',
1204                                   './file1', './file2', './sub/file3'])
1205
1206        # trying an uncompressed one
1207        with support.change_cwd(work_dir):
1208            tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
1209        self.assertEqual(tarball, base_name + '.tar')
1210        self.assertTrue(os.path.isfile(tarball))
1211        self.assertTrue(tarfile.is_tarfile(tarball))
1212        with tarfile.open(tarball, 'r') as tf:
1213            self.assertCountEqual(tf.getnames(),
1214                                  ['.', './sub', './sub2',
1215                                  './file1', './file2', './sub/file3'])
1216
1217    def _tarinfo(self, path):
1218        with tarfile.open(path) as tar:
1219            names = tar.getnames()
1220            names.sort()
1221            return tuple(names)
1222
1223    def _create_files(self, base_dir='dist'):
1224        # creating something to tar
1225        root_dir = self.mkdtemp()
1226        dist = os.path.join(root_dir, base_dir)
1227        os.makedirs(dist, exist_ok=True)
1228        write_file((dist, 'file1'), 'xxx')
1229        write_file((dist, 'file2'), 'xxx')
1230        os.mkdir(os.path.join(dist, 'sub'))
1231        write_file((dist, 'sub', 'file3'), 'xxx')
1232        os.mkdir(os.path.join(dist, 'sub2'))
1233        if base_dir:
1234            write_file((root_dir, 'outer'), 'xxx')
1235        return root_dir, base_dir
1236
1237    @support.requires_zlib
1238    @unittest.skipUnless(shutil.which('tar'),
1239                         'Need the tar command to run')
1240    def test_tarfile_vs_tar(self):
1241        root_dir, base_dir = self._create_files()
1242        base_name = os.path.join(self.mkdtemp(), 'archive')
1243        tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
1244
1245        # check if the compressed tarball was created
1246        self.assertEqual(tarball, base_name + '.tar.gz')
1247        self.assertTrue(os.path.isfile(tarball))
1248
1249        # now create another tarball using `tar`
1250        tarball2 = os.path.join(root_dir, 'archive2.tar')
1251        tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
1252        subprocess.check_call(tar_cmd, cwd=root_dir,
1253                              stdout=subprocess.DEVNULL)
1254
1255        self.assertTrue(os.path.isfile(tarball2))
1256        # let's compare both tarballs
1257        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
1258
1259        # trying an uncompressed one
1260        tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1261        self.assertEqual(tarball, base_name + '.tar')
1262        self.assertTrue(os.path.isfile(tarball))
1263
1264        # now for a dry_run
1265        tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1266                               dry_run=True)
1267        self.assertEqual(tarball, base_name + '.tar')
1268        self.assertTrue(os.path.isfile(tarball))
1269
1270    @support.requires_zlib
1271    def test_make_zipfile(self):
1272        # creating something to zip
1273        root_dir, base_dir = self._create_files()
1274
1275        tmpdir2 = self.mkdtemp()
1276        # force shutil to create the directory
1277        os.rmdir(tmpdir2)
1278        # working with relative paths
1279        work_dir = os.path.dirname(tmpdir2)
1280        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1281
1282        with support.change_cwd(work_dir):
1283            base_name = os.path.abspath(rel_base_name)
1284            res = make_archive(rel_base_name, 'zip', root_dir)
1285
1286        self.assertEqual(res, base_name + '.zip')
1287        self.assertTrue(os.path.isfile(res))
1288        self.assertTrue(zipfile.is_zipfile(res))
1289        with zipfile.ZipFile(res) as zf:
1290            self.assertCountEqual(zf.namelist(),
1291                    ['dist/', 'dist/sub/', 'dist/sub2/',
1292                     'dist/file1', 'dist/file2', 'dist/sub/file3',
1293                     'outer'])
1294
1295        with support.change_cwd(work_dir):
1296            base_name = os.path.abspath(rel_base_name)
1297            res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
1298
1299        self.assertEqual(res, base_name + '.zip')
1300        self.assertTrue(os.path.isfile(res))
1301        self.assertTrue(zipfile.is_zipfile(res))
1302        with zipfile.ZipFile(res) as zf:
1303            self.assertCountEqual(zf.namelist(),
1304                    ['dist/', 'dist/sub/', 'dist/sub2/',
1305                     'dist/file1', 'dist/file2', 'dist/sub/file3'])
1306
1307    @support.requires_zlib
1308    @unittest.skipUnless(shutil.which('zip'),
1309                         'Need the zip command to run')
1310    def test_zipfile_vs_zip(self):
1311        root_dir, base_dir = self._create_files()
1312        base_name = os.path.join(self.mkdtemp(), 'archive')
1313        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1314
1315        # check if ZIP file  was created
1316        self.assertEqual(archive, base_name + '.zip')
1317        self.assertTrue(os.path.isfile(archive))
1318
1319        # now create another ZIP file using `zip`
1320        archive2 = os.path.join(root_dir, 'archive2.zip')
1321        zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
1322        subprocess.check_call(zip_cmd, cwd=root_dir,
1323                              stdout=subprocess.DEVNULL)
1324
1325        self.assertTrue(os.path.isfile(archive2))
1326        # let's compare both ZIP files
1327        with zipfile.ZipFile(archive) as zf:
1328            names = zf.namelist()
1329        with zipfile.ZipFile(archive2) as zf:
1330            names2 = zf.namelist()
1331        self.assertEqual(sorted(names), sorted(names2))
1332
1333    @support.requires_zlib
1334    @unittest.skipUnless(shutil.which('unzip'),
1335                         'Need the unzip command to run')
1336    def test_unzip_zipfile(self):
1337        root_dir, base_dir = self._create_files()
1338        base_name = os.path.join(self.mkdtemp(), 'archive')
1339        archive = make_archive(base_name, 'zip', root_dir, base_dir)
1340
1341        # check if ZIP file  was created
1342        self.assertEqual(archive, base_name + '.zip')
1343        self.assertTrue(os.path.isfile(archive))
1344
1345        # now check the ZIP file using `unzip -t`
1346        zip_cmd = ['unzip', '-t', archive]
1347        with support.change_cwd(root_dir):
1348            try:
1349                subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1350            except subprocess.CalledProcessError as exc:
1351                details = exc.output.decode(errors="replace")
1352                if 'unrecognized option: t' in details:
1353                    self.skipTest("unzip doesn't support -t")
1354                msg = "{}\n\n**Unzip Output**\n{}"
1355                self.fail(msg.format(exc, details))
1356
1357    def test_make_archive(self):
1358        tmpdir = self.mkdtemp()
1359        base_name = os.path.join(tmpdir, 'archive')
1360        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1361
1362    @support.requires_zlib
1363    def test_make_archive_owner_group(self):
1364        # testing make_archive with owner and group, with various combinations
1365        # this works even if there's not gid/uid support
1366        if UID_GID_SUPPORT:
1367            group = grp.getgrgid(0)[0]
1368            owner = pwd.getpwuid(0)[0]
1369        else:
1370            group = owner = 'root'
1371
1372        root_dir, base_dir = self._create_files()
1373        base_name = os.path.join(self.mkdtemp(), 'archive')
1374        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1375                           group=group)
1376        self.assertTrue(os.path.isfile(res))
1377
1378        res = make_archive(base_name, 'zip', root_dir, base_dir)
1379        self.assertTrue(os.path.isfile(res))
1380
1381        res = make_archive(base_name, 'tar', root_dir, base_dir,
1382                           owner=owner, group=group)
1383        self.assertTrue(os.path.isfile(res))
1384
1385        res = make_archive(base_name, 'tar', root_dir, base_dir,
1386                           owner='kjhkjhkjg', group='oihohoh')
1387        self.assertTrue(os.path.isfile(res))
1388
1389
1390    @support.requires_zlib
1391    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1392    def test_tarfile_root_owner(self):
1393        root_dir, base_dir = self._create_files()
1394        base_name = os.path.join(self.mkdtemp(), 'archive')
1395        group = grp.getgrgid(0)[0]
1396        owner = pwd.getpwuid(0)[0]
1397        with support.change_cwd(root_dir):
1398            archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1399                                        owner=owner, group=group)
1400
1401        # check if the compressed tarball was created
1402        self.assertTrue(os.path.isfile(archive_name))
1403
1404        # now checks the rights
1405        archive = tarfile.open(archive_name)
1406        try:
1407            for member in archive.getmembers():
1408                self.assertEqual(member.uid, 0)
1409                self.assertEqual(member.gid, 0)
1410        finally:
1411            archive.close()
1412
1413    def test_make_archive_cwd(self):
1414        current_dir = os.getcwd()
1415        def _breaks(*args, **kw):
1416            raise RuntimeError()
1417
1418        register_archive_format('xxx', _breaks, [], 'xxx file')
1419        try:
1420            try:
1421                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1422            except Exception:
1423                pass
1424            self.assertEqual(os.getcwd(), current_dir)
1425        finally:
1426            unregister_archive_format('xxx')
1427
1428    def test_make_tarfile_in_curdir(self):
1429        # Issue #21280
1430        root_dir = self.mkdtemp()
1431        with support.change_cwd(root_dir):
1432            self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1433            self.assertTrue(os.path.isfile('test.tar'))
1434
1435    @support.requires_zlib
1436    def test_make_zipfile_in_curdir(self):
1437        # Issue #21280
1438        root_dir = self.mkdtemp()
1439        with support.change_cwd(root_dir):
1440            self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1441            self.assertTrue(os.path.isfile('test.zip'))
1442
1443    def test_register_archive_format(self):
1444
1445        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1446        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1447                          1)
1448        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1449                          [(1, 2), (1, 2, 3)])
1450
1451        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1452        formats = [name for name, params in get_archive_formats()]
1453        self.assertIn('xxx', formats)
1454
1455        unregister_archive_format('xxx')
1456        formats = [name for name, params in get_archive_formats()]
1457        self.assertNotIn('xxx', formats)
1458
1459    def check_unpack_archive(self, format):
1460        self.check_unpack_archive_with_converter(format, lambda path: path)
1461        self.check_unpack_archive_with_converter(format, pathlib.Path)
1462        self.check_unpack_archive_with_converter(format, FakePath)
1463
1464    def check_unpack_archive_with_converter(self, format, converter):
1465        root_dir, base_dir = self._create_files()
1466        expected = rlistdir(root_dir)
1467        expected.remove('outer')
1468
1469        base_name = os.path.join(self.mkdtemp(), 'archive')
1470        filename = make_archive(base_name, format, root_dir, base_dir)
1471
1472        # let's try to unpack it now
1473        tmpdir2 = self.mkdtemp()
1474        unpack_archive(converter(filename), converter(tmpdir2))
1475        self.assertEqual(rlistdir(tmpdir2), expected)
1476
1477        # and again, this time with the format specified
1478        tmpdir3 = self.mkdtemp()
1479        unpack_archive(converter(filename), converter(tmpdir3), format=format)
1480        self.assertEqual(rlistdir(tmpdir3), expected)
1481
1482        self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1483        self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
1484
1485    def test_unpack_archive_tar(self):
1486        self.check_unpack_archive('tar')
1487
1488    @support.requires_zlib
1489    def test_unpack_archive_gztar(self):
1490        self.check_unpack_archive('gztar')
1491
1492    @support.requires_bz2
1493    def test_unpack_archive_bztar(self):
1494        self.check_unpack_archive('bztar')
1495
1496    @support.requires_lzma
1497    @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
1498    def test_unpack_archive_xztar(self):
1499        self.check_unpack_archive('xztar')
1500
1501    @support.requires_zlib
1502    def test_unpack_archive_zip(self):
1503        self.check_unpack_archive('zip')
1504
1505    def test_unpack_registry(self):
1506
1507        formats = get_unpack_formats()
1508
1509        def _boo(filename, extract_dir, extra):
1510            self.assertEqual(extra, 1)
1511            self.assertEqual(filename, 'stuff.boo')
1512            self.assertEqual(extract_dir, 'xx')
1513
1514        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1515        unpack_archive('stuff.boo', 'xx')
1516
1517        # trying to register a .boo unpacker again
1518        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1519                          ['.boo'], _boo)
1520
1521        # should work now
1522        unregister_unpack_format('Boo')
1523        register_unpack_format('Boo2', ['.boo'], _boo)
1524        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1525        self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1526
1527        # let's leave a clean state
1528        unregister_unpack_format('Boo2')
1529        self.assertEqual(get_unpack_formats(), formats)
1530
1531    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1532                         "disk_usage not available on this platform")
1533    def test_disk_usage(self):
1534        usage = shutil.disk_usage(os.path.dirname(__file__))
1535        for attr in ('total', 'used', 'free'):
1536            self.assertIsInstance(getattr(usage, attr), int)
1537        self.assertGreater(usage.total, 0)
1538        self.assertGreater(usage.used, 0)
1539        self.assertGreaterEqual(usage.free, 0)
1540        self.assertGreaterEqual(usage.total, usage.used)
1541        self.assertGreater(usage.total, usage.free)
1542
1543        # bpo-32557: Check that disk_usage() also accepts a filename
1544        shutil.disk_usage(__file__)
1545
1546    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1547    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1548    def test_chown(self):
1549
1550        # cleaned-up automatically by TestShutil.tearDown method
1551        dirname = self.mkdtemp()
1552        filename = tempfile.mktemp(dir=dirname)
1553        write_file(filename, 'testing chown function')
1554
1555        with self.assertRaises(ValueError):
1556            shutil.chown(filename)
1557
1558        with self.assertRaises(LookupError):
1559            shutil.chown(filename, user='non-existing username')
1560
1561        with self.assertRaises(LookupError):
1562            shutil.chown(filename, group='non-existing groupname')
1563
1564        with self.assertRaises(TypeError):
1565            shutil.chown(filename, b'spam')
1566
1567        with self.assertRaises(TypeError):
1568            shutil.chown(filename, 3.14)
1569
1570        uid = os.getuid()
1571        gid = os.getgid()
1572
1573        def check_chown(path, uid=None, gid=None):
1574            s = os.stat(filename)
1575            if uid is not None:
1576                self.assertEqual(uid, s.st_uid)
1577            if gid is not None:
1578                self.assertEqual(gid, s.st_gid)
1579
1580        shutil.chown(filename, uid, gid)
1581        check_chown(filename, uid, gid)
1582        shutil.chown(filename, uid)
1583        check_chown(filename, uid)
1584        shutil.chown(filename, user=uid)
1585        check_chown(filename, uid)
1586        shutil.chown(filename, group=gid)
1587        check_chown(filename, gid=gid)
1588
1589        shutil.chown(dirname, uid, gid)
1590        check_chown(dirname, uid, gid)
1591        shutil.chown(dirname, uid)
1592        check_chown(dirname, uid)
1593        shutil.chown(dirname, user=uid)
1594        check_chown(dirname, uid)
1595        shutil.chown(dirname, group=gid)
1596        check_chown(dirname, gid=gid)
1597
1598        user = pwd.getpwuid(uid)[0]
1599        group = grp.getgrgid(gid)[0]
1600        shutil.chown(filename, user, group)
1601        check_chown(filename, uid, gid)
1602        shutil.chown(dirname, user, group)
1603        check_chown(dirname, uid, gid)
1604
1605    def test_copy_return_value(self):
1606        # copy and copy2 both return their destination path.
1607        for fn in (shutil.copy, shutil.copy2):
1608            src_dir = self.mkdtemp()
1609            dst_dir = self.mkdtemp()
1610            src = os.path.join(src_dir, 'foo')
1611            write_file(src, 'foo')
1612            rv = fn(src, dst_dir)
1613            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1614            rv = fn(src, os.path.join(dst_dir, 'bar'))
1615            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1616
1617    def test_copyfile_return_value(self):
1618        # copytree returns its destination path.
1619        src_dir = self.mkdtemp()
1620        dst_dir = self.mkdtemp()
1621        dst_file = os.path.join(dst_dir, 'bar')
1622        src_file = os.path.join(src_dir, 'foo')
1623        write_file(src_file, 'foo')
1624        rv = shutil.copyfile(src_file, dst_file)
1625        self.assertTrue(os.path.exists(rv))
1626        self.assertEqual(read_file(src_file), read_file(dst_file))
1627
1628    def test_copyfile_same_file(self):
1629        # copyfile() should raise SameFileError if the source and destination
1630        # are the same.
1631        src_dir = self.mkdtemp()
1632        src_file = os.path.join(src_dir, 'foo')
1633        write_file(src_file, 'foo')
1634        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1635        # But Error should work too, to stay backward compatible.
1636        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1637        # Make sure file is not corrupted.
1638        self.assertEqual(read_file(src_file), 'foo')
1639
1640    def test_copytree_return_value(self):
1641        # copytree returns its destination path.
1642        src_dir = self.mkdtemp()
1643        dst_dir = src_dir + "dest"
1644        self.addCleanup(shutil.rmtree, dst_dir, True)
1645        src = os.path.join(src_dir, 'foo')
1646        write_file(src, 'foo')
1647        rv = shutil.copytree(src_dir, dst_dir)
1648        self.assertEqual(['foo'], os.listdir(rv))
1649
1650    def test_copytree_subdirectory(self):
1651        # copytree where dst is a subdirectory of src, see Issue 38688
1652        base_dir = self.mkdtemp()
1653        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
1654        src_dir = os.path.join(base_dir, "t", "pg")
1655        dst_dir = os.path.join(src_dir, "somevendor", "1.0")
1656        os.makedirs(src_dir)
1657        src = os.path.join(src_dir, 'pol')
1658        write_file(src, 'pol')
1659        rv = shutil.copytree(src_dir, dst_dir)
1660        self.assertEqual(['pol'], os.listdir(rv))
1661
1662
1663class TestWhich(unittest.TestCase):
1664
1665    def setUp(self):
1666        self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
1667        self.addCleanup(shutil.rmtree, self.temp_dir, True)
1668        # Give the temp_file an ".exe" suffix for all.
1669        # It's needed on Windows and not harmful on other platforms.
1670        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1671                                                     prefix="Tmp",
1672                                                     suffix=".Exe")
1673        os.chmod(self.temp_file.name, stat.S_IXUSR)
1674        self.addCleanup(self.temp_file.close)
1675        self.dir, self.file = os.path.split(self.temp_file.name)
1676        self.env_path = self.dir
1677        self.curdir = os.curdir
1678        self.ext = ".EXE"
1679
1680    def test_basic(self):
1681        # Given an EXE in a directory, it should be returned.
1682        rv = shutil.which(self.file, path=self.dir)
1683        self.assertEqual(rv, self.temp_file.name)
1684
1685    def test_absolute_cmd(self):
1686        # When given the fully qualified path to an executable that exists,
1687        # it should be returned.
1688        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
1689        self.assertEqual(rv, self.temp_file.name)
1690
1691    def test_relative_cmd(self):
1692        # When given the relative path with a directory part to an executable
1693        # that exists, it should be returned.
1694        base_dir, tail_dir = os.path.split(self.dir)
1695        relpath = os.path.join(tail_dir, self.file)
1696        with support.change_cwd(path=base_dir):
1697            rv = shutil.which(relpath, path=self.temp_dir)
1698            self.assertEqual(rv, relpath)
1699        # But it shouldn't be searched in PATH directories (issue #16957).
1700        with support.change_cwd(path=self.dir):
1701            rv = shutil.which(relpath, path=base_dir)
1702            self.assertIsNone(rv)
1703
1704    def test_cwd(self):
1705        # Issue #16957
1706        base_dir = os.path.dirname(self.dir)
1707        with support.change_cwd(path=self.dir):
1708            rv = shutil.which(self.file, path=base_dir)
1709            if sys.platform == "win32":
1710                # Windows: current directory implicitly on PATH
1711                self.assertEqual(rv, os.path.join(self.curdir, self.file))
1712            else:
1713                # Other platforms: shouldn't match in the current directory.
1714                self.assertIsNone(rv)
1715
1716    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1717                     'non-root user required')
1718    def test_non_matching_mode(self):
1719        # Set the file read-only and ask for writeable files.
1720        os.chmod(self.temp_file.name, stat.S_IREAD)
1721        if os.access(self.temp_file.name, os.W_OK):
1722            self.skipTest("can't set the file read-only")
1723        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1724        self.assertIsNone(rv)
1725
1726    def test_relative_path(self):
1727        base_dir, tail_dir = os.path.split(self.dir)
1728        with support.change_cwd(path=base_dir):
1729            rv = shutil.which(self.file, path=tail_dir)
1730            self.assertEqual(rv, os.path.join(tail_dir, self.file))
1731
1732    def test_nonexistent_file(self):
1733        # Return None when no matching executable file is found on the path.
1734        rv = shutil.which("foo.exe", path=self.dir)
1735        self.assertIsNone(rv)
1736
1737    @unittest.skipUnless(sys.platform == "win32",
1738                         "pathext check is Windows-only")
1739    def test_pathext_checking(self):
1740        # Ask for the file without the ".exe" extension, then ensure that
1741        # it gets found properly with the extension.
1742        rv = shutil.which(self.file[:-4], path=self.dir)
1743        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
1744
1745    def test_environ_path(self):
1746        with support.EnvironmentVarGuard() as env:
1747            env['PATH'] = self.env_path
1748            rv = shutil.which(self.file)
1749            self.assertEqual(rv, self.temp_file.name)
1750
1751    def test_environ_path_empty(self):
1752        # PATH='': no match
1753        with support.EnvironmentVarGuard() as env:
1754            env['PATH'] = ''
1755            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1756                                     create=True), \
1757                 support.swap_attr(os, 'defpath', self.dir), \
1758                 support.change_cwd(self.dir):
1759                rv = shutil.which(self.file)
1760                self.assertIsNone(rv)
1761
1762    def test_environ_path_cwd(self):
1763        expected_cwd = os.path.basename(self.temp_file.name)
1764        if sys.platform == "win32":
1765            curdir = os.curdir
1766            if isinstance(expected_cwd, bytes):
1767                curdir = os.fsencode(curdir)
1768            expected_cwd = os.path.join(curdir, expected_cwd)
1769
1770        # PATH=':': explicitly looks in the current directory
1771        with support.EnvironmentVarGuard() as env:
1772            env['PATH'] = os.pathsep
1773            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1774                                     create=True), \
1775                 support.swap_attr(os, 'defpath', self.dir):
1776                rv = shutil.which(self.file)
1777                self.assertIsNone(rv)
1778
1779                # look in current directory
1780                with support.change_cwd(self.dir):
1781                    rv = shutil.which(self.file)
1782                    self.assertEqual(rv, expected_cwd)
1783
1784    def test_environ_path_missing(self):
1785        with support.EnvironmentVarGuard() as env:
1786            env.pop('PATH', None)
1787
1788            # without confstr
1789            with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1790                                     create=True), \
1791                 support.swap_attr(os, 'defpath', self.dir):
1792                rv = shutil.which(self.file)
1793            self.assertEqual(rv, self.temp_file.name)
1794
1795            # with confstr
1796            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1797                                     create=True), \
1798                 support.swap_attr(os, 'defpath', ''):
1799                rv = shutil.which(self.file)
1800            self.assertEqual(rv, self.temp_file.name)
1801
1802    def test_empty_path(self):
1803        base_dir = os.path.dirname(self.dir)
1804        with support.change_cwd(path=self.dir), \
1805             support.EnvironmentVarGuard() as env:
1806            env['PATH'] = self.env_path
1807            rv = shutil.which(self.file, path='')
1808            self.assertIsNone(rv)
1809
1810    def test_empty_path_no_PATH(self):
1811        with support.EnvironmentVarGuard() as env:
1812            env.pop('PATH', None)
1813            rv = shutil.which(self.file)
1814            self.assertIsNone(rv)
1815
1816    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1817    def test_pathext(self):
1818        ext = ".xyz"
1819        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1820                                                   prefix="Tmp2", suffix=ext)
1821        os.chmod(temp_filexyz.name, stat.S_IXUSR)
1822        self.addCleanup(temp_filexyz.close)
1823
1824        # strip path and extension
1825        program = os.path.basename(temp_filexyz.name)
1826        program = os.path.splitext(program)[0]
1827
1828        with support.EnvironmentVarGuard() as env:
1829            env['PATHEXT'] = ext
1830            rv = shutil.which(program, path=self.temp_dir)
1831            self.assertEqual(rv, temp_filexyz.name)
1832
1833
1834class TestWhichBytes(TestWhich):
1835    def setUp(self):
1836        TestWhich.setUp(self)
1837        self.dir = os.fsencode(self.dir)
1838        self.file = os.fsencode(self.file)
1839        self.temp_file.name = os.fsencode(self.temp_file.name)
1840        self.curdir = os.fsencode(self.curdir)
1841        self.ext = os.fsencode(self.ext)
1842
1843
1844class TestMove(unittest.TestCase):
1845
1846    def setUp(self):
1847        filename = "foo"
1848        basedir = None
1849        if sys.platform == "win32":
1850            basedir = os.path.realpath(os.getcwd())
1851        self.src_dir = tempfile.mkdtemp(dir=basedir)
1852        self.dst_dir = tempfile.mkdtemp(dir=basedir)
1853        self.src_file = os.path.join(self.src_dir, filename)
1854        self.dst_file = os.path.join(self.dst_dir, filename)
1855        with open(self.src_file, "wb") as f:
1856            f.write(b"spam")
1857
1858    def tearDown(self):
1859        for d in (self.src_dir, self.dst_dir):
1860            try:
1861                if d:
1862                    shutil.rmtree(d)
1863            except:
1864                pass
1865
1866    def _check_move_file(self, src, dst, real_dst):
1867        with open(src, "rb") as f:
1868            contents = f.read()
1869        shutil.move(src, dst)
1870        with open(real_dst, "rb") as f:
1871            self.assertEqual(contents, f.read())
1872        self.assertFalse(os.path.exists(src))
1873
1874    def _check_move_dir(self, src, dst, real_dst):
1875        contents = sorted(os.listdir(src))
1876        shutil.move(src, dst)
1877        self.assertEqual(contents, sorted(os.listdir(real_dst)))
1878        self.assertFalse(os.path.exists(src))
1879
1880    def test_move_file(self):
1881        # Move a file to another location on the same filesystem.
1882        self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1883
1884    def test_move_file_to_dir(self):
1885        # Move a file inside an existing dir on the same filesystem.
1886        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1887
1888    @mock_rename
1889    def test_move_file_other_fs(self):
1890        # Move a file to an existing dir on another filesystem.
1891        self.test_move_file()
1892
1893    @mock_rename
1894    def test_move_file_to_dir_other_fs(self):
1895        # Move a file to another location on another filesystem.
1896        self.test_move_file_to_dir()
1897
1898    def test_move_dir(self):
1899        # Move a dir to another location on the same filesystem.
1900        dst_dir = tempfile.mktemp()
1901        try:
1902            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1903        finally:
1904            try:
1905                shutil.rmtree(dst_dir)
1906            except:
1907                pass
1908
1909    @mock_rename
1910    def test_move_dir_other_fs(self):
1911        # Move a dir to another location on another filesystem.
1912        self.test_move_dir()
1913
1914    def test_move_dir_to_dir(self):
1915        # Move a dir inside an existing dir on the same filesystem.
1916        self._check_move_dir(self.src_dir, self.dst_dir,
1917            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1918
1919    @mock_rename
1920    def test_move_dir_to_dir_other_fs(self):
1921        # Move a dir inside an existing dir on another filesystem.
1922        self.test_move_dir_to_dir()
1923
1924    def test_move_dir_sep_to_dir(self):
1925        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1926            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1927
1928    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1929    def test_move_dir_altsep_to_dir(self):
1930        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1931            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1932
1933    def test_existing_file_inside_dest_dir(self):
1934        # A file with the same name inside the destination dir already exists.
1935        with open(self.dst_file, "wb"):
1936            pass
1937        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1938
1939    def test_dont_move_dir_in_itself(self):
1940        # Moving a dir inside itself raises an Error.
1941        dst = os.path.join(self.src_dir, "bar")
1942        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1943
1944    def test_destinsrc_false_negative(self):
1945        os.mkdir(TESTFN)
1946        try:
1947            for src, dst in [('srcdir', 'srcdir/dest')]:
1948                src = os.path.join(TESTFN, src)
1949                dst = os.path.join(TESTFN, dst)
1950                self.assertTrue(shutil._destinsrc(src, dst),
1951                             msg='_destinsrc() wrongly concluded that '
1952                             'dst (%s) is not in src (%s)' % (dst, src))
1953        finally:
1954            shutil.rmtree(TESTFN, ignore_errors=True)
1955
1956    def test_destinsrc_false_positive(self):
1957        os.mkdir(TESTFN)
1958        try:
1959            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1960                src = os.path.join(TESTFN, src)
1961                dst = os.path.join(TESTFN, dst)
1962                self.assertFalse(shutil._destinsrc(src, dst),
1963                            msg='_destinsrc() wrongly concluded that '
1964                            'dst (%s) is in src (%s)' % (dst, src))
1965        finally:
1966            shutil.rmtree(TESTFN, ignore_errors=True)
1967
1968    @support.skip_unless_symlink
1969    @mock_rename
1970    def test_move_file_symlink(self):
1971        dst = os.path.join(self.src_dir, 'bar')
1972        os.symlink(self.src_file, dst)
1973        shutil.move(dst, self.dst_file)
1974        self.assertTrue(os.path.islink(self.dst_file))
1975        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1976
1977    @support.skip_unless_symlink
1978    @mock_rename
1979    def test_move_file_symlink_to_dir(self):
1980        filename = "bar"
1981        dst = os.path.join(self.src_dir, filename)
1982        os.symlink(self.src_file, dst)
1983        shutil.move(dst, self.dst_dir)
1984        final_link = os.path.join(self.dst_dir, filename)
1985        self.assertTrue(os.path.islink(final_link))
1986        self.assertTrue(os.path.samefile(self.src_file, final_link))
1987
1988    @support.skip_unless_symlink
1989    @mock_rename
1990    def test_move_dangling_symlink(self):
1991        src = os.path.join(self.src_dir, 'baz')
1992        dst = os.path.join(self.src_dir, 'bar')
1993        os.symlink(src, dst)
1994        dst_link = os.path.join(self.dst_dir, 'quux')
1995        shutil.move(dst, dst_link)
1996        self.assertTrue(os.path.islink(dst_link))
1997        self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
1998
1999    @support.skip_unless_symlink
2000    @mock_rename
2001    def test_move_dir_symlink(self):
2002        src = os.path.join(self.src_dir, 'baz')
2003        dst = os.path.join(self.src_dir, 'bar')
2004        os.mkdir(src)
2005        os.symlink(src, dst)
2006        dst_link = os.path.join(self.dst_dir, 'quux')
2007        shutil.move(dst, dst_link)
2008        self.assertTrue(os.path.islink(dst_link))
2009        self.assertTrue(os.path.samefile(src, dst_link))
2010
2011    def test_move_return_value(self):
2012        rv = shutil.move(self.src_file, self.dst_dir)
2013        self.assertEqual(rv,
2014                os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2015
2016    def test_move_as_rename_return_value(self):
2017        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2018        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2019
2020    @mock_rename
2021    def test_move_file_special_function(self):
2022        moved = []
2023        def _copy(src, dst):
2024            moved.append((src, dst))
2025        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
2026        self.assertEqual(len(moved), 1)
2027
2028    @mock_rename
2029    def test_move_dir_special_function(self):
2030        moved = []
2031        def _copy(src, dst):
2032            moved.append((src, dst))
2033        support.create_empty_file(os.path.join(self.src_dir, 'child'))
2034        support.create_empty_file(os.path.join(self.src_dir, 'child1'))
2035        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
2036        self.assertEqual(len(moved), 3)
2037
2038
2039class TestCopyFile(unittest.TestCase):
2040
2041    _delete = False
2042
2043    class Faux(object):
2044        _entered = False
2045        _exited_with = None
2046        _raised = False
2047        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2048            self._raise_in_exit = raise_in_exit
2049            self._suppress_at_exit = suppress_at_exit
2050        def read(self, *args):
2051            return ''
2052        def __enter__(self):
2053            self._entered = True
2054        def __exit__(self, exc_type, exc_val, exc_tb):
2055            self._exited_with = exc_type, exc_val, exc_tb
2056            if self._raise_in_exit:
2057                self._raised = True
2058                raise OSError("Cannot close")
2059            return self._suppress_at_exit
2060
2061    def tearDown(self):
2062        if self._delete:
2063            del shutil.open
2064
2065    def _set_shutil_open(self, func):
2066        shutil.open = func
2067        self._delete = True
2068
2069    def test_w_source_open_fails(self):
2070        def _open(filename, mode='r'):
2071            if filename == 'srcfile':
2072                raise OSError('Cannot open "srcfile"')
2073            assert 0  # shouldn't reach here.
2074
2075        self._set_shutil_open(_open)
2076
2077        self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
2078
2079    @unittest.skipIf(MACOS, "skipped on macOS")
2080    def test_w_dest_open_fails(self):
2081
2082        srcfile = self.Faux()
2083
2084        def _open(filename, mode='r'):
2085            if filename == 'srcfile':
2086                return srcfile
2087            if filename == 'destfile':
2088                raise OSError('Cannot open "destfile"')
2089            assert 0  # shouldn't reach here.
2090
2091        self._set_shutil_open(_open)
2092
2093        shutil.copyfile('srcfile', 'destfile')
2094        self.assertTrue(srcfile._entered)
2095        self.assertTrue(srcfile._exited_with[0] is OSError)
2096        self.assertEqual(srcfile._exited_with[1].args,
2097                         ('Cannot open "destfile"',))
2098
2099    @unittest.skipIf(MACOS, "skipped on macOS")
2100    def test_w_dest_close_fails(self):
2101
2102        srcfile = self.Faux()
2103        destfile = self.Faux(True)
2104
2105        def _open(filename, mode='r'):
2106            if filename == 'srcfile':
2107                return srcfile
2108            if filename == 'destfile':
2109                return destfile
2110            assert 0  # shouldn't reach here.
2111
2112        self._set_shutil_open(_open)
2113
2114        shutil.copyfile('srcfile', 'destfile')
2115        self.assertTrue(srcfile._entered)
2116        self.assertTrue(destfile._entered)
2117        self.assertTrue(destfile._raised)
2118        self.assertTrue(srcfile._exited_with[0] is OSError)
2119        self.assertEqual(srcfile._exited_with[1].args,
2120                         ('Cannot close',))
2121
2122    @unittest.skipIf(MACOS, "skipped on macOS")
2123    def test_w_source_close_fails(self):
2124
2125        srcfile = self.Faux(True)
2126        destfile = self.Faux()
2127
2128        def _open(filename, mode='r'):
2129            if filename == 'srcfile':
2130                return srcfile
2131            if filename == 'destfile':
2132                return destfile
2133            assert 0  # shouldn't reach here.
2134
2135        self._set_shutil_open(_open)
2136
2137        self.assertRaises(OSError,
2138                          shutil.copyfile, 'srcfile', 'destfile')
2139        self.assertTrue(srcfile._entered)
2140        self.assertTrue(destfile._entered)
2141        self.assertFalse(destfile._raised)
2142        self.assertTrue(srcfile._exited_with[0] is None)
2143        self.assertTrue(srcfile._raised)
2144
2145    def test_move_dir_caseinsensitive(self):
2146        # Renames a folder to the same name
2147        # but a different case.
2148
2149        self.src_dir = tempfile.mkdtemp()
2150        self.addCleanup(shutil.rmtree, self.src_dir, True)
2151        dst_dir = os.path.join(
2152                os.path.dirname(self.src_dir),
2153                os.path.basename(self.src_dir).upper())
2154        self.assertNotEqual(self.src_dir, dst_dir)
2155
2156        try:
2157            shutil.move(self.src_dir, dst_dir)
2158            self.assertTrue(os.path.isdir(dst_dir))
2159        finally:
2160            os.rmdir(dst_dir)
2161
2162
2163class TestCopyFileObj(unittest.TestCase):
2164    FILESIZE = 2 * 1024 * 1024
2165
2166    @classmethod
2167    def setUpClass(cls):
2168        write_test_file(TESTFN, cls.FILESIZE)
2169
2170    @classmethod
2171    def tearDownClass(cls):
2172        support.unlink(TESTFN)
2173        support.unlink(TESTFN2)
2174
2175    def tearDown(self):
2176        support.unlink(TESTFN2)
2177
2178    @contextlib.contextmanager
2179    def get_files(self):
2180        with open(TESTFN, "rb") as src:
2181            with open(TESTFN2, "wb") as dst:
2182                yield (src, dst)
2183
2184    def assert_files_eq(self, src, dst):
2185        with open(src, 'rb') as fsrc:
2186            with open(dst, 'rb') as fdst:
2187                self.assertEqual(fsrc.read(), fdst.read())
2188
2189    def test_content(self):
2190        with self.get_files() as (src, dst):
2191            shutil.copyfileobj(src, dst)
2192        self.assert_files_eq(TESTFN, TESTFN2)
2193
2194    def test_file_not_closed(self):
2195        with self.get_files() as (src, dst):
2196            shutil.copyfileobj(src, dst)
2197            assert not src.closed
2198            assert not dst.closed
2199
2200    def test_file_offset(self):
2201        with self.get_files() as (src, dst):
2202            shutil.copyfileobj(src, dst)
2203            self.assertEqual(src.tell(), self.FILESIZE)
2204            self.assertEqual(dst.tell(), self.FILESIZE)
2205
2206    @unittest.skipIf(os.name != 'nt', "Windows only")
2207    def test_win_impl(self):
2208        # Make sure alternate Windows implementation is called.
2209        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2210            shutil.copyfile(TESTFN, TESTFN2)
2211        assert m.called
2212
2213        # File size is 2 MiB but max buf size should be 1 MiB.
2214        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2215
2216        # If file size < 1 MiB memoryview() length must be equal to
2217        # the actual file size.
2218        with tempfile.NamedTemporaryFile(delete=False) as f:
2219            f.write(b'foo')
2220        fname = f.name
2221        self.addCleanup(support.unlink, fname)
2222        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2223            shutil.copyfile(fname, TESTFN2)
2224        self.assertEqual(m.call_args[0][2], 3)
2225
2226        # Empty files should not rely on readinto() variant.
2227        with tempfile.NamedTemporaryFile(delete=False) as f:
2228            pass
2229        fname = f.name
2230        self.addCleanup(support.unlink, fname)
2231        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2232            shutil.copyfile(fname, TESTFN2)
2233        assert not m.called
2234        self.assert_files_eq(fname, TESTFN2)
2235
2236
2237class _ZeroCopyFileTest(object):
2238    """Tests common to all zero-copy APIs."""
2239    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
2240    FILEDATA = b""
2241    PATCHPOINT = ""
2242
2243    @classmethod
2244    def setUpClass(cls):
2245        write_test_file(TESTFN, cls.FILESIZE)
2246        with open(TESTFN, 'rb') as f:
2247            cls.FILEDATA = f.read()
2248            assert len(cls.FILEDATA) == cls.FILESIZE
2249
2250    @classmethod
2251    def tearDownClass(cls):
2252        support.unlink(TESTFN)
2253
2254    def tearDown(self):
2255        support.unlink(TESTFN2)
2256
2257    @contextlib.contextmanager
2258    def get_files(self):
2259        with open(TESTFN, "rb") as src:
2260            with open(TESTFN2, "wb") as dst:
2261                yield (src, dst)
2262
2263    def zerocopy_fun(self, *args, **kwargs):
2264        raise NotImplementedError("must be implemented in subclass")
2265
2266    def reset(self):
2267        self.tearDown()
2268        self.tearDownClass()
2269        self.setUpClass()
2270        self.setUp()
2271
2272    # ---
2273
2274    def test_regular_copy(self):
2275        with self.get_files() as (src, dst):
2276            self.zerocopy_fun(src, dst)
2277        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2278        # Make sure the fallback function is not called.
2279        with self.get_files() as (src, dst):
2280            with unittest.mock.patch('shutil.copyfileobj') as m:
2281                shutil.copyfile(TESTFN, TESTFN2)
2282            assert not m.called
2283
2284    def test_same_file(self):
2285        self.addCleanup(self.reset)
2286        with self.get_files() as (src, dst):
2287            with self.assertRaises(Exception):
2288                self.zerocopy_fun(src, src)
2289        # Make sure src file is not corrupted.
2290        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2291
2292    def test_non_existent_src(self):
2293        name = tempfile.mktemp()
2294        with self.assertRaises(FileNotFoundError) as cm:
2295            shutil.copyfile(name, "new")
2296        self.assertEqual(cm.exception.filename, name)
2297
2298    def test_empty_file(self):
2299        srcname = TESTFN + 'src'
2300        dstname = TESTFN + 'dst'
2301        self.addCleanup(lambda: support.unlink(srcname))
2302        self.addCleanup(lambda: support.unlink(dstname))
2303        with open(srcname, "wb"):
2304            pass
2305
2306        with open(srcname, "rb") as src:
2307            with open(dstname, "wb") as dst:
2308                self.zerocopy_fun(src, dst)
2309
2310        self.assertEqual(read_file(dstname, binary=True), b"")
2311
2312    def test_unhandled_exception(self):
2313        with unittest.mock.patch(self.PATCHPOINT,
2314                                 side_effect=ZeroDivisionError):
2315            self.assertRaises(ZeroDivisionError,
2316                              shutil.copyfile, TESTFN, TESTFN2)
2317
2318    def test_exception_on_first_call(self):
2319        # Emulate a case where the first call to the zero-copy
2320        # function raises an exception in which case the function is
2321        # supposed to give up immediately.
2322        with unittest.mock.patch(self.PATCHPOINT,
2323                                 side_effect=OSError(errno.EINVAL, "yo")):
2324            with self.get_files() as (src, dst):
2325                with self.assertRaises(_GiveupOnFastCopy):
2326                    self.zerocopy_fun(src, dst)
2327
2328    def test_filesystem_full(self):
2329        # Emulate a case where filesystem is full and sendfile() fails
2330        # on first call.
2331        with unittest.mock.patch(self.PATCHPOINT,
2332                                 side_effect=OSError(errno.ENOSPC, "yo")):
2333            with self.get_files() as (src, dst):
2334                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2335
2336
2337@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2338class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2339    PATCHPOINT = "os.sendfile"
2340
2341    def zerocopy_fun(self, fsrc, fdst):
2342        return shutil._fastcopy_sendfile(fsrc, fdst)
2343
2344    def test_non_regular_file_src(self):
2345        with io.BytesIO(self.FILEDATA) as src:
2346            with open(TESTFN2, "wb") as dst:
2347                with self.assertRaises(_GiveupOnFastCopy):
2348                    self.zerocopy_fun(src, dst)
2349                shutil.copyfileobj(src, dst)
2350
2351        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2352
2353    def test_non_regular_file_dst(self):
2354        with open(TESTFN, "rb") as src:
2355            with io.BytesIO() as dst:
2356                with self.assertRaises(_GiveupOnFastCopy):
2357                    self.zerocopy_fun(src, dst)
2358                shutil.copyfileobj(src, dst)
2359                dst.seek(0)
2360                self.assertEqual(dst.read(), self.FILEDATA)
2361
2362    def test_exception_on_second_call(self):
2363        def sendfile(*args, **kwargs):
2364            if not flag:
2365                flag.append(None)
2366                return orig_sendfile(*args, **kwargs)
2367            else:
2368                raise OSError(errno.EBADF, "yo")
2369
2370        flag = []
2371        orig_sendfile = os.sendfile
2372        with unittest.mock.patch('os.sendfile', create=True,
2373                                 side_effect=sendfile):
2374            with self.get_files() as (src, dst):
2375                with self.assertRaises(OSError) as cm:
2376                    shutil._fastcopy_sendfile(src, dst)
2377        assert flag
2378        self.assertEqual(cm.exception.errno, errno.EBADF)
2379
2380    def test_cant_get_size(self):
2381        # Emulate a case where src file size cannot be determined.
2382        # Internally bufsize will be set to a small value and
2383        # sendfile() will be called repeatedly.
2384        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2385            with self.get_files() as (src, dst):
2386                shutil._fastcopy_sendfile(src, dst)
2387                assert m.called
2388        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2389
2390    def test_small_chunks(self):
2391        # Force internal file size detection to be smaller than the
2392        # actual file size. We want to force sendfile() to be called
2393        # multiple times, also in order to emulate a src fd which gets
2394        # bigger while it is being copied.
2395        mock = unittest.mock.Mock()
2396        mock.st_size = 65536 + 1
2397        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2398            with self.get_files() as (src, dst):
2399                shutil._fastcopy_sendfile(src, dst)
2400                assert m.called
2401        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2402
2403    def test_big_chunk(self):
2404        # Force internal file size detection to be +100MB bigger than
2405        # the actual file size. Make sure sendfile() does not rely on
2406        # file size value except for (maybe) a better throughput /
2407        # performance.
2408        mock = unittest.mock.Mock()
2409        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2410        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2411            with self.get_files() as (src, dst):
2412                shutil._fastcopy_sendfile(src, dst)
2413                assert m.called
2414        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2415
2416    def test_blocksize_arg(self):
2417        with unittest.mock.patch('os.sendfile',
2418                                 side_effect=ZeroDivisionError) as m:
2419            self.assertRaises(ZeroDivisionError,
2420                              shutil.copyfile, TESTFN, TESTFN2)
2421            blocksize = m.call_args[0][3]
2422            # Make sure file size and the block size arg passed to
2423            # sendfile() are the same.
2424            self.assertEqual(blocksize, os.path.getsize(TESTFN))
2425            # ...unless we're dealing with a small file.
2426            support.unlink(TESTFN2)
2427            write_file(TESTFN2, b"hello", binary=True)
2428            self.addCleanup(support.unlink, TESTFN2 + '3')
2429            self.assertRaises(ZeroDivisionError,
2430                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
2431            blocksize = m.call_args[0][3]
2432            self.assertEqual(blocksize, 2 ** 23)
2433
2434    def test_file2file_not_supported(self):
2435        # Emulate a case where sendfile() only support file->socket
2436        # fds. In such a case copyfile() is supposed to skip the
2437        # fast-copy attempt from then on.
2438        assert shutil._USE_CP_SENDFILE
2439        try:
2440            with unittest.mock.patch(
2441                    self.PATCHPOINT,
2442                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2443                with self.get_files() as (src, dst):
2444                    with self.assertRaises(_GiveupOnFastCopy):
2445                        shutil._fastcopy_sendfile(src, dst)
2446                assert m.called
2447            assert not shutil._USE_CP_SENDFILE
2448
2449            with unittest.mock.patch(self.PATCHPOINT) as m:
2450                shutil.copyfile(TESTFN, TESTFN2)
2451                assert not m.called
2452        finally:
2453            shutil._USE_CP_SENDFILE = True
2454
2455
2456@unittest.skipIf(not MACOS, 'macOS only')
2457class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
2458    PATCHPOINT = "posix._fcopyfile"
2459
2460    def zerocopy_fun(self, src, dst):
2461        return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
2462
2463
2464class TermsizeTests(unittest.TestCase):
2465    def test_does_not_crash(self):
2466        """Check if get_terminal_size() returns a meaningful value.
2467
2468        There's no easy portable way to actually check the size of the
2469        terminal, so let's check if it returns something sensible instead.
2470        """
2471        size = shutil.get_terminal_size()
2472        self.assertGreaterEqual(size.columns, 0)
2473        self.assertGreaterEqual(size.lines, 0)
2474
2475    def test_os_environ_first(self):
2476        "Check if environment variables have precedence"
2477
2478        with support.EnvironmentVarGuard() as env:
2479            env['COLUMNS'] = '777'
2480            del env['LINES']
2481            size = shutil.get_terminal_size()
2482        self.assertEqual(size.columns, 777)
2483
2484        with support.EnvironmentVarGuard() as env:
2485            del env['COLUMNS']
2486            env['LINES'] = '888'
2487            size = shutil.get_terminal_size()
2488        self.assertEqual(size.lines, 888)
2489
2490    def test_bad_environ(self):
2491        with support.EnvironmentVarGuard() as env:
2492            env['COLUMNS'] = 'xxx'
2493            env['LINES'] = 'yyy'
2494            size = shutil.get_terminal_size()
2495        self.assertGreaterEqual(size.columns, 0)
2496        self.assertGreaterEqual(size.lines, 0)
2497
2498    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
2499    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2500                         'need os.get_terminal_size()')
2501    def test_stty_match(self):
2502        """Check if stty returns the same results ignoring env
2503
2504        This test will fail if stdin and stdout are connected to
2505        different terminals with different sizes. Nevertheless, such
2506        situations should be pretty rare.
2507        """
2508        try:
2509            size = subprocess.check_output(['stty', 'size']).decode().split()
2510        except (FileNotFoundError, PermissionError,
2511                subprocess.CalledProcessError):
2512            self.skipTest("stty invocation failed")
2513        expected = (int(size[1]), int(size[0])) # reversed order
2514
2515        with support.EnvironmentVarGuard() as env:
2516            del env['LINES']
2517            del env['COLUMNS']
2518            actual = shutil.get_terminal_size()
2519
2520        self.assertEqual(expected, actual)
2521
2522    def test_fallback(self):
2523        with support.EnvironmentVarGuard() as env:
2524            del env['LINES']
2525            del env['COLUMNS']
2526
2527            # sys.__stdout__ has no fileno()
2528            with support.swap_attr(sys, '__stdout__', None):
2529                size = shutil.get_terminal_size(fallback=(10, 20))
2530            self.assertEqual(size.columns, 10)
2531            self.assertEqual(size.lines, 20)
2532
2533            # sys.__stdout__ is not a terminal on Unix
2534            # or fileno() not in (0, 1, 2) on Windows
2535            with open(os.devnull, 'w') as f, \
2536                 support.swap_attr(sys, '__stdout__', f):
2537                size = shutil.get_terminal_size(fallback=(30, 40))
2538            self.assertEqual(size.columns, 30)
2539            self.assertEqual(size.lines, 40)
2540
2541
2542class PublicAPITests(unittest.TestCase):
2543    """Ensures that the correct values are exposed in the public API."""
2544
2545    def test_module_all_attribute(self):
2546        self.assertTrue(hasattr(shutil, '__all__'))
2547        target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2548                      'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2549                      'SpecialFileError', 'ExecError', 'make_archive',
2550                      'get_archive_formats', 'register_archive_format',
2551                      'unregister_archive_format', 'get_unpack_formats',
2552                      'register_unpack_format', 'unregister_unpack_format',
2553                      'unpack_archive', 'ignore_patterns', 'chown', 'which',
2554                      'get_terminal_size', 'SameFileError']
2555        if hasattr(os, 'statvfs') or os.name == 'nt':
2556            target_api.append('disk_usage')
2557        self.assertEqual(set(shutil.__all__), set(target_api))
2558
2559
2560if __name__ == '__main__':
2561    unittest.main()
2562