• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2"""Tests for distutils.archive_util."""
3import unittest
4import os
5import sys
6import tarfile
7from os.path import splitdrive
8import warnings
9
10from distutils import archive_util
11from distutils.archive_util import (check_archive_formats, make_tarball,
12                                    make_zipfile, make_archive,
13                                    ARCHIVE_FORMATS)
14from distutils.spawn import find_executable, spawn
15from distutils.tests import support
16from test.support import run_unittest, patch
17from test.support.os_helper import change_cwd
18from test.support.warnings_helper import check_warnings
19
20try:
21    import grp
22    import pwd
23    UID_GID_SUPPORT = True
24except ImportError:
25    UID_GID_SUPPORT = False
26
27try:
28    import zipfile
29    ZIP_SUPPORT = True
30except ImportError:
31    ZIP_SUPPORT = find_executable('zip')
32
33try:
34    import zlib
35    ZLIB_SUPPORT = True
36except ImportError:
37    ZLIB_SUPPORT = False
38
39try:
40    import bz2
41except ImportError:
42    bz2 = None
43
44try:
45    import lzma
46except ImportError:
47    lzma = None
48
def can_fs_encode(filename):
    """
    Report whether *filename* can be stored on the file system.

    Platforms whose os.path advertises unicode filename support always
    qualify; otherwise the name must be encodable with the file system
    encoding.
    """
    if not os.path.supports_unicode_filenames:
        try:
            filename.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            return False
    return True
60
61
62class ArchiveUtilTestCase(support.TempdirManager,
63                          support.LoggingSilencer,
64                          unittest.TestCase):
65
66    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
67    def test_make_tarball(self, name='archive'):
68        # creating something to tar
69        tmpdir = self._create_files()
70        self._make_tarball(tmpdir, name, '.tar.gz')
71        # trying an uncompressed one
72        self._make_tarball(tmpdir, name, '.tar', compress=None)
73
74    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
75    def test_make_tarball_gzip(self):
76        tmpdir = self._create_files()
77        self._make_tarball(tmpdir, 'archive', '.tar.gz', compress='gzip')
78
79    @unittest.skipUnless(bz2, 'Need bz2 support to run')
80    def test_make_tarball_bzip2(self):
81        tmpdir = self._create_files()
82        self._make_tarball(tmpdir, 'archive', '.tar.bz2', compress='bzip2')
83
84    @unittest.skipUnless(lzma, 'Need lzma support to run')
85    def test_make_tarball_xz(self):
86        tmpdir = self._create_files()
87        self._make_tarball(tmpdir, 'archive', '.tar.xz', compress='xz')
88
89    @unittest.skipUnless(can_fs_encode('årchiv'),
90        'File system cannot handle this filename')
91    def test_make_tarball_latin1(self):
92        """
93        Mirror test_make_tarball, except filename contains latin characters.
94        """
95        self.test_make_tarball('årchiv') # note this isn't a real word
96
97    @unittest.skipUnless(can_fs_encode('のアーカイブ'),
98        'File system cannot handle this filename')
99    def test_make_tarball_extended(self):
100        """
101        Mirror test_make_tarball, except filename contains extended
102        characters outside the latin charset.
103        """
104        self.test_make_tarball('のアーカイブ') # japanese for archive
105
106    def _make_tarball(self, tmpdir, target_name, suffix, **kwargs):
107        tmpdir2 = self.mkdtemp()
108        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
109                            "source and target should be on same drive")
110
111        base_name = os.path.join(tmpdir2, target_name)
112
113        # working with relative paths to avoid tar warnings
114        with change_cwd(tmpdir):
115            make_tarball(splitdrive(base_name)[1], 'dist', **kwargs)
116
117        # check if the compressed tarball was created
118        tarball = base_name + suffix
119        self.assertTrue(os.path.exists(tarball))
120        self.assertEqual(self._tarinfo(tarball), self._created_files)
121
122    def _tarinfo(self, path):
123        tar = tarfile.open(path)
124        try:
125            names = tar.getnames()
126            names.sort()
127            return names
128        finally:
129            tar.close()
130
131    _zip_created_files = ['dist/', 'dist/file1', 'dist/file2',
132                          'dist/sub/', 'dist/sub/file3', 'dist/sub2/']
133    _created_files = [p.rstrip('/') for p in _zip_created_files]
134
135    def _create_files(self):
136        # creating something to tar
137        tmpdir = self.mkdtemp()
138        dist = os.path.join(tmpdir, 'dist')
139        os.mkdir(dist)
140        self.write_file([dist, 'file1'], 'xxx')
141        self.write_file([dist, 'file2'], 'xxx')
142        os.mkdir(os.path.join(dist, 'sub'))
143        self.write_file([dist, 'sub', 'file3'], 'xxx')
144        os.mkdir(os.path.join(dist, 'sub2'))
145        return tmpdir
146
147    @unittest.skipUnless(find_executable('tar') and find_executable('gzip')
148                         and ZLIB_SUPPORT,
149                         'Need the tar, gzip and zlib command to run')
150    def test_tarfile_vs_tar(self):
151        tmpdir =  self._create_files()
152        tmpdir2 = self.mkdtemp()
153        base_name = os.path.join(tmpdir2, 'archive')
154        old_dir = os.getcwd()
155        os.chdir(tmpdir)
156        try:
157            make_tarball(base_name, 'dist')
158        finally:
159            os.chdir(old_dir)
160
161        # check if the compressed tarball was created
162        tarball = base_name + '.tar.gz'
163        self.assertTrue(os.path.exists(tarball))
164
165        # now create another tarball using `tar`
166        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
167        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
168        gzip_cmd = ['gzip', '-f', '-9', 'archive2.tar']
169        old_dir = os.getcwd()
170        os.chdir(tmpdir)
171        try:
172            spawn(tar_cmd)
173            spawn(gzip_cmd)
174        finally:
175            os.chdir(old_dir)
176
177        self.assertTrue(os.path.exists(tarball2))
178        # let's compare both tarballs
179        self.assertEqual(self._tarinfo(tarball), self._created_files)
180        self.assertEqual(self._tarinfo(tarball2), self._created_files)
181
182        # trying an uncompressed one
183        base_name = os.path.join(tmpdir2, 'archive')
184        old_dir = os.getcwd()
185        os.chdir(tmpdir)
186        try:
187            make_tarball(base_name, 'dist', compress=None)
188        finally:
189            os.chdir(old_dir)
190        tarball = base_name + '.tar'
191        self.assertTrue(os.path.exists(tarball))
192
193        # now for a dry_run
194        base_name = os.path.join(tmpdir2, 'archive')
195        old_dir = os.getcwd()
196        os.chdir(tmpdir)
197        try:
198            make_tarball(base_name, 'dist', compress=None, dry_run=True)
199        finally:
200            os.chdir(old_dir)
201        tarball = base_name + '.tar'
202        self.assertTrue(os.path.exists(tarball))
203
204    @unittest.skipUnless(find_executable('compress'),
205                         'The compress program is required')
206    def test_compress_deprecated(self):
207        tmpdir =  self._create_files()
208        base_name = os.path.join(self.mkdtemp(), 'archive')
209
210        # using compress and testing the PendingDeprecationWarning
211        old_dir = os.getcwd()
212        os.chdir(tmpdir)
213        try:
214            with check_warnings() as w:
215                warnings.simplefilter("always")
216                make_tarball(base_name, 'dist', compress='compress')
217        finally:
218            os.chdir(old_dir)
219        tarball = base_name + '.tar.Z'
220        self.assertTrue(os.path.exists(tarball))
221        self.assertEqual(len(w.warnings), 1)
222
223        # same test with dry_run
224        os.remove(tarball)
225        old_dir = os.getcwd()
226        os.chdir(tmpdir)
227        try:
228            with check_warnings() as w:
229                warnings.simplefilter("always")
230                make_tarball(base_name, 'dist', compress='compress',
231                             dry_run=True)
232        finally:
233            os.chdir(old_dir)
234        self.assertFalse(os.path.exists(tarball))
235        self.assertEqual(len(w.warnings), 1)
236
237    @unittest.skipUnless(ZIP_SUPPORT and ZLIB_SUPPORT,
238                         'Need zip and zlib support to run')
239    def test_make_zipfile(self):
240        # creating something to tar
241        tmpdir = self._create_files()
242        base_name = os.path.join(self.mkdtemp(), 'archive')
243        with change_cwd(tmpdir):
244            make_zipfile(base_name, 'dist')
245
246        # check if the compressed tarball was created
247        tarball = base_name + '.zip'
248        self.assertTrue(os.path.exists(tarball))
249        with zipfile.ZipFile(tarball) as zf:
250            self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
251
252    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
253    def test_make_zipfile_no_zlib(self):
254        patch(self, archive_util.zipfile, 'zlib', None)  # force zlib ImportError
255
256        called = []
257        zipfile_class = zipfile.ZipFile
258        def fake_zipfile(*a, **kw):
259            if kw.get('compression', None) == zipfile.ZIP_STORED:
260                called.append((a, kw))
261            return zipfile_class(*a, **kw)
262
263        patch(self, archive_util.zipfile, 'ZipFile', fake_zipfile)
264
265        # create something to tar and compress
266        tmpdir = self._create_files()
267        base_name = os.path.join(self.mkdtemp(), 'archive')
268        with change_cwd(tmpdir):
269            make_zipfile(base_name, 'dist')
270
271        tarball = base_name + '.zip'
272        self.assertEqual(called,
273                         [((tarball, "w"), {'compression': zipfile.ZIP_STORED})])
274        self.assertTrue(os.path.exists(tarball))
275        with zipfile.ZipFile(tarball) as zf:
276            self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
277
278    def test_check_archive_formats(self):
279        self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
280                         'xxx')
281        self.assertIsNone(check_archive_formats(['gztar', 'bztar', 'xztar',
282                                                 'ztar', 'tar', 'zip']))
283
284    def test_make_archive(self):
285        tmpdir = self.mkdtemp()
286        base_name = os.path.join(tmpdir, 'archive')
287        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
288
289    def test_make_archive_cwd(self):
290        current_dir = os.getcwd()
291        def _breaks(*args, **kw):
292            raise RuntimeError()
293        ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
294        try:
295            try:
296                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
297            except:
298                pass
299            self.assertEqual(os.getcwd(), current_dir)
300        finally:
301            del ARCHIVE_FORMATS['xxx']
302
303    def test_make_archive_tar(self):
304        base_dir =  self._create_files()
305        base_name = os.path.join(self.mkdtemp() , 'archive')
306        res = make_archive(base_name, 'tar', base_dir, 'dist')
307        self.assertTrue(os.path.exists(res))
308        self.assertEqual(os.path.basename(res), 'archive.tar')
309        self.assertEqual(self._tarinfo(res), self._created_files)
310
311    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
312    def test_make_archive_gztar(self):
313        base_dir =  self._create_files()
314        base_name = os.path.join(self.mkdtemp() , 'archive')
315        res = make_archive(base_name, 'gztar', base_dir, 'dist')
316        self.assertTrue(os.path.exists(res))
317        self.assertEqual(os.path.basename(res), 'archive.tar.gz')
318        self.assertEqual(self._tarinfo(res), self._created_files)
319
320    @unittest.skipUnless(bz2, 'Need bz2 support to run')
321    def test_make_archive_bztar(self):
322        base_dir =  self._create_files()
323        base_name = os.path.join(self.mkdtemp() , 'archive')
324        res = make_archive(base_name, 'bztar', base_dir, 'dist')
325        self.assertTrue(os.path.exists(res))
326        self.assertEqual(os.path.basename(res), 'archive.tar.bz2')
327        self.assertEqual(self._tarinfo(res), self._created_files)
328
329    @unittest.skipUnless(lzma, 'Need xz support to run')
330    def test_make_archive_xztar(self):
331        base_dir =  self._create_files()
332        base_name = os.path.join(self.mkdtemp() , 'archive')
333        res = make_archive(base_name, 'xztar', base_dir, 'dist')
334        self.assertTrue(os.path.exists(res))
335        self.assertEqual(os.path.basename(res), 'archive.tar.xz')
336        self.assertEqual(self._tarinfo(res), self._created_files)
337
338    def test_make_archive_owner_group(self):
339        # testing make_archive with owner and group, with various combinations
340        # this works even if there's not gid/uid support
341        if UID_GID_SUPPORT:
342            group = grp.getgrgid(0)[0]
343            owner = pwd.getpwuid(0)[0]
344        else:
345            group = owner = 'root'
346
347        base_dir =  self._create_files()
348        root_dir = self.mkdtemp()
349        base_name = os.path.join(self.mkdtemp() , 'archive')
350        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
351                           group=group)
352        self.assertTrue(os.path.exists(res))
353
354        res = make_archive(base_name, 'zip', root_dir, base_dir)
355        self.assertTrue(os.path.exists(res))
356
357        res = make_archive(base_name, 'tar', root_dir, base_dir,
358                           owner=owner, group=group)
359        self.assertTrue(os.path.exists(res))
360
361        res = make_archive(base_name, 'tar', root_dir, base_dir,
362                           owner='kjhkjhkjg', group='oihohoh')
363        self.assertTrue(os.path.exists(res))
364
365    @unittest.skipUnless(ZLIB_SUPPORT, "Requires zlib")
366    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
367    def test_tarfile_root_owner(self):
368        tmpdir =  self._create_files()
369        base_name = os.path.join(self.mkdtemp(), 'archive')
370        old_dir = os.getcwd()
371        os.chdir(tmpdir)
372        group = grp.getgrgid(0)[0]
373        owner = pwd.getpwuid(0)[0]
374        try:
375            archive_name = make_tarball(base_name, 'dist', compress=None,
376                                        owner=owner, group=group)
377        finally:
378            os.chdir(old_dir)
379
380        # check if the compressed tarball was created
381        self.assertTrue(os.path.exists(archive_name))
382
383        # now checks the rights
384        archive = tarfile.open(archive_name)
385        try:
386            for member in archive.getmembers():
387                self.assertEqual(member.uid, 0)
388                self.assertEqual(member.gid, 0)
389        finally:
390            archive.close()
391
def test_suite():
    """Return a test suite containing all tests of ArchiveUtilTestCase."""
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the loader method below builds the same suite.
    return unittest.TestLoader().loadTestsFromTestCase(ArchiveUtilTestCase)
394
if __name__ == "__main__":
    # Run the archive_util tests when this module is executed directly.
    suite = test_suite()
    run_unittest(suite)
397