• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import shutil
5import tempfile
6import sys
7import stat
8import os
9import os.path
10from os.path import splitdrive
11from distutils.spawn import find_executable, spawn
12from shutil import (_make_tarball, _make_zipfile, make_archive,
13                    register_archive_format, unregister_archive_format,
14                    get_archive_formats)
15import tarfile
16import warnings
17
18from test import test_support
19from test.test_support import TESTFN, check_warnings, captured_stdout
20
21TESTFN2 = TESTFN + "2"
22
23try:
24    import grp
25    import pwd
26    UID_GID_SUPPORT = True
27except ImportError:
28    UID_GID_SUPPORT = False
29
30try:
31    import zlib
32except ImportError:
33    zlib = None
34
35try:
36    import zipfile
37    ZIP_SUPPORT = True
38except ImportError:
39    ZIP_SUPPORT = find_executable('zip')
40
41class TestShutil(unittest.TestCase):
42
43    def setUp(self):
44        super(TestShutil, self).setUp()
45        self.tempdirs = []
46
47    def tearDown(self):
48        super(TestShutil, self).tearDown()
49        while self.tempdirs:
50            d = self.tempdirs.pop()
51            shutil.rmtree(d, os.name in ('nt', 'cygwin'))
52
53    def write_file(self, path, content='xxx'):
54        """Writes a file in the given path.
55
56
57        path can be a string or a sequence.
58        """
59        if isinstance(path, (list, tuple)):
60            path = os.path.join(*path)
61        f = open(path, 'w')
62        try:
63            f.write(content)
64        finally:
65            f.close()
66
67    def mkdtemp(self):
68        """Create a temporary directory that will be cleaned up.
69
70        Returns the path of the directory.
71        """
72        d = tempfile.mkdtemp()
73        self.tempdirs.append(d)
74        return d
75    def test_rmtree_errors(self):
76        # filename is guaranteed not to exist
77        filename = tempfile.mktemp()
78        self.assertRaises(OSError, shutil.rmtree, filename)
79
80    # See bug #1071513 for why we don't run this on cygwin
81    # and bug #1076467 for why we don't run this as root.
82    if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
83        and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
84        def test_on_error(self):
85            self.errorState = 0
86            os.mkdir(TESTFN)
87            self.childpath = os.path.join(TESTFN, 'a')
88            f = open(self.childpath, 'w')
89            f.close()
90            old_dir_mode = os.stat(TESTFN).st_mode
91            old_child_mode = os.stat(self.childpath).st_mode
92            # Make unwritable.
93            os.chmod(self.childpath, stat.S_IREAD)
94            os.chmod(TESTFN, stat.S_IREAD)
95
96            shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
97            # Test whether onerror has actually been called.
98            self.assertEqual(self.errorState, 2,
99                             "Expected call to onerror function did not happen.")
100
101            # Make writable again.
102            os.chmod(TESTFN, old_dir_mode)
103            os.chmod(self.childpath, old_child_mode)
104
105            # Clean up.
106            shutil.rmtree(TESTFN)
107
108    def check_args_to_onerror(self, func, arg, exc):
109        # test_rmtree_errors deliberately runs rmtree
110        # on a directory that is chmod 400, which will fail.
111        # This function is run when shutil.rmtree fails.
112        # 99.9% of the time it initially fails to remove
113        # a file in the directory, so the first time through
114        # func is os.remove.
115        # However, some Linux machines running ZFS on
116        # FUSE experienced a failure earlier in the process
117        # at os.listdir.  The first failure may legally
118        # be either.
119        if self.errorState == 0:
120            if func is os.remove:
121                self.assertEqual(arg, self.childpath)
122            else:
123                self.assertIs(func, os.listdir,
124                              "func must be either os.remove or os.listdir")
125                self.assertEqual(arg, TESTFN)
126            self.assertTrue(issubclass(exc[0], OSError))
127            self.errorState = 1
128        else:
129            self.assertEqual(func, os.rmdir)
130            self.assertEqual(arg, TESTFN)
131            self.assertTrue(issubclass(exc[0], OSError))
132            self.errorState = 2
133
134    def test_rmtree_dont_delete_file(self):
135        # When called on a file instead of a directory, don't delete it.
136        handle, path = tempfile.mkstemp()
137        os.fdopen(handle).close()
138        self.assertRaises(OSError, shutil.rmtree, path)
139        os.remove(path)
140
141    def test_copytree_simple(self):
142        def write_data(path, data):
143            f = open(path, "w")
144            f.write(data)
145            f.close()
146
147        def read_data(path):
148            f = open(path)
149            data = f.read()
150            f.close()
151            return data
152
153        src_dir = tempfile.mkdtemp()
154        dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
155
156        write_data(os.path.join(src_dir, 'test.txt'), '123')
157
158        os.mkdir(os.path.join(src_dir, 'test_dir'))
159        write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
160
161        try:
162            shutil.copytree(src_dir, dst_dir)
163            self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
164            self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
165            self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
166                                                        'test.txt')))
167            actual = read_data(os.path.join(dst_dir, 'test.txt'))
168            self.assertEqual(actual, '123')
169            actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
170            self.assertEqual(actual, '456')
171        finally:
172            for path in (
173                    os.path.join(src_dir, 'test.txt'),
174                    os.path.join(dst_dir, 'test.txt'),
175                    os.path.join(src_dir, 'test_dir', 'test.txt'),
176                    os.path.join(dst_dir, 'test_dir', 'test.txt'),
177                ):
178                if os.path.exists(path):
179                    os.remove(path)
180            for path in (src_dir,
181                    os.path.dirname(dst_dir)
182                ):
183                if os.path.exists(path):
184                    shutil.rmtree(path)
185
186    def test_copytree_with_exclude(self):
187
188        def write_data(path, data):
189            f = open(path, "w")
190            f.write(data)
191            f.close()
192
193        def read_data(path):
194            f = open(path)
195            data = f.read()
196            f.close()
197            return data
198
199        # creating data
200        join = os.path.join
201        exists = os.path.exists
202        src_dir = tempfile.mkdtemp()
203        try:
204            dst_dir = join(tempfile.mkdtemp(), 'destination')
205            write_data(join(src_dir, 'test.txt'), '123')
206            write_data(join(src_dir, 'test.tmp'), '123')
207            os.mkdir(join(src_dir, 'test_dir'))
208            write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
209            os.mkdir(join(src_dir, 'test_dir2'))
210            write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
211            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
212            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
213            write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
214            write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
215
216
217            # testing glob-like patterns
218            try:
219                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
220                shutil.copytree(src_dir, dst_dir, ignore=patterns)
221                # checking the result: some elements should not be copied
222                self.assertTrue(exists(join(dst_dir, 'test.txt')))
223                self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
224                self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
225            finally:
226                if os.path.exists(dst_dir):
227                    shutil.rmtree(dst_dir)
228            try:
229                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
230                shutil.copytree(src_dir, dst_dir, ignore=patterns)
231                # checking the result: some elements should not be copied
232                self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
233                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
234                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
235            finally:
236                if os.path.exists(dst_dir):
237                    shutil.rmtree(dst_dir)
238
239            # testing callable-style
240            try:
241                def _filter(src, names):
242                    res = []
243                    for name in names:
244                        path = os.path.join(src, name)
245
246                        if (os.path.isdir(path) and
247                            path.split()[-1] == 'subdir'):
248                            res.append(name)
249                        elif os.path.splitext(path)[-1] in ('.py'):
250                            res.append(name)
251                    return res
252
253                shutil.copytree(src_dir, dst_dir, ignore=_filter)
254
255                # checking the result: some elements should not be copied
256                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
257                                        'test.py')))
258                self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
259
260            finally:
261                if os.path.exists(dst_dir):
262                    shutil.rmtree(dst_dir)
263        finally:
264            shutil.rmtree(src_dir)
265            shutil.rmtree(os.path.dirname(dst_dir))
266
267    if hasattr(os, "symlink"):
268        def test_dont_copy_file_onto_link_to_itself(self):
269            # bug 851123.
270            os.mkdir(TESTFN)
271            src = os.path.join(TESTFN, 'cheese')
272            dst = os.path.join(TESTFN, 'shop')
273            try:
274                f = open(src, 'w')
275                f.write('cheddar')
276                f.close()
277
278                os.link(src, dst)
279                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
280                with open(src, 'r') as f:
281                    self.assertEqual(f.read(), 'cheddar')
282                os.remove(dst)
283
284                # Using `src` here would mean we end up with a symlink pointing
285                # to TESTFN/TESTFN/cheese, while it should point at
286                # TESTFN/cheese.
287                os.symlink('cheese', dst)
288                self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
289                with open(src, 'r') as f:
290                    self.assertEqual(f.read(), 'cheddar')
291                os.remove(dst)
292            finally:
293                try:
294                    shutil.rmtree(TESTFN)
295                except OSError:
296                    pass
297
298        def test_rmtree_on_symlink(self):
299            # bug 1669.
300            os.mkdir(TESTFN)
301            try:
302                src = os.path.join(TESTFN, 'cheese')
303                dst = os.path.join(TESTFN, 'shop')
304                os.mkdir(src)
305                os.symlink(src, dst)
306                self.assertRaises(OSError, shutil.rmtree, dst)
307            finally:
308                shutil.rmtree(TESTFN, ignore_errors=True)
309
310    if hasattr(os, "mkfifo"):
311        # Issue #3002: copyfile and copytree block indefinitely on named pipes
312        def test_copyfile_named_pipe(self):
313            os.mkfifo(TESTFN)
314            try:
315                self.assertRaises(shutil.SpecialFileError,
316                                  shutil.copyfile, TESTFN, TESTFN2)
317                self.assertRaises(shutil.SpecialFileError,
318                                  shutil.copyfile, __file__, TESTFN)
319            finally:
320                os.remove(TESTFN)
321
322        def test_copytree_named_pipe(self):
323            os.mkdir(TESTFN)
324            try:
325                subdir = os.path.join(TESTFN, "subdir")
326                os.mkdir(subdir)
327                pipe = os.path.join(subdir, "mypipe")
328                os.mkfifo(pipe)
329                try:
330                    shutil.copytree(TESTFN, TESTFN2)
331                except shutil.Error as e:
332                    errors = e.args[0]
333                    self.assertEqual(len(errors), 1)
334                    src, dst, error_msg = errors[0]
335                    self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
336                else:
337                    self.fail("shutil.Error should have been raised")
338            finally:
339                shutil.rmtree(TESTFN, ignore_errors=True)
340                shutil.rmtree(TESTFN2, ignore_errors=True)
341
342    @unittest.skipUnless(zlib, "requires zlib")
343    def test_make_tarball(self):
344        # creating something to tar
345        tmpdir = self.mkdtemp()
346        self.write_file([tmpdir, 'file1'], 'xxx')
347        self.write_file([tmpdir, 'file2'], 'xxx')
348        os.mkdir(os.path.join(tmpdir, 'sub'))
349        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
350
351        tmpdir2 = self.mkdtemp()
352        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
353                            "source and target should be on same drive")
354
355        base_name = os.path.join(tmpdir2, 'archive')
356
357        # working with relative paths to avoid tar warnings
358        old_dir = os.getcwd()
359        os.chdir(tmpdir)
360        try:
361            _make_tarball(splitdrive(base_name)[1], '.')
362        finally:
363            os.chdir(old_dir)
364
365        # check if the compressed tarball was created
366        tarball = base_name + '.tar.gz'
367        self.assertTrue(os.path.exists(tarball))
368
369        # trying an uncompressed one
370        base_name = os.path.join(tmpdir2, 'archive')
371        old_dir = os.getcwd()
372        os.chdir(tmpdir)
373        try:
374            _make_tarball(splitdrive(base_name)[1], '.', compress=None)
375        finally:
376            os.chdir(old_dir)
377        tarball = base_name + '.tar'
378        self.assertTrue(os.path.exists(tarball))
379
380    def _tarinfo(self, path):
381        tar = tarfile.open(path)
382        try:
383            names = tar.getnames()
384            names.sort()
385            return tuple(names)
386        finally:
387            tar.close()
388
389    def _create_files(self):
390        # creating something to tar
391        tmpdir = self.mkdtemp()
392        dist = os.path.join(tmpdir, 'dist')
393        os.mkdir(dist)
394        self.write_file([dist, 'file1'], 'xxx')
395        self.write_file([dist, 'file2'], 'xxx')
396        os.mkdir(os.path.join(dist, 'sub'))
397        self.write_file([dist, 'sub', 'file3'], 'xxx')
398        os.mkdir(os.path.join(dist, 'sub2'))
399        tmpdir2 = self.mkdtemp()
400        base_name = os.path.join(tmpdir2, 'archive')
401        return tmpdir, tmpdir2, base_name
402
403    @unittest.skipUnless(zlib, "Requires zlib")
404    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
405                         'Need the tar command to run')
406    def test_tarfile_vs_tar(self):
407        tmpdir, tmpdir2, base_name =  self._create_files()
408        old_dir = os.getcwd()
409        os.chdir(tmpdir)
410        try:
411            _make_tarball(base_name, 'dist')
412        finally:
413            os.chdir(old_dir)
414
415        # check if the compressed tarball was created
416        tarball = base_name + '.tar.gz'
417        self.assertTrue(os.path.exists(tarball))
418
419        # now create another tarball using `tar`
420        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
421        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
422        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
423        old_dir = os.getcwd()
424        os.chdir(tmpdir)
425        try:
426            with captured_stdout() as s:
427                spawn(tar_cmd)
428                spawn(gzip_cmd)
429        finally:
430            os.chdir(old_dir)
431
432        self.assertTrue(os.path.exists(tarball2))
433        # let's compare both tarballs
434        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
435
436        # trying an uncompressed one
437        base_name = os.path.join(tmpdir2, 'archive')
438        old_dir = os.getcwd()
439        os.chdir(tmpdir)
440        try:
441            _make_tarball(base_name, 'dist', compress=None)
442        finally:
443            os.chdir(old_dir)
444        tarball = base_name + '.tar'
445        self.assertTrue(os.path.exists(tarball))
446
447        # now for a dry_run
448        base_name = os.path.join(tmpdir2, 'archive')
449        old_dir = os.getcwd()
450        os.chdir(tmpdir)
451        try:
452            _make_tarball(base_name, 'dist', compress=None, dry_run=True)
453        finally:
454            os.chdir(old_dir)
455        tarball = base_name + '.tar'
456        self.assertTrue(os.path.exists(tarball))
457
458    @unittest.skipUnless(zlib, "Requires zlib")
459    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
460    def test_make_zipfile(self):
461        # creating something to tar
462        tmpdir = self.mkdtemp()
463        self.write_file([tmpdir, 'file1'], 'xxx')
464        self.write_file([tmpdir, 'file2'], 'xxx')
465
466        tmpdir2 = self.mkdtemp()
467        base_name = os.path.join(tmpdir2, 'archive')
468        _make_zipfile(base_name, tmpdir)
469
470        # check if the compressed tarball was created
471        tarball = base_name + '.zip'
472        self.assertTrue(os.path.exists(tarball))
473
474
475    def test_make_archive(self):
476        tmpdir = self.mkdtemp()
477        base_name = os.path.join(tmpdir, 'archive')
478        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
479
480    @unittest.skipUnless(zlib, "Requires zlib")
481    def test_make_archive_owner_group(self):
482        # testing make_archive with owner and group, with various combinations
483        # this works even if there's not gid/uid support
484        if UID_GID_SUPPORT:
485            group = grp.getgrgid(0)[0]
486            owner = pwd.getpwuid(0)[0]
487        else:
488            group = owner = 'root'
489
490        base_dir, root_dir, base_name =  self._create_files()
491        base_name = os.path.join(self.mkdtemp() , 'archive')
492        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
493                           group=group)
494        self.assertTrue(os.path.exists(res))
495
496        res = make_archive(base_name, 'zip', root_dir, base_dir)
497        self.assertTrue(os.path.exists(res))
498
499        res = make_archive(base_name, 'tar', root_dir, base_dir,
500                           owner=owner, group=group)
501        self.assertTrue(os.path.exists(res))
502
503        res = make_archive(base_name, 'tar', root_dir, base_dir,
504                           owner='kjhkjhkjg', group='oihohoh')
505        self.assertTrue(os.path.exists(res))
506
507    @unittest.skipUnless(zlib, "Requires zlib")
508    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
509    def test_tarfile_root_owner(self):
510        tmpdir, tmpdir2, base_name =  self._create_files()
511        old_dir = os.getcwd()
512        os.chdir(tmpdir)
513        group = grp.getgrgid(0)[0]
514        owner = pwd.getpwuid(0)[0]
515        try:
516            archive_name = _make_tarball(base_name, 'dist', compress=None,
517                                         owner=owner, group=group)
518        finally:
519            os.chdir(old_dir)
520
521        # check if the compressed tarball was created
522        self.assertTrue(os.path.exists(archive_name))
523
524        # now checks the rights
525        archive = tarfile.open(archive_name)
526        try:
527            for member in archive.getmembers():
528                self.assertEqual(member.uid, 0)
529                self.assertEqual(member.gid, 0)
530        finally:
531            archive.close()
532
533    def test_make_archive_cwd(self):
534        current_dir = os.getcwd()
535        def _breaks(*args, **kw):
536            raise RuntimeError()
537
538        register_archive_format('xxx', _breaks, [], 'xxx file')
539        try:
540            try:
541                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
542            except Exception:
543                pass
544            self.assertEqual(os.getcwd(), current_dir)
545        finally:
546            unregister_archive_format('xxx')
547
548    def test_register_archive_format(self):
549
550        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
551        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
552                          1)
553        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
554                          [(1, 2), (1, 2, 3)])
555
556        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
557        formats = [name for name, params in get_archive_formats()]
558        self.assertIn('xxx', formats)
559
560        unregister_archive_format('xxx')
561        formats = [name for name, params in get_archive_formats()]
562        self.assertNotIn('xxx', formats)
563
564
565class TestMove(unittest.TestCase):
566
567    def setUp(self):
568        filename = "foo"
569        self.src_dir = tempfile.mkdtemp()
570        self.dst_dir = tempfile.mkdtemp()
571        self.src_file = os.path.join(self.src_dir, filename)
572        self.dst_file = os.path.join(self.dst_dir, filename)
573        # Try to create a dir in the current directory, hoping that it is
574        # not located on the same filesystem as the system tmp dir.
575        try:
576            self.dir_other_fs = tempfile.mkdtemp(
577                dir=os.path.dirname(__file__))
578            self.file_other_fs = os.path.join(self.dir_other_fs,
579                filename)
580        except OSError:
581            self.dir_other_fs = None
582        with open(self.src_file, "wb") as f:
583            f.write("spam")
584
585    def tearDown(self):
586        for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
587            try:
588                if d:
589                    shutil.rmtree(d)
590            except:
591                pass
592
593    def _check_move_file(self, src, dst, real_dst):
594        with open(src, "rb") as f:
595            contents = f.read()
596        shutil.move(src, dst)
597        with open(real_dst, "rb") as f:
598            self.assertEqual(contents, f.read())
599        self.assertFalse(os.path.exists(src))
600
601    def _check_move_dir(self, src, dst, real_dst):
602        contents = sorted(os.listdir(src))
603        shutil.move(src, dst)
604        self.assertEqual(contents, sorted(os.listdir(real_dst)))
605        self.assertFalse(os.path.exists(src))
606
607    def test_move_file(self):
608        # Move a file to another location on the same filesystem.
609        self._check_move_file(self.src_file, self.dst_file, self.dst_file)
610
611    def test_move_file_to_dir(self):
612        # Move a file inside an existing dir on the same filesystem.
613        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
614
615    def test_move_file_other_fs(self):
616        # Move a file to an existing dir on another filesystem.
617        if not self.dir_other_fs:
618            # skip
619            return
620        self._check_move_file(self.src_file, self.file_other_fs,
621            self.file_other_fs)
622
623    def test_move_file_to_dir_other_fs(self):
624        # Move a file to another location on another filesystem.
625        if not self.dir_other_fs:
626            # skip
627            return
628        self._check_move_file(self.src_file, self.dir_other_fs,
629            self.file_other_fs)
630
631    def test_move_dir(self):
632        # Move a dir to another location on the same filesystem.
633        dst_dir = tempfile.mktemp()
634        try:
635            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
636        finally:
637            try:
638                shutil.rmtree(dst_dir)
639            except:
640                pass
641
642    def test_move_dir_other_fs(self):
643        # Move a dir to another location on another filesystem.
644        if not self.dir_other_fs:
645            # skip
646            return
647        dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
648        try:
649            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
650        finally:
651            try:
652                shutil.rmtree(dst_dir)
653            except:
654                pass
655
656    def test_move_dir_to_dir(self):
657        # Move a dir inside an existing dir on the same filesystem.
658        self._check_move_dir(self.src_dir, self.dst_dir,
659            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
660
661    def test_move_dir_to_dir_other_fs(self):
662        # Move a dir inside an existing dir on another filesystem.
663        if not self.dir_other_fs:
664            # skip
665            return
666        self._check_move_dir(self.src_dir, self.dir_other_fs,
667            os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
668
669    def test_existing_file_inside_dest_dir(self):
670        # A file with the same name inside the destination dir already exists.
671        with open(self.dst_file, "wb"):
672            pass
673        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
674
675    def test_dont_move_dir_in_itself(self):
676        # Moving a dir inside itself raises an Error.
677        dst = os.path.join(self.src_dir, "bar")
678        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
679
680    def test_destinsrc_false_negative(self):
681        os.mkdir(TESTFN)
682        try:
683            for src, dst in [('srcdir', 'srcdir/dest')]:
684                src = os.path.join(TESTFN, src)
685                dst = os.path.join(TESTFN, dst)
686                self.assertTrue(shutil._destinsrc(src, dst),
687                             msg='_destinsrc() wrongly concluded that '
688                             'dst (%s) is not in src (%s)' % (dst, src))
689        finally:
690            shutil.rmtree(TESTFN, ignore_errors=True)
691
692    def test_destinsrc_false_positive(self):
693        os.mkdir(TESTFN)
694        try:
695            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
696                src = os.path.join(TESTFN, src)
697                dst = os.path.join(TESTFN, dst)
698                self.assertFalse(shutil._destinsrc(src, dst),
699                            msg='_destinsrc() wrongly concluded that '
700                            'dst (%s) is in src (%s)' % (dst, src))
701        finally:
702            shutil.rmtree(TESTFN, ignore_errors=True)
703
704
705class TestCopyFile(unittest.TestCase):
706
707    _delete = False
708
709    class Faux(object):
710        _entered = False
711        _exited_with = None
712        _raised = False
713        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
714            self._raise_in_exit = raise_in_exit
715            self._suppress_at_exit = suppress_at_exit
716        def read(self, *args):
717            return ''
718        def __enter__(self):
719            self._entered = True
720        def __exit__(self, exc_type, exc_val, exc_tb):
721            self._exited_with = exc_type, exc_val, exc_tb
722            if self._raise_in_exit:
723                self._raised = True
724                raise IOError("Cannot close")
725            return self._suppress_at_exit
726
727    def tearDown(self):
728        if self._delete:
729            del shutil.open
730
731    def _set_shutil_open(self, func):
732        shutil.open = func
733        self._delete = True
734
735    def test_w_source_open_fails(self):
736        def _open(filename, mode='r'):
737            if filename == 'srcfile':
738                raise IOError('Cannot open "srcfile"')
739            assert 0  # shouldn't reach here.
740
741        self._set_shutil_open(_open)
742
743        self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
744
745    def test_w_dest_open_fails(self):
746
747        srcfile = self.Faux()
748
749        def _open(filename, mode='r'):
750            if filename == 'srcfile':
751                return srcfile
752            if filename == 'destfile':
753                raise IOError('Cannot open "destfile"')
754            assert 0  # shouldn't reach here.
755
756        self._set_shutil_open(_open)
757
758        shutil.copyfile('srcfile', 'destfile')
759        self.assertTrue(srcfile._entered)
760        self.assertTrue(srcfile._exited_with[0] is IOError)
761        self.assertEqual(srcfile._exited_with[1].args,
762                         ('Cannot open "destfile"',))
763
764    def test_w_dest_close_fails(self):
765
766        srcfile = self.Faux()
767        destfile = self.Faux(True)
768
769        def _open(filename, mode='r'):
770            if filename == 'srcfile':
771                return srcfile
772            if filename == 'destfile':
773                return destfile
774            assert 0  # shouldn't reach here.
775
776        self._set_shutil_open(_open)
777
778        shutil.copyfile('srcfile', 'destfile')
779        self.assertTrue(srcfile._entered)
780        self.assertTrue(destfile._entered)
781        self.assertTrue(destfile._raised)
782        self.assertTrue(srcfile._exited_with[0] is IOError)
783        self.assertEqual(srcfile._exited_with[1].args,
784                         ('Cannot close',))
785
786    def test_w_source_close_fails(self):
787
788        srcfile = self.Faux(True)
789        destfile = self.Faux()
790
791        def _open(filename, mode='r'):
792            if filename == 'srcfile':
793                return srcfile
794            if filename == 'destfile':
795                return destfile
796            assert 0  # shouldn't reach here.
797
798        self._set_shutil_open(_open)
799
800        self.assertRaises(IOError,
801                          shutil.copyfile, 'srcfile', 'destfile')
802        self.assertTrue(srcfile._entered)
803        self.assertTrue(destfile._entered)
804        self.assertFalse(destfile._raised)
805        self.assertTrue(srcfile._exited_with[0] is None)
806        self.assertTrue(srcfile._raised)
807
808    def test_move_dir_caseinsensitive(self):
809        # Renames a folder to the same name
810        # but a different case.
811
812        self.src_dir = tempfile.mkdtemp()
813        dst_dir = os.path.join(
814                os.path.dirname(self.src_dir),
815                os.path.basename(self.src_dir).upper())
816        self.assertNotEqual(self.src_dir, dst_dir)
817
818        try:
819            shutil.move(self.src_dir, dst_dir)
820            self.assertTrue(os.path.isdir(dst_dir))
821        finally:
822            if os.path.exists(dst_dir):
823                os.rmdir(dst_dir)
824
825
826
827def test_main():
828    test_support.run_unittest(TestShutil, TestMove, TestCopyFile)
829
830if __name__ == '__main__':
831    test_main()
832