• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Wheels support."""
2
3from distutils.util import get_platform
4from distutils import log
5import email
6import itertools
7import os
8import posixpath
9import re
10import zipfile
11
12import pkg_resources
13import setuptools
14from pkg_resources import parse_version
15from setuptools.extern.packaging.tags import sys_tags
16from setuptools.extern.packaging.utils import canonicalize_name
17from setuptools.command.egg_info import write_requirements
18
19
# Matcher for wheel file names shaped like
# ``{project_name}-{version}[-{build}]-{py_version}-{abi}-{platform}.whl``.
# This is the bound ``match`` method of the compiled pattern: call it with a
# base name to get a match object (named groups as above) or ``None``.
WHEEL_NAME = re.compile(
    r"""^(?P<project_name>.+?)-(?P<version>\d.*?)
    ((-(?P<build>\d.*?))?-(?P<py_version>.+?)-(?P<abi>.+?)-(?P<platform>.+?)
    )\.whl$""",
    flags=re.VERBOSE,
).match

# Source text written into the ``__init__.py`` of a namespace package that
# lacks one once the wheel has been unpacked.
NAMESPACE_PACKAGE_INIT = (
    "__import__('pkg_resources').declare_namespace(__name__)\n"
)
28
29
def unpack(src_dir, dst_dir):
    """Move everything under `src_dir` to `dst_dir`, and delete the former.

    Files are moved one by one.  A sub-directory that does not yet exist in
    the destination is moved as a whole; one that already exists there is
    descended into so that its content is merged into the existing one.

    Whatever empty directory skeleton remains under `src_dir` afterwards is
    removed, including `src_dir` itself.
    """
    for dirpath, dirnames, filenames in os.walk(src_dir):
        subdir = os.path.relpath(dirpath, src_dir)
        for f in filenames:
            src = os.path.join(dirpath, f)
            dst = os.path.join(dst_dir, subdir, f)
            os.renames(src, dst)
        # Iterate in reverse so deleting by index does not shift the
        # positions of the entries still to be visited.
        for n, d in reversed(list(enumerate(dirnames))):
            src = os.path.join(dirpath, d)
            dst = os.path.join(dst_dir, subdir, d)
            if not os.path.exists(dst):
                # Directory does not exist in destination,
                # rename it and prune it from os.walk list.
                os.renames(src, dst)
                del dirnames[n]
    # Cleanup.  Walk bottom-up: a directory can only be removed once its
    # (empty) sub-directories are gone, so children must be visited before
    # their parent.  If `src_dir` has already been pruned by os.renames(),
    # os.walk() simply yields nothing.
    for dirpath, _dirnames, filenames in os.walk(src_dir, topdown=False):
        # Every file was moved by the first pass; anything left is a bug.
        assert not filenames
        os.rmdir(dirpath)
50
51
class Wheel:
    """A wheel archive, identified by its file name, installable as an egg.

    The components parsed out of the file name by ``WHEEL_NAME`` are stored
    as instance attributes: ``project_name``, ``version``, ``build``,
    ``py_version``, ``abi`` and ``platform``.
    """

    def __init__(self, filename):
        """Parse the base name of *filename* and record its components.

        Raises:
            ValueError: if the base name does not match ``WHEEL_NAME``.
        """
        match = WHEEL_NAME(os.path.basename(filename))
        if match is None:
            raise ValueError('invalid wheel name: %r' % filename)
        self.filename = filename
        # Expose every named group of the pattern as an attribute.
        for k, v in match.groupdict().items():
            setattr(self, k, v)

    def tags(self):
        """List tags (py_version, abi, platform) supported by this wheel.

        Each component of the file name may hold several dot-separated
        values; the result iterates over every combination of them.
        """
        return itertools.product(
            self.py_version.split('.'),
            self.abi.split('.'),
            self.platform.split('.'),
        )

    def is_compatible(self):
        """Is the wheel compatible with the current platform?

        Returns ``True`` when at least one of the wheel's tags is among the
        tags yielded by ``sys_tags()``, ``False`` otherwise.
        """
        supported_tags = {
            (t.interpreter, t.abi, t.platform) for t in sys_tags()
        }
        return any(t in supported_tags for t in self.tags())

    def egg_name(self):
        """Return the ``.egg`` name corresponding to this wheel.

        The name is platform-independent when the wheel's platform tag is
        ``any``; otherwise it carries the value of ``get_platform()``.
        """
        return pkg_resources.Distribution(
            project_name=self.project_name, version=self.version,
            platform=(None if self.platform == 'any' else get_platform()),
        ).egg_name() + '.egg'

    def get_dist_info(self, zf):
        """Return the name of the ``.dist-info`` directory inside *zf*.

        The first archive member whose parent directory ends with
        ``.dist-info`` and whose canonicalized name starts with the
        canonicalized project name wins.

        Raises:
            ValueError: if no such directory is found.
        """
        # find the correct name of the .dist-info dir in the wheel file
        for member in zf.namelist():
            dirname = posixpath.dirname(member)
            if (dirname.endswith('.dist-info') and
                    canonicalize_name(dirname).startswith(
                        canonicalize_name(self.project_name))):
                return dirname
        raise ValueError("unsupported wheel format. .dist-info not found")

    def install_as_egg(self, destination_eggdir):
        """Install wheel as an egg directory."""
        with zipfile.ZipFile(self.filename) as zf:
            self._install_as_egg(destination_eggdir, zf)

    def _install_as_egg(self, destination_eggdir, zf):
        """Unpack the open archive *zf* into *destination_eggdir* as an egg.

        Runs the three conversion steps in order: metadata conversion (which
        also extracts the archive), relocation of the ``.data`` entries, and
        creation of missing namespace package ``__init__.py`` files.
        """
        dist_basename = '%s-%s' % (self.project_name, self.version)
        dist_info = self.get_dist_info(zf)
        dist_data = '%s.data' % dist_basename
        egg_info = os.path.join(destination_eggdir, 'EGG-INFO')

        self._convert_metadata(zf, destination_eggdir, dist_info, egg_info)
        self._move_data_entries(destination_eggdir, dist_data)
        self._fix_namespace_packages(egg_info, destination_eggdir)

    @staticmethod
    def _convert_metadata(zf, destination_eggdir, dist_info, egg_info):
        """Extract *zf* and turn its ``.dist-info`` into an ``EGG-INFO``.

        Args:
            zf: open ``zipfile.ZipFile`` of the wheel.
            destination_eggdir: directory to create and extract into.
            dist_info: name of the ``.dist-info`` directory inside the
                archive.
            egg_info: path the ``.dist-info`` directory is renamed to.

        Raises:
            ValueError: if the ``Wheel-Version`` declared in the ``WHEEL``
                file is outside the supported 1.x range.
        """
        # Import the submodule explicitly: a bare ``import email`` does not
        # load ``email.parser``, so relying on the attribute only works when
        # some other module happens to have imported it already.
        from email.parser import Parser

        def get_metadata(name):
            with zf.open(posixpath.join(dist_info, name)) as fp:
                value = fp.read().decode('utf-8')
                return Parser().parsestr(value)

        wheel_metadata = get_metadata('WHEEL')
        # Check wheel format version is supported.
        wheel_version = parse_version(wheel_metadata.get('Wheel-Version'))
        wheel_v1 = (
            parse_version('1.0') <= wheel_version < parse_version('2.0dev0')
        )
        if not wheel_v1:
            raise ValueError(
                'unsupported wheel format version: %s' % wheel_version)
        # Extract to target directory.
        os.mkdir(destination_eggdir)
        zf.extractall(destination_eggdir)
        # Convert metadata.
        dist_info = os.path.join(destination_eggdir, dist_info)
        dist = pkg_resources.Distribution.from_location(
            destination_eggdir, dist_info,
            metadata=pkg_resources.PathMetadata(destination_eggdir, dist_info),
        )

        # Note: Evaluate and strip markers now,
        # as it's difficult to convert back from the syntax:
        # foobar; "linux" in sys_platform and extra == 'test'
        def raw_req(req):
            req.marker = None
            return str(req)
        install_requires = list(map(raw_req, dist.requires()))
        # Per extra, keep only the requirements that are not already part of
        # the unconditional ones.
        extras_require = {
            extra: [
                req
                for req in map(raw_req, dist.requires((extra,)))
                if req not in install_requires
            ]
            for extra in dist.extras
        }
        os.rename(dist_info, egg_info)
        os.rename(
            os.path.join(egg_info, 'METADATA'),
            os.path.join(egg_info, 'PKG-INFO'),
        )
        setup_dist = setuptools.Distribution(
            attrs=dict(
                install_requires=install_requires,
                extras_require=extras_require,
            ),
        )
        # Temporarily disable info traces.
        log_threshold = log._global_log.threshold
        log.set_threshold(log.WARN)
        try:
            write_requirements(
                setup_dist.get_command_obj('egg_info'),
                None,
                os.path.join(egg_info, 'requires.txt'),
            )
        finally:
            # Restore the previous threshold even if writing failed.
            log.set_threshold(log_threshold)

    @staticmethod
    def _move_data_entries(destination_eggdir, dist_data):
        """Move data entries to their correct location.

        Scripts found under ``<dist_data>/scripts`` go to
        ``EGG-INFO/scripts`` (``.pyc`` files are deleted instead); the
        content of the ``data``, ``headers``, ``purelib`` and ``platlib``
        sub-directories is merged into *destination_eggdir*.  The *dist_data*
        directory itself is removed afterwards if it still exists.
        """
        dist_data = os.path.join(destination_eggdir, dist_data)
        dist_data_scripts = os.path.join(dist_data, 'scripts')
        if os.path.exists(dist_data_scripts):
            egg_info_scripts = os.path.join(
                destination_eggdir, 'EGG-INFO', 'scripts')
            os.mkdir(egg_info_scripts)
            for entry in os.listdir(dist_data_scripts):
                # Remove bytecode, as it's not properly handled
                # during easy_install scripts install phase.
                if entry.endswith('.pyc'):
                    os.unlink(os.path.join(dist_data_scripts, entry))
                else:
                    os.rename(
                        os.path.join(dist_data_scripts, entry),
                        os.path.join(egg_info_scripts, entry),
                    )
            os.rmdir(dist_data_scripts)
        for subdir in filter(os.path.exists, (
            os.path.join(dist_data, d)
            for d in ('data', 'headers', 'purelib', 'platlib')
        )):
            unpack(subdir, destination_eggdir)
        # The directory may already be gone at this point, hence the check.
        if os.path.exists(dist_data):
            os.rmdir(dist_data)

    @staticmethod
    def _fix_namespace_packages(egg_info, destination_eggdir):
        """Create missing ``__init__.py`` files for namespace packages.

        Reads the whitespace-separated module names from
        ``namespace_packages.txt`` in *egg_info*, if that file exists.  For
        each one, the package directory under *destination_eggdir* is created
        when absent, and an ``__init__.py`` holding ``NAMESPACE_PACKAGE_INIT``
        is written when absent.
        """
        namespace_packages = os.path.join(
            egg_info, 'namespace_packages.txt')
        if os.path.exists(namespace_packages):
            with open(namespace_packages) as fp:
                namespace_packages = fp.read().split()
            for mod in namespace_packages:
                mod_dir = os.path.join(destination_eggdir, *mod.split('.'))
                mod_init = os.path.join(mod_dir, '__init__.py')
                if not os.path.exists(mod_dir):
                    os.mkdir(mod_dir)
                if not os.path.exists(mod_init):
                    with open(mod_init, 'w') as fp:
                        fp.write(NAMESPACE_PACKAGE_INIT)
214