• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import shutil
5import tempfile
6import sys
7import stat
8import os
9import os.path
10import errno
11import subprocess
12from distutils.spawn import find_executable
13from shutil import (make_archive,
14                    register_archive_format, unregister_archive_format,
15                    get_archive_formats)
16import tarfile
17import warnings
18
19from test import test_support as support
20from test.test_support import TESTFN, check_warnings, captured_stdout
21
22TESTFN2 = TESTFN + "2"
23
24try:
25    import grp
26    import pwd
27    UID_GID_SUPPORT = True
28except ImportError:
29    UID_GID_SUPPORT = False
30
31try:
32    import zlib
33except ImportError:
34    zlib = None
35
36try:
37    import zipfile
38    import zlib
39    ZIP_SUPPORT = True
40except ImportError:
41    ZIP_SUPPORT = find_executable('zip')
42
43class TestShutil(unittest.TestCase):
44
45    def setUp(self):
46        super(TestShutil, self).setUp()
47        self.tempdirs = []
48
49    def tearDown(self):
50        super(TestShutil, self).tearDown()
51        while self.tempdirs:
52            d = self.tempdirs.pop()
53            shutil.rmtree(d, os.name in ('nt', 'cygwin'))
54
55    def write_file(self, path, content='xxx'):
56        """Writes a file in the given path.
57
58
59        path can be a string or a sequence.
60        """
61        if isinstance(path, (list, tuple)):
62            path = os.path.join(*path)
63        f = open(path, 'w')
64        try:
65            f.write(content)
66        finally:
67            f.close()
68
69    def mkdtemp(self):
70        """Create a temporary directory that will be cleaned up.
71
72        Returns the path of the directory.
73        """
74        d = tempfile.mkdtemp()
75        self.tempdirs.append(d)
76        return d
77    def test_rmtree_errors(self):
78        # filename is guaranteed not to exist
79        filename = tempfile.mktemp()
80        self.assertRaises(OSError, shutil.rmtree, filename)
81
82    @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
83    @unittest.skipIf(sys.platform[:6] == 'cygwin',
84                     "This test can't be run on Cygwin (issue #1071513).")
85    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
86                     "This test can't be run reliably as root (issue #1076467).")
87    def test_on_error(self):
88        self.errorState = 0
89        os.mkdir(TESTFN)
90        self.childpath = os.path.join(TESTFN, 'a')
91        f = open(self.childpath, 'w')
92        f.close()
93        old_dir_mode = os.stat(TESTFN).st_mode
94        old_child_mode = os.stat(self.childpath).st_mode
95        # Make unwritable.
96        os.chmod(self.childpath, stat.S_IREAD)
97        os.chmod(TESTFN, stat.S_IREAD)
98
99        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
100        # Test whether onerror has actually been called.
101        self.assertEqual(self.errorState, 2,
102                            "Expected call to onerror function did not happen.")
103
104        # Make writable again.
105        os.chmod(TESTFN, old_dir_mode)
106        os.chmod(self.childpath, old_child_mode)
107
108        # Clean up.
109        shutil.rmtree(TESTFN)
110
111    def check_args_to_onerror(self, func, arg, exc):
112        # test_rmtree_errors deliberately runs rmtree
113        # on a directory that is chmod 400, which will fail.
114        # This function is run when shutil.rmtree fails.
115        # 99.9% of the time it initially fails to remove
116        # a file in the directory, so the first time through
117        # func is os.remove.
118        # However, some Linux machines running ZFS on
119        # FUSE experienced a failure earlier in the process
120        # at os.listdir.  The first failure may legally
121        # be either.
122        if self.errorState == 0:
123            if func is os.remove:
124                self.assertEqual(arg, self.childpath)
125            else:
126                self.assertIs(func, os.listdir,
127                              "func must be either os.remove or os.listdir")
128                self.assertEqual(arg, TESTFN)
129            self.assertTrue(issubclass(exc[0], OSError))
130            self.errorState = 1
131        else:
132            self.assertEqual(func, os.rmdir)
133            self.assertEqual(arg, TESTFN)
134            self.assertTrue(issubclass(exc[0], OSError))
135            self.errorState = 2
136
137    def test_rmtree_dont_delete_file(self):
138        # When called on a file instead of a directory, don't delete it.
139        handle, path = tempfile.mkstemp()
140        os.fdopen(handle).close()
141        self.assertRaises(OSError, shutil.rmtree, path)
142        os.remove(path)
143
144    def test_copytree_simple(self):
145        def write_data(path, data):
146            f = open(path, "w")
147            f.write(data)
148            f.close()
149
150        def read_data(path):
151            f = open(path)
152            data = f.read()
153            f.close()
154            return data
155
156        src_dir = tempfile.mkdtemp()
157        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
158
159        write_data(os.path.join(src_dir, 'test.txt'), '123')
160
161        os.mkdir(os.path.join(src_dir, 'test_dir'))
162        write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
163
164        try:
165            shutil.copytree(src_dir, dst_dir)
166            self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
167            self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
168            self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
169                                                        'test.txt')))
170            actual = read_data(os.path.join(dst_dir, 'test.txt'))
171            self.assertEqual(actual, '123')
172            actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
173            self.assertEqual(actual, '456')
174        finally:
175            for path in (
176                    os.path.join(src_dir, 'test.txt'),
177                    os.path.join(dst_dir, 'test.txt'),
178                    os.path.join(src_dir, 'test_dir', 'test.txt'),
179                    os.path.join(dst_dir, 'test_dir', 'test.txt'),
180                ):
181                if os.path.exists(path):
182                    os.remove(path)
183            for path in (src_dir,
184                    os.path.dirname(dst_dir)
185                ):
186                if os.path.exists(path):
187                    shutil.rmtree(path)
188
189    def test_copytree_with_exclude(self):
190
191        def write_data(path, data):
192            f = open(path, "w")
193            f.write(data)
194            f.close()
195
196        def read_data(path):
197            f = open(path)
198            data = f.read()
199            f.close()
200            return data
201
202        # creating data
203        join = os.path.join
204        exists = os.path.exists
205        src_dir = tempfile.mkdtemp()
206        try:
207            dst_dir = join(tempfile.mkdtemp(), 'destination')
208            write_data(join(src_dir, 'test.txt'), '123')
209            write_data(join(src_dir, 'test.tmp'), '123')
210            os.mkdir(join(src_dir, 'test_dir'))
211            write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
212            os.mkdir(join(src_dir, 'test_dir2'))
213            write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
214            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
215            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
216            write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
217            write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
218
219
220            # testing glob-like patterns
221            try:
222                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
223                shutil.copytree(src_dir, dst_dir, ignore=patterns)
224                # checking the result: some elements should not be copied
225                self.assertTrue(exists(join(dst_dir, 'test.txt')))
226                self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
227                self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
228            finally:
229                if os.path.exists(dst_dir):
230                    shutil.rmtree(dst_dir)
231            try:
232                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
233                shutil.copytree(src_dir, dst_dir, ignore=patterns)
234                # checking the result: some elements should not be copied
235                self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
236                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
237                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
238            finally:
239                if os.path.exists(dst_dir):
240                    shutil.rmtree(dst_dir)
241
242            # testing callable-style
243            try:
244                def _filter(src, names):
245                    res = []
246                    for name in names:
247                        path = os.path.join(src, name)
248
249                        if (os.path.isdir(path) and
250                            path.split()[-1] == 'subdir'):
251                            res.append(name)
252                        elif os.path.splitext(path)[-1] in ('.py'):
253                            res.append(name)
254                    return res
255
256                shutil.copytree(src_dir, dst_dir, ignore=_filter)
257
258                # checking the result: some elements should not be copied
259                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
260                                        'test.py')))
261                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
262
263            finally:
264                if os.path.exists(dst_dir):
265                    shutil.rmtree(dst_dir)
266        finally:
267            shutil.rmtree(src_dir)
268            shutil.rmtree(os.path.dirname(dst_dir))
269
270    if hasattr(os, "symlink"):
271        def test_dont_copy_file_onto_link_to_itself(self):
272            # bug 851123.
273            os.mkdir(TESTFN)
274            src = os.path.join(TESTFN, 'cheese')
275            dst = os.path.join(TESTFN, 'shop')
276            try:
277                f = open(src, 'w')
278                f.write('cheddar')
279                f.close()
280
281                os.link(src, dst)
282                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
283                with open(src, 'r') as f:
284                    self.assertEqual(f.read(), 'cheddar')
285                os.remove(dst)
286
287                # Using `src` here would mean we end up with a symlink pointing
288                # to TESTFN/TESTFN/cheese, while it should point at
289                # TESTFN/cheese.
290                os.symlink('cheese', dst)
291                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
292                with open(src, 'r') as f:
293                    self.assertEqual(f.read(), 'cheddar')
294                os.remove(dst)
295            finally:
296                try:
297                    shutil.rmtree(TESTFN)
298                except OSError:
299                    pass
300
301        def test_rmtree_on_symlink(self):
302            # bug 1669.
303            os.mkdir(TESTFN)
304            try:
305                src = os.path.join(TESTFN, 'cheese')
306                dst = os.path.join(TESTFN, 'shop')
307                os.mkdir(src)
308                os.symlink(src, dst)
309                self.assertRaises(OSError, shutil.rmtree, dst)
310            finally:
311                shutil.rmtree(TESTFN, ignore_errors=True)
312
313    # Issue #3002: copyfile and copytree block indefinitely on named pipes
314    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
315    def test_copyfile_named_pipe(self):
316        os.mkfifo(TESTFN)
317        try:
318            self.assertRaises(shutil.SpecialFileError,
319                              shutil.copyfile, TESTFN, TESTFN2)
320            self.assertRaises(shutil.SpecialFileError,
321                              shutil.copyfile, __file__, TESTFN)
322        finally:
323            os.remove(TESTFN)
324
325    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
326    def test_copytree_named_pipe(self):
327        os.mkdir(TESTFN)
328        try:
329            subdir = os.path.join(TESTFN, "subdir")
330            os.mkdir(subdir)
331            pipe = os.path.join(subdir, "mypipe")
332            os.mkfifo(pipe)
333            try:
334                shutil.copytree(TESTFN, TESTFN2)
335            except shutil.Error as e:
336                errors = e.args[0]
337                self.assertEqual(len(errors), 1)
338                src, dst, error_msg = errors[0]
339                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
340            else:
341                self.fail("shutil.Error should have been raised")
342        finally:
343            shutil.rmtree(TESTFN, ignore_errors=True)
344            shutil.rmtree(TESTFN2, ignore_errors=True)
345
346    @unittest.skipUnless(hasattr(os, 'chflags') and
347                         hasattr(errno, 'EOPNOTSUPP') and
348                         hasattr(errno, 'ENOTSUP'),
349                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
350    def test_copystat_handles_harmless_chflags_errors(self):
351        tmpdir = self.mkdtemp()
352        file1 = os.path.join(tmpdir, 'file1')
353        file2 = os.path.join(tmpdir, 'file2')
354        self.write_file(file1, 'xxx')
355        self.write_file(file2, 'xxx')
356
357        def make_chflags_raiser(err):
358            ex = OSError()
359
360            def _chflags_raiser(path, flags):
361                ex.errno = err
362                raise ex
363            return _chflags_raiser
364        old_chflags = os.chflags
365        try:
366            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
367                os.chflags = make_chflags_raiser(err)
368                shutil.copystat(file1, file2)
369            # assert others errors break it
370            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
371            self.assertRaises(OSError, shutil.copystat, file1, file2)
372        finally:
373            os.chflags = old_chflags
374
375    @unittest.skipUnless(zlib, "requires zlib")
376    def test_make_tarball(self):
377        # creating something to tar
378        root_dir, base_dir = self._create_files('')
379
380        tmpdir2 = self.mkdtemp()
381        # force shutil to create the directory
382        os.rmdir(tmpdir2)
383        # working with relative paths
384        work_dir = os.path.dirname(tmpdir2)
385        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
386
387        with support.change_cwd(work_dir):
388            base_name = os.path.abspath(rel_base_name)
389            tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
390
391        # check if the compressed tarball was created
392        self.assertEqual(tarball, base_name + '.tar.gz')
393        self.assertTrue(os.path.isfile(tarball))
394        self.assertTrue(tarfile.is_tarfile(tarball))
395        with tarfile.open(tarball, 'r:gz') as tf:
396            self.assertEqual(sorted(tf.getnames()),
397                             ['.', './file1', './file2',
398                              './sub', './sub/file3', './sub2'])
399
400        # trying an uncompressed one
401        with support.change_cwd(work_dir):
402            tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
403        self.assertEqual(tarball, base_name + '.tar')
404        self.assertTrue(os.path.isfile(tarball))
405        self.assertTrue(tarfile.is_tarfile(tarball))
406        with tarfile.open(tarball, 'r') as tf:
407            self.assertEqual(sorted(tf.getnames()),
408                             ['.', './file1', './file2',
409                              './sub', './sub/file3', './sub2'])
410
411    def _tarinfo(self, path):
412        with tarfile.open(path) as tar:
413            names = tar.getnames()
414            names.sort()
415            return tuple(names)
416
417    def _create_files(self, base_dir='dist'):
418        # creating something to tar
419        root_dir = self.mkdtemp()
420        dist = os.path.join(root_dir, base_dir)
421        if not os.path.isdir(dist):
422            os.makedirs(dist)
423        self.write_file((dist, 'file1'), 'xxx')
424        self.write_file((dist, 'file2'), 'xxx')
425        os.mkdir(os.path.join(dist, 'sub'))
426        self.write_file((dist, 'sub', 'file3'), 'xxx')
427        os.mkdir(os.path.join(dist, 'sub2'))
428        if base_dir:
429            self.write_file((root_dir, 'outer'), 'xxx')
430        return root_dir, base_dir
431
432    @unittest.skipUnless(zlib, "Requires zlib")
433    @unittest.skipUnless(find_executable('tar'),
434                         'Need the tar command to run')
435    def test_tarfile_vs_tar(self):
436        root_dir, base_dir = self._create_files()
437        base_name = os.path.join(self.mkdtemp(), 'archive')
438        tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
439
440        # check if the compressed tarball was created
441        self.assertEqual(tarball, base_name + '.tar.gz')
442        self.assertTrue(os.path.isfile(tarball))
443
444        # now create another tarball using `tar`
445        tarball2 = os.path.join(root_dir, 'archive2.tar')
446        tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
447        subprocess.check_call(tar_cmd, cwd=root_dir)
448
449        self.assertTrue(os.path.isfile(tarball2))
450        # let's compare both tarballs
451        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
452
453        # trying an uncompressed one
454        tarball = make_archive(base_name, 'tar', root_dir, base_dir)
455        self.assertEqual(tarball, base_name + '.tar')
456        self.assertTrue(os.path.isfile(tarball))
457
458        # now for a dry_run
459        tarball = make_archive(base_name, 'tar', root_dir, base_dir,
460                               dry_run=True)
461        self.assertEqual(tarball, base_name + '.tar')
462        self.assertTrue(os.path.isfile(tarball))
463
464    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
465    def test_make_zipfile(self):
466        # creating something to zip
467        root_dir, base_dir = self._create_files()
468
469        tmpdir2 = self.mkdtemp()
470        # force shutil to create the directory
471        os.rmdir(tmpdir2)
472        # working with relative paths
473        work_dir = os.path.dirname(tmpdir2)
474        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
475
476        with support.change_cwd(work_dir):
477            base_name = os.path.abspath(rel_base_name)
478            res = make_archive(rel_base_name, 'zip', root_dir)
479
480        self.assertEqual(res, base_name + '.zip')
481        self.assertTrue(os.path.isfile(res))
482        self.assertTrue(zipfile.is_zipfile(res))
483        with zipfile.ZipFile(res) as zf:
484            self.assertEqual(sorted(zf.namelist()),
485                    ['dist/', 'dist/file1', 'dist/file2',
486                     'dist/sub/', 'dist/sub/file3', 'dist/sub2/',
487                     'outer'])
488        support.unlink(res)
489
490        with support.change_cwd(work_dir):
491            base_name = os.path.abspath(rel_base_name)
492            res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
493
494        self.assertEqual(res, base_name + '.zip')
495        self.assertTrue(os.path.isfile(res))
496        self.assertTrue(zipfile.is_zipfile(res))
497        with zipfile.ZipFile(res) as zf:
498            self.assertEqual(sorted(zf.namelist()),
499                    ['dist/', 'dist/file1', 'dist/file2',
500                     'dist/sub/', 'dist/sub/file3', 'dist/sub2/'])
501
502    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
503    @unittest.skipUnless(find_executable('zip'),
504                         'Need the zip command to run')
505    def test_zipfile_vs_zip(self):
506        root_dir, base_dir = self._create_files()
507        base_name = os.path.join(self.mkdtemp(), 'archive')
508        archive = make_archive(base_name, 'zip', root_dir, base_dir)
509
510        # check if ZIP file  was created
511        self.assertEqual(archive, base_name + '.zip')
512        self.assertTrue(os.path.isfile(archive))
513
514        # now create another ZIP file using `zip`
515        archive2 = os.path.join(root_dir, 'archive2.zip')
516        zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
517        subprocess.check_call(zip_cmd, cwd=root_dir)
518
519        self.assertTrue(os.path.isfile(archive2))
520        # let's compare both ZIP files
521        with zipfile.ZipFile(archive) as zf:
522            names = zf.namelist()
523        with zipfile.ZipFile(archive2) as zf:
524            names2 = zf.namelist()
525        self.assertEqual(sorted(names), sorted(names2))
526
527    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
528    @unittest.skipUnless(find_executable('unzip'),
529                         'Need the unzip command to run')
530    def test_unzip_zipfile(self):
531        root_dir, base_dir = self._create_files()
532        base_name = os.path.join(self.mkdtemp(), 'archive')
533        archive = make_archive(base_name, 'zip', root_dir, base_dir)
534
535        # check if ZIP file  was created
536        self.assertEqual(archive, base_name + '.zip')
537        self.assertTrue(os.path.isfile(archive))
538
539        # now check the ZIP file using `unzip -t`
540        zip_cmd = ['unzip', '-t', archive]
541        with support.change_cwd(root_dir):
542            try:
543                subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
544            except subprocess.CalledProcessError as exc:
545                details = exc.output
546                if 'unrecognized option: t' in details:
547                    self.skipTest("unzip doesn't support -t")
548                msg = "{}\n\n**Unzip Output**\n{}"
549                self.fail(msg.format(exc, details))
550
551    def test_make_archive(self):
552        tmpdir = self.mkdtemp()
553        base_name = os.path.join(tmpdir, 'archive')
554        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
555
556    @unittest.skipUnless(zlib, "Requires zlib")
557    def test_make_archive_owner_group(self):
558        # testing make_archive with owner and group, with various combinations
559        # this works even if there's not gid/uid support
560        if UID_GID_SUPPORT:
561            group = grp.getgrgid(0)[0]
562            owner = pwd.getpwuid(0)[0]
563        else:
564            group = owner = 'root'
565
566        root_dir, base_dir = self._create_files()
567        base_name = os.path.join(self.mkdtemp(), 'archive')
568        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
569                           group=group)
570        self.assertTrue(os.path.isfile(res))
571
572        res = make_archive(base_name, 'zip', root_dir, base_dir)
573        self.assertTrue(os.path.isfile(res))
574
575        res = make_archive(base_name, 'tar', root_dir, base_dir,
576                           owner=owner, group=group)
577        self.assertTrue(os.path.isfile(res))
578
579        res = make_archive(base_name, 'tar', root_dir, base_dir,
580                           owner='kjhkjhkjg', group='oihohoh')
581        self.assertTrue(os.path.isfile(res))
582
583    @unittest.skipUnless(zlib, "Requires zlib")
584    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
585    def test_tarfile_root_owner(self):
586        root_dir, base_dir = self._create_files()
587        base_name = os.path.join(self.mkdtemp(), 'archive')
588        group = grp.getgrgid(0)[0]
589        owner = pwd.getpwuid(0)[0]
590        with support.change_cwd(root_dir):
591            archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
592                                        owner=owner, group=group)
593
594        # check if the compressed tarball was created
595        self.assertTrue(os.path.isfile(archive_name))
596
597        # now checks the rights
598        archive = tarfile.open(archive_name)
599        try:
600            for member in archive.getmembers():
601                self.assertEqual(member.uid, 0)
602                self.assertEqual(member.gid, 0)
603        finally:
604            archive.close()
605
606    def test_make_archive_cwd(self):
607        current_dir = os.getcwd()
608        def _breaks(*args, **kw):
609            raise RuntimeError()
610
611        register_archive_format('xxx', _breaks, [], 'xxx file')
612        try:
613            try:
614                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
615            except Exception:
616                pass
617            self.assertEqual(os.getcwd(), current_dir)
618        finally:
619            unregister_archive_format('xxx')
620
621    def test_make_tarfile_in_curdir(self):
622        # Issue #21280
623        root_dir = self.mkdtemp()
624        saved_dir = os.getcwd()
625        try:
626            os.chdir(root_dir)
627            self.assertEqual(make_archive('test', 'tar'), 'test.tar')
628            self.assertTrue(os.path.isfile('test.tar'))
629        finally:
630            os.chdir(saved_dir)
631
632    @unittest.skipUnless(zlib, "Requires zlib")
633    def test_make_zipfile_in_curdir(self):
634        # Issue #21280
635        root_dir = self.mkdtemp()
636        saved_dir = os.getcwd()
637        try:
638            os.chdir(root_dir)
639            self.assertEqual(make_archive('test', 'zip'), 'test.zip')
640            self.assertTrue(os.path.isfile('test.zip'))
641        finally:
642            os.chdir(saved_dir)
643
644    def test_register_archive_format(self):
645
646        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
647        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
648                          1)
649        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
650                          [(1, 2), (1, 2, 3)])
651
652        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
653        formats = [name for name, params in get_archive_formats()]
654        self.assertIn('xxx', formats)
655
656        unregister_archive_format('xxx')
657        formats = [name for name, params in get_archive_formats()]
658        self.assertNotIn('xxx', formats)
659
660
661class TestMove(unittest.TestCase):
662
663    def setUp(self):
664        filename = "foo"
665        self.src_dir = tempfile.mkdtemp()
666        self.dst_dir = tempfile.mkdtemp()
667        self.src_file = os.path.join(self.src_dir, filename)
668        self.dst_file = os.path.join(self.dst_dir, filename)
669        # Try to create a dir in the current directory, hoping that it is
670        # not located on the same filesystem as the system tmp dir.
671        try:
672            self.dir_other_fs = tempfile.mkdtemp(
673                dir=os.path.dirname(__file__))
674            self.file_other_fs = os.path.join(self.dir_other_fs,
675                filename)
676        except OSError:
677            self.dir_other_fs = None
678        with open(self.src_file, "wb") as f:
679            f.write("spam")
680
681    def tearDown(self):
682        for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
683            try:
684                if d:
685                    shutil.rmtree(d)
686            except:
687                pass
688
689    def _check_move_file(self, src, dst, real_dst):
690        with open(src, "rb") as f:
691            contents = f.read()
692        shutil.move(src, dst)
693        with open(real_dst, "rb") as f:
694            self.assertEqual(contents, f.read())
695        self.assertFalse(os.path.exists(src))
696
697    def _check_move_dir(self, src, dst, real_dst):
698        contents = sorted(os.listdir(src))
699        shutil.move(src, dst)
700        self.assertEqual(contents, sorted(os.listdir(real_dst)))
701        self.assertFalse(os.path.exists(src))
702
703    def test_move_file(self):
704        # Move a file to another location on the same filesystem.
705        self._check_move_file(self.src_file, self.dst_file, self.dst_file)
706
707    def test_move_file_to_dir(self):
708        # Move a file inside an existing dir on the same filesystem.
709        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
710
711    def test_move_file_other_fs(self):
712        # Move a file to an existing dir on another filesystem.
713        if not self.dir_other_fs:
714            self.skipTest('dir on other filesystem not available')
715        self._check_move_file(self.src_file, self.file_other_fs,
716            self.file_other_fs)
717
718    def test_move_file_to_dir_other_fs(self):
719        # Move a file to another location on another filesystem.
720        if not self.dir_other_fs:
721            self.skipTest('dir on other filesystem not available')
722        self._check_move_file(self.src_file, self.dir_other_fs,
723            self.file_other_fs)
724
725    def test_move_dir(self):
726        # Move a dir to another location on the same filesystem.
727        dst_dir = tempfile.mktemp()
728        try:
729            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
730        finally:
731            try:
732                shutil.rmtree(dst_dir)
733            except:
734                pass
735
736    def test_move_dir_other_fs(self):
737        # Move a dir to another location on another filesystem.
738        if not self.dir_other_fs:
739            self.skipTest('dir on other filesystem not available')
740        dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
741        try:
742            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
743        finally:
744            try:
745                shutil.rmtree(dst_dir)
746            except:
747                pass
748
749    def test_move_dir_to_dir(self):
750        # Move a dir inside an existing dir on the same filesystem.
751        self._check_move_dir(self.src_dir, self.dst_dir,
752            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
753
754    def test_move_dir_to_dir_other_fs(self):
755        # Move a dir inside an existing dir on another filesystem.
756        if not self.dir_other_fs:
757            self.skipTest('dir on other filesystem not available')
758        self._check_move_dir(self.src_dir, self.dir_other_fs,
759            os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
760
761    def test_move_dir_sep_to_dir(self):
762        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
763            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
764
765    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
766    def test_move_dir_altsep_to_dir(self):
767        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
768            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
769
770    def test_existing_file_inside_dest_dir(self):
771        # A file with the same name inside the destination dir already exists.
772        with open(self.dst_file, "wb"):
773            pass
774        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
775
776    def test_dont_move_dir_in_itself(self):
777        # Moving a dir inside itself raises an Error.
778        dst = os.path.join(self.src_dir, "bar")
779        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
780
781    def test_destinsrc_false_negative(self):
782        os.mkdir(TESTFN)
783        try:
784            for src, dst in [('srcdir', 'srcdir/dest')]:
785                src = os.path.join(TESTFN, src)
786                dst = os.path.join(TESTFN, dst)
787                self.assertTrue(shutil._destinsrc(src, dst),
788                             msg='_destinsrc() wrongly concluded that '
789                             'dst (%s) is not in src (%s)' % (dst, src))
790        finally:
791            shutil.rmtree(TESTFN, ignore_errors=True)
792
793    def test_destinsrc_false_positive(self):
794        os.mkdir(TESTFN)
795        try:
796            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
797                src = os.path.join(TESTFN, src)
798                dst = os.path.join(TESTFN, dst)
799                self.assertFalse(shutil._destinsrc(src, dst),
800                            msg='_destinsrc() wrongly concluded that '
801                            'dst (%s) is in src (%s)' % (dst, src))
802        finally:
803            shutil.rmtree(TESTFN, ignore_errors=True)
804
805
806class TestCopyFile(unittest.TestCase):
807
808    _delete = False
809
810    class Faux(object):
811        _entered = False
812        _exited_with = None
813        _raised = False
814        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
815            self._raise_in_exit = raise_in_exit
816            self._suppress_at_exit = suppress_at_exit
817        def read(self, *args):
818            return ''
819        def __enter__(self):
820            self._entered = True
821        def __exit__(self, exc_type, exc_val, exc_tb):
822            self._exited_with = exc_type, exc_val, exc_tb
823            if self._raise_in_exit:
824                self._raised = True
825                raise IOError("Cannot close")
826            return self._suppress_at_exit
827
828    def tearDown(self):
829        if self._delete:
830            del shutil.open
831
832    def _set_shutil_open(self, func):
833        shutil.open = func
834        self._delete = True
835
836    def test_w_source_open_fails(self):
837        def _open(filename, mode='r'):
838            if filename == 'srcfile':
839                raise IOError('Cannot open "srcfile"')
840            assert 0  # shouldn't reach here.
841
842        self._set_shutil_open(_open)
843
844        self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
845
846    def test_w_dest_open_fails(self):
847
848        srcfile = self.Faux()
849
850        def _open(filename, mode='r'):
851            if filename == 'srcfile':
852                return srcfile
853            if filename == 'destfile':
854                raise IOError('Cannot open "destfile"')
855            assert 0  # shouldn't reach here.
856
857        self._set_shutil_open(_open)
858
859        shutil.copyfile('srcfile', 'destfile')
860        self.assertTrue(srcfile._entered)
861        self.assertTrue(srcfile._exited_with[0] is IOError)
862        self.assertEqual(srcfile._exited_with[1].args,
863                         ('Cannot open "destfile"',))
864
865    def test_w_dest_close_fails(self):
866
867        srcfile = self.Faux()
868        destfile = self.Faux(True)
869
870        def _open(filename, mode='r'):
871            if filename == 'srcfile':
872                return srcfile
873            if filename == 'destfile':
874                return destfile
875            assert 0  # shouldn't reach here.
876
877        self._set_shutil_open(_open)
878
879        shutil.copyfile('srcfile', 'destfile')
880        self.assertTrue(srcfile._entered)
881        self.assertTrue(destfile._entered)
882        self.assertTrue(destfile._raised)
883        self.assertTrue(srcfile._exited_with[0] is IOError)
884        self.assertEqual(srcfile._exited_with[1].args,
885                         ('Cannot close',))
886
887    def test_w_source_close_fails(self):
888
889        srcfile = self.Faux(True)
890        destfile = self.Faux()
891
892        def _open(filename, mode='r'):
893            if filename == 'srcfile':
894                return srcfile
895            if filename == 'destfile':
896                return destfile
897            assert 0  # shouldn't reach here.
898
899        self._set_shutil_open(_open)
900
901        self.assertRaises(IOError,
902                          shutil.copyfile, 'srcfile', 'destfile')
903        self.assertTrue(srcfile._entered)
904        self.assertTrue(destfile._entered)
905        self.assertFalse(destfile._raised)
906        self.assertTrue(srcfile._exited_with[0] is None)
907        self.assertTrue(srcfile._raised)
908
909    def test_move_dir_caseinsensitive(self):
910        # Renames a folder to the same name
911        # but a different case.
912
913        self.src_dir = tempfile.mkdtemp()
914        dst_dir = os.path.join(
915                os.path.dirname(self.src_dir),
916                os.path.basename(self.src_dir).upper())
917        self.assertNotEqual(self.src_dir, dst_dir)
918
919        try:
920            shutil.move(self.src_dir, dst_dir)
921            self.assertTrue(os.path.isdir(dst_dir))
922        finally:
923            if os.path.exists(dst_dir):
924                os.rmdir(dst_dir)
925
926
927
928def test_main():
929    support.run_unittest(TestShutil, TestMove, TestCopyFile)
930
931if __name__ == '__main__':
932    test_main()
933