• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Wheels support."""
2
3import email
4import itertools
5import os
6import posixpath
7import re
8import zipfile
9import contextlib
10
11from distutils.util import get_platform
12
13import pkg_resources
14import setuptools
15from pkg_resources import parse_version
16from setuptools.extern.packaging.tags import sys_tags
17from setuptools.extern.packaging.utils import canonicalize_name
18from setuptools.command.egg_info import write_requirements
19from setuptools.archive_util import _unpack_zipfile_obj
20
21
22WHEEL_NAME = re.compile(
23    r"""^(?P<project_name>.+?)-(?P<version>\d.*?)
24    ((-(?P<build>\d.*?))?-(?P<py_version>.+?)-(?P<abi>.+?)-(?P<platform>.+?)
25    )\.whl$""",
26    re.VERBOSE).match
27
28NAMESPACE_PACKAGE_INIT = \
29    "__import__('pkg_resources').declare_namespace(__name__)\n"
30
31
32def unpack(src_dir, dst_dir):
33    '''Move everything under `src_dir` to `dst_dir`, and delete the former.'''
34    for dirpath, dirnames, filenames in os.walk(src_dir):
35        subdir = os.path.relpath(dirpath, src_dir)
36        for f in filenames:
37            src = os.path.join(dirpath, f)
38            dst = os.path.join(dst_dir, subdir, f)
39            os.renames(src, dst)
40        for n, d in reversed(list(enumerate(dirnames))):
41            src = os.path.join(dirpath, d)
42            dst = os.path.join(dst_dir, subdir, d)
43            if not os.path.exists(dst):
44                # Directory does not exist in destination,
45                # rename it and prune it from os.walk list.
46                os.renames(src, dst)
47                del dirnames[n]
48    # Cleanup.
49    for dirpath, dirnames, filenames in os.walk(src_dir, topdown=True):
50        assert not filenames
51        os.rmdir(dirpath)
52
53
54@contextlib.contextmanager
55def disable_info_traces():
56    """
57    Temporarily disable info traces.
58    """
59    from distutils import log
60    saved = log.set_threshold(log.WARN)
61    try:
62        yield
63    finally:
64        log.set_threshold(saved)
65
66
67class Wheel:
68
69    def __init__(self, filename):
70        match = WHEEL_NAME(os.path.basename(filename))
71        if match is None:
72            raise ValueError('invalid wheel name: %r' % filename)
73        self.filename = filename
74        for k, v in match.groupdict().items():
75            setattr(self, k, v)
76
77    def tags(self):
78        '''List tags (py_version, abi, platform) supported by this wheel.'''
79        return itertools.product(
80            self.py_version.split('.'),
81            self.abi.split('.'),
82            self.platform.split('.'),
83        )
84
85    def is_compatible(self):
86        '''Is the wheel is compatible with the current platform?'''
87        supported_tags = set(
88            (t.interpreter, t.abi, t.platform) for t in sys_tags())
89        return next((True for t in self.tags() if t in supported_tags), False)
90
91    def egg_name(self):
92        return pkg_resources.Distribution(
93            project_name=self.project_name, version=self.version,
94            platform=(None if self.platform == 'any' else get_platform()),
95        ).egg_name() + '.egg'
96
97    def get_dist_info(self, zf):
98        # find the correct name of the .dist-info dir in the wheel file
99        for member in zf.namelist():
100            dirname = posixpath.dirname(member)
101            if (dirname.endswith('.dist-info') and
102                    canonicalize_name(dirname).startswith(
103                        canonicalize_name(self.project_name))):
104                return dirname
105        raise ValueError("unsupported wheel format. .dist-info not found")
106
107    def install_as_egg(self, destination_eggdir):
108        '''Install wheel as an egg directory.'''
109        with zipfile.ZipFile(self.filename) as zf:
110            self._install_as_egg(destination_eggdir, zf)
111
112    def _install_as_egg(self, destination_eggdir, zf):
113        dist_basename = '%s-%s' % (self.project_name, self.version)
114        dist_info = self.get_dist_info(zf)
115        dist_data = '%s.data' % dist_basename
116        egg_info = os.path.join(destination_eggdir, 'EGG-INFO')
117
118        self._convert_metadata(zf, destination_eggdir, dist_info, egg_info)
119        self._move_data_entries(destination_eggdir, dist_data)
120        self._fix_namespace_packages(egg_info, destination_eggdir)
121
122    @staticmethod
123    def _convert_metadata(zf, destination_eggdir, dist_info, egg_info):
124        def get_metadata(name):
125            with zf.open(posixpath.join(dist_info, name)) as fp:
126                value = fp.read().decode('utf-8')
127                return email.parser.Parser().parsestr(value)
128
129        wheel_metadata = get_metadata('WHEEL')
130        # Check wheel format version is supported.
131        wheel_version = parse_version(wheel_metadata.get('Wheel-Version'))
132        wheel_v1 = (
133            parse_version('1.0') <= wheel_version < parse_version('2.0dev0')
134        )
135        if not wheel_v1:
136            raise ValueError(
137                'unsupported wheel format version: %s' % wheel_version)
138        # Extract to target directory.
139        _unpack_zipfile_obj(zf, destination_eggdir)
140        # Convert metadata.
141        dist_info = os.path.join(destination_eggdir, dist_info)
142        dist = pkg_resources.Distribution.from_location(
143            destination_eggdir, dist_info,
144            metadata=pkg_resources.PathMetadata(destination_eggdir, dist_info),
145        )
146
147        # Note: Evaluate and strip markers now,
148        # as it's difficult to convert back from the syntax:
149        # foobar; "linux" in sys_platform and extra == 'test'
150        def raw_req(req):
151            req.marker = None
152            return str(req)
153        install_requires = list(map(raw_req, dist.requires()))
154        extras_require = {
155            extra: [
156                req
157                for req in map(raw_req, dist.requires((extra,)))
158                if req not in install_requires
159            ]
160            for extra in dist.extras
161        }
162        os.rename(dist_info, egg_info)
163        os.rename(
164            os.path.join(egg_info, 'METADATA'),
165            os.path.join(egg_info, 'PKG-INFO'),
166        )
167        setup_dist = setuptools.Distribution(
168            attrs=dict(
169                install_requires=install_requires,
170                extras_require=extras_require,
171            ),
172        )
173        with disable_info_traces():
174            write_requirements(
175                setup_dist.get_command_obj('egg_info'),
176                None,
177                os.path.join(egg_info, 'requires.txt'),
178            )
179
180    @staticmethod
181    def _move_data_entries(destination_eggdir, dist_data):
182        """Move data entries to their correct location."""
183        dist_data = os.path.join(destination_eggdir, dist_data)
184        dist_data_scripts = os.path.join(dist_data, 'scripts')
185        if os.path.exists(dist_data_scripts):
186            egg_info_scripts = os.path.join(
187                destination_eggdir, 'EGG-INFO', 'scripts')
188            os.mkdir(egg_info_scripts)
189            for entry in os.listdir(dist_data_scripts):
190                # Remove bytecode, as it's not properly handled
191                # during easy_install scripts install phase.
192                if entry.endswith('.pyc'):
193                    os.unlink(os.path.join(dist_data_scripts, entry))
194                else:
195                    os.rename(
196                        os.path.join(dist_data_scripts, entry),
197                        os.path.join(egg_info_scripts, entry),
198                    )
199            os.rmdir(dist_data_scripts)
200        for subdir in filter(os.path.exists, (
201            os.path.join(dist_data, d)
202            for d in ('data', 'headers', 'purelib', 'platlib')
203        )):
204            unpack(subdir, destination_eggdir)
205        if os.path.exists(dist_data):
206            os.rmdir(dist_data)
207
208    @staticmethod
209    def _fix_namespace_packages(egg_info, destination_eggdir):
210        namespace_packages = os.path.join(
211            egg_info, 'namespace_packages.txt')
212        if os.path.exists(namespace_packages):
213            with open(namespace_packages) as fp:
214                namespace_packages = fp.read().split()
215            for mod in namespace_packages:
216                mod_dir = os.path.join(destination_eggdir, *mod.split('.'))
217                mod_init = os.path.join(mod_dir, '__init__.py')
218                if not os.path.exists(mod_dir):
219                    os.mkdir(mod_dir)
220                if not os.path.exists(mod_init):
221                    with open(mod_init, 'w') as fp:
222                        fp.write(NAMESPACE_PACKAGE_INIT)
223