• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2"""Tests for distutils.archive_util."""
3import unittest
4import os
5import sys
6import tarfile
7from os.path import splitdrive
8import warnings
9
10from distutils import archive_util
11from distutils.archive_util import (check_archive_formats, make_tarball,
12                                    make_zipfile, make_archive,
13                                    ARCHIVE_FORMATS)
14from distutils.spawn import find_executable, spawn
15from distutils.tests import support
16from test.support import check_warnings, run_unittest, patch, change_cwd
17
18try:
19    import grp
20    import pwd
21    UID_GID_SUPPORT = True
22except ImportError:
23    UID_GID_SUPPORT = False
24
25try:
26    import zipfile
27    ZIP_SUPPORT = True
28except ImportError:
29    ZIP_SUPPORT = find_executable('zip')
30
31try:
32    import zlib
33    ZLIB_SUPPORT = True
34except ImportError:
35    ZLIB_SUPPORT = False
36
37try:
38    import bz2
39except ImportError:
40    bz2 = None
41
42try:
43    import lzma
44except ImportError:
45    lzma = None
46
def can_fs_encode(filename):
    """
    Report whether *filename* can be stored on this file system.

    When os.path.supports_unicode_filenames is true the answer is always
    True; otherwise the name must be encodable with
    sys.getfilesystemencoding().
    """
    if not os.path.supports_unicode_filenames:
        try:
            filename.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            return False
    return True
58
59
60class ArchiveUtilTestCase(support.TempdirManager,
61                          support.LoggingSilencer,
62                          unittest.TestCase):
63
64    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
65    def test_make_tarball(self, name='archive'):
66        # creating something to tar
67        tmpdir = self._create_files()
68        self._make_tarball(tmpdir, name, '.tar.gz')
69        # trying an uncompressed one
70        self._make_tarball(tmpdir, name, '.tar', compress=None)
71
72    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
73    def test_make_tarball_gzip(self):
74        tmpdir = self._create_files()
75        self._make_tarball(tmpdir, 'archive', '.tar.gz', compress='gzip')
76
77    @unittest.skipUnless(bz2, 'Need bz2 support to run')
78    def test_make_tarball_bzip2(self):
79        tmpdir = self._create_files()
80        self._make_tarball(tmpdir, 'archive', '.tar.bz2', compress='bzip2')
81
82    @unittest.skipUnless(lzma, 'Need lzma support to run')
83    def test_make_tarball_xz(self):
84        tmpdir = self._create_files()
85        self._make_tarball(tmpdir, 'archive', '.tar.xz', compress='xz')
86
87    @unittest.skipUnless(can_fs_encode('årchiv'),
88        'File system cannot handle this filename')
89    def test_make_tarball_latin1(self):
90        """
91        Mirror test_make_tarball, except filename contains latin characters.
92        """
93        self.test_make_tarball('årchiv') # note this isn't a real word
94
95    @unittest.skipUnless(can_fs_encode('のアーカイブ'),
96        'File system cannot handle this filename')
97    def test_make_tarball_extended(self):
98        """
99        Mirror test_make_tarball, except filename contains extended
100        characters outside the latin charset.
101        """
102        self.test_make_tarball('のアーカイブ') # japanese for archive
103
104    def _make_tarball(self, tmpdir, target_name, suffix, **kwargs):
105        tmpdir2 = self.mkdtemp()
106        unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
107                            "source and target should be on same drive")
108
109        base_name = os.path.join(tmpdir2, target_name)
110
111        # working with relative paths to avoid tar warnings
112        with change_cwd(tmpdir):
113            make_tarball(splitdrive(base_name)[1], 'dist', **kwargs)
114
115        # check if the compressed tarball was created
116        tarball = base_name + suffix
117        self.assertTrue(os.path.exists(tarball))
118        self.assertEqual(self._tarinfo(tarball), self._created_files)
119
120    def _tarinfo(self, path):
121        tar = tarfile.open(path)
122        try:
123            names = tar.getnames()
124            names.sort()
125            return names
126        finally:
127            tar.close()
128
129    _zip_created_files = ['dist/', 'dist/file1', 'dist/file2',
130                          'dist/sub/', 'dist/sub/file3', 'dist/sub2/']
131    _created_files = [p.rstrip('/') for p in _zip_created_files]
132
133    def _create_files(self):
134        # creating something to tar
135        tmpdir = self.mkdtemp()
136        dist = os.path.join(tmpdir, 'dist')
137        os.mkdir(dist)
138        self.write_file([dist, 'file1'], 'xxx')
139        self.write_file([dist, 'file2'], 'xxx')
140        os.mkdir(os.path.join(dist, 'sub'))
141        self.write_file([dist, 'sub', 'file3'], 'xxx')
142        os.mkdir(os.path.join(dist, 'sub2'))
143        return tmpdir
144
145    @unittest.skipUnless(find_executable('tar') and find_executable('gzip')
146                         and ZLIB_SUPPORT,
147                         'Need the tar, gzip and zlib command to run')
148    def test_tarfile_vs_tar(self):
149        tmpdir =  self._create_files()
150        tmpdir2 = self.mkdtemp()
151        base_name = os.path.join(tmpdir2, 'archive')
152        old_dir = os.getcwd()
153        os.chdir(tmpdir)
154        try:
155            make_tarball(base_name, 'dist')
156        finally:
157            os.chdir(old_dir)
158
159        # check if the compressed tarball was created
160        tarball = base_name + '.tar.gz'
161        self.assertTrue(os.path.exists(tarball))
162
163        # now create another tarball using `tar`
164        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
165        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
166        gzip_cmd = ['gzip', '-f', '-9', 'archive2.tar']
167        old_dir = os.getcwd()
168        os.chdir(tmpdir)
169        try:
170            spawn(tar_cmd)
171            spawn(gzip_cmd)
172        finally:
173            os.chdir(old_dir)
174
175        self.assertTrue(os.path.exists(tarball2))
176        # let's compare both tarballs
177        self.assertEqual(self._tarinfo(tarball), self._created_files)
178        self.assertEqual(self._tarinfo(tarball2), self._created_files)
179
180        # trying an uncompressed one
181        base_name = os.path.join(tmpdir2, 'archive')
182        old_dir = os.getcwd()
183        os.chdir(tmpdir)
184        try:
185            make_tarball(base_name, 'dist', compress=None)
186        finally:
187            os.chdir(old_dir)
188        tarball = base_name + '.tar'
189        self.assertTrue(os.path.exists(tarball))
190
191        # now for a dry_run
192        base_name = os.path.join(tmpdir2, 'archive')
193        old_dir = os.getcwd()
194        os.chdir(tmpdir)
195        try:
196            make_tarball(base_name, 'dist', compress=None, dry_run=True)
197        finally:
198            os.chdir(old_dir)
199        tarball = base_name + '.tar'
200        self.assertTrue(os.path.exists(tarball))
201
202    @unittest.skipUnless(find_executable('compress'),
203                         'The compress program is required')
204    def test_compress_deprecated(self):
205        tmpdir =  self._create_files()
206        base_name = os.path.join(self.mkdtemp(), 'archive')
207
208        # using compress and testing the PendingDeprecationWarning
209        old_dir = os.getcwd()
210        os.chdir(tmpdir)
211        try:
212            with check_warnings() as w:
213                warnings.simplefilter("always")
214                make_tarball(base_name, 'dist', compress='compress')
215        finally:
216            os.chdir(old_dir)
217        tarball = base_name + '.tar.Z'
218        self.assertTrue(os.path.exists(tarball))
219        self.assertEqual(len(w.warnings), 1)
220
221        # same test with dry_run
222        os.remove(tarball)
223        old_dir = os.getcwd()
224        os.chdir(tmpdir)
225        try:
226            with check_warnings() as w:
227                warnings.simplefilter("always")
228                make_tarball(base_name, 'dist', compress='compress',
229                             dry_run=True)
230        finally:
231            os.chdir(old_dir)
232        self.assertFalse(os.path.exists(tarball))
233        self.assertEqual(len(w.warnings), 1)
234
235    @unittest.skipUnless(ZIP_SUPPORT and ZLIB_SUPPORT,
236                         'Need zip and zlib support to run')
237    def test_make_zipfile(self):
238        # creating something to tar
239        tmpdir = self._create_files()
240        base_name = os.path.join(self.mkdtemp(), 'archive')
241        with change_cwd(tmpdir):
242            make_zipfile(base_name, 'dist')
243
244        # check if the compressed tarball was created
245        tarball = base_name + '.zip'
246        self.assertTrue(os.path.exists(tarball))
247        with zipfile.ZipFile(tarball) as zf:
248            self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
249
250    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
251    def test_make_zipfile_no_zlib(self):
252        patch(self, archive_util.zipfile, 'zlib', None)  # force zlib ImportError
253
254        called = []
255        zipfile_class = zipfile.ZipFile
256        def fake_zipfile(*a, **kw):
257            if kw.get('compression', None) == zipfile.ZIP_STORED:
258                called.append((a, kw))
259            return zipfile_class(*a, **kw)
260
261        patch(self, archive_util.zipfile, 'ZipFile', fake_zipfile)
262
263        # create something to tar and compress
264        tmpdir = self._create_files()
265        base_name = os.path.join(self.mkdtemp(), 'archive')
266        with change_cwd(tmpdir):
267            make_zipfile(base_name, 'dist')
268
269        tarball = base_name + '.zip'
270        self.assertEqual(called,
271                         [((tarball, "w"), {'compression': zipfile.ZIP_STORED})])
272        self.assertTrue(os.path.exists(tarball))
273        with zipfile.ZipFile(tarball) as zf:
274            self.assertEqual(sorted(zf.namelist()), self._zip_created_files)
275
276    def test_check_archive_formats(self):
277        self.assertEqual(check_archive_formats(['gztar', 'xxx', 'zip']),
278                         'xxx')
279        self.assertIsNone(check_archive_formats(['gztar', 'bztar', 'xztar',
280                                                 'ztar', 'tar', 'zip']))
281
282    def test_make_archive(self):
283        tmpdir = self.mkdtemp()
284        base_name = os.path.join(tmpdir, 'archive')
285        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
286
287    def test_make_archive_cwd(self):
288        current_dir = os.getcwd()
289        def _breaks(*args, **kw):
290            raise RuntimeError()
291        ARCHIVE_FORMATS['xxx'] = (_breaks, [], 'xxx file')
292        try:
293            try:
294                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
295            except:
296                pass
297            self.assertEqual(os.getcwd(), current_dir)
298        finally:
299            del ARCHIVE_FORMATS['xxx']
300
301    def test_make_archive_tar(self):
302        base_dir =  self._create_files()
303        base_name = os.path.join(self.mkdtemp() , 'archive')
304        res = make_archive(base_name, 'tar', base_dir, 'dist')
305        self.assertTrue(os.path.exists(res))
306        self.assertEqual(os.path.basename(res), 'archive.tar')
307        self.assertEqual(self._tarinfo(res), self._created_files)
308
309    @unittest.skipUnless(ZLIB_SUPPORT, 'Need zlib support to run')
310    def test_make_archive_gztar(self):
311        base_dir =  self._create_files()
312        base_name = os.path.join(self.mkdtemp() , 'archive')
313        res = make_archive(base_name, 'gztar', base_dir, 'dist')
314        self.assertTrue(os.path.exists(res))
315        self.assertEqual(os.path.basename(res), 'archive.tar.gz')
316        self.assertEqual(self._tarinfo(res), self._created_files)
317
318    @unittest.skipUnless(bz2, 'Need bz2 support to run')
319    def test_make_archive_bztar(self):
320        base_dir =  self._create_files()
321        base_name = os.path.join(self.mkdtemp() , 'archive')
322        res = make_archive(base_name, 'bztar', base_dir, 'dist')
323        self.assertTrue(os.path.exists(res))
324        self.assertEqual(os.path.basename(res), 'archive.tar.bz2')
325        self.assertEqual(self._tarinfo(res), self._created_files)
326
327    @unittest.skipUnless(lzma, 'Need xz support to run')
328    def test_make_archive_xztar(self):
329        base_dir =  self._create_files()
330        base_name = os.path.join(self.mkdtemp() , 'archive')
331        res = make_archive(base_name, 'xztar', base_dir, 'dist')
332        self.assertTrue(os.path.exists(res))
333        self.assertEqual(os.path.basename(res), 'archive.tar.xz')
334        self.assertEqual(self._tarinfo(res), self._created_files)
335
336    def test_make_archive_owner_group(self):
337        # testing make_archive with owner and group, with various combinations
338        # this works even if there's not gid/uid support
339        if UID_GID_SUPPORT:
340            group = grp.getgrgid(0)[0]
341            owner = pwd.getpwuid(0)[0]
342        else:
343            group = owner = 'root'
344
345        base_dir =  self._create_files()
346        root_dir = self.mkdtemp()
347        base_name = os.path.join(self.mkdtemp() , 'archive')
348        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
349                           group=group)
350        self.assertTrue(os.path.exists(res))
351
352        res = make_archive(base_name, 'zip', root_dir, base_dir)
353        self.assertTrue(os.path.exists(res))
354
355        res = make_archive(base_name, 'tar', root_dir, base_dir,
356                           owner=owner, group=group)
357        self.assertTrue(os.path.exists(res))
358
359        res = make_archive(base_name, 'tar', root_dir, base_dir,
360                           owner='kjhkjhkjg', group='oihohoh')
361        self.assertTrue(os.path.exists(res))
362
363    @unittest.skipUnless(ZLIB_SUPPORT, "Requires zlib")
364    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
365    def test_tarfile_root_owner(self):
366        tmpdir =  self._create_files()
367        base_name = os.path.join(self.mkdtemp(), 'archive')
368        old_dir = os.getcwd()
369        os.chdir(tmpdir)
370        group = grp.getgrgid(0)[0]
371        owner = pwd.getpwuid(0)[0]
372        try:
373            archive_name = make_tarball(base_name, 'dist', compress=None,
374                                        owner=owner, group=group)
375        finally:
376            os.chdir(old_dir)
377
378        # check if the compressed tarball was created
379        self.assertTrue(os.path.exists(archive_name))
380
381        # now checks the rights
382        archive = tarfile.open(archive_name)
383        try:
384            for member in archive.getmembers():
385                self.assertEqual(member.uid, 0)
386                self.assertEqual(member.gid, 0)
387        finally:
388            archive.close()
389
def test_suite():
    """Return a test suite holding every test of ArchiveUtilTestCase."""
    # unittest.makeSuite() is deprecated; the loader method below collects
    # the same test methods.
    return unittest.TestLoader().loadTestsFromTestCase(ArchiveUtilTestCase)
392
# When executed as a script, run the suite built by test_suite().
if __name__ == "__main__":
    run_unittest(test_suite())
395