# Copyright 2023 The Chromium Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Helper functions for dealing with .zip files.""" import os import pathlib import posixpath import stat import time import zipfile _FIXED_ZIP_HEADER_LEN = 30 def _set_alignment(zip_obj, zip_info, alignment): """Sets a ZipInfo's extra field such that the file will be aligned. Args: zip_obj: The ZipFile object that is being written. zip_info: The ZipInfo object about to be written. alignment: The amount of alignment (e.g. 4, or 4*1024). """ header_size = _FIXED_ZIP_HEADER_LEN + len(zip_info.filename) pos = zip_obj.fp.tell() + header_size padding_needed = (alignment - (pos % alignment)) % alignment # Python writes |extra| to both the local file header and the central # directory's file header. Android's zipalign tool writes only to the # local file header, so there is more overhead in using Python to align. zip_info.extra = b'\0' * padding_needed def _hermetic_date_time(timestamp=None): if not timestamp: return (2001, 1, 1, 0, 0, 0) utc_time = time.gmtime(timestamp) return (utc_time.tm_year, utc_time.tm_mon, utc_time.tm_mday, utc_time.tm_hour, utc_time.tm_min, utc_time.tm_sec) def add_to_zip_hermetic(zip_file, zip_path, *, src_path=None, data=None, compress=None, alignment=None, timestamp=None): """Adds a file to the given ZipFile with a hard-coded modified time. Args: zip_file: ZipFile instance to add the file to. zip_path: Destination path within the zip file (or ZipInfo instance). src_path: Path of the source file. Mutually exclusive with |data|. data: File data as a string. compress: Whether to enable compression. Default is taken from ZipFile constructor. alignment: If set, align the data of the entry to this many bytes. timestamp: The last modification date and time for the archive member. """ assert (src_path is None) != (data is None), ( '|src_path| and |data| are mutually exclusive.') if isinstance(zip_path, zipfile.ZipInfo): zipinfo = zip_path zip_path = zipinfo.filename else: zipinfo = zipfile.ZipInfo(filename=zip_path) zipinfo.external_attr = 0o644 << 16 zipinfo.date_time = _hermetic_date_time(timestamp) if alignment: _set_alignment(zip_file, zipinfo, alignment) # Filenames can contain backslashes, but it is more likely that we've # forgotten to use forward slashes as a directory separator. assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path assert posixpath.normpath(zip_path) == zip_path, ( f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}') assert zip_path not in zip_file.namelist(), ( 'Tried to add a duplicate zip entry: ' + zip_path) if src_path and os.path.islink(src_path): zipinfo.external_attr |= stat.S_IFLNK << 16 # mark as a symlink zip_file.writestr(zipinfo, os.readlink(src_path)) return # Maintain the executable bit. if src_path: st = os.stat(src_path) for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH): if st.st_mode & mode: zipinfo.external_attr |= mode << 16 if src_path: with open(src_path, 'rb') as f: data = f.read() # zipfile will deflate even when it makes the file bigger. To avoid # growing files, disable compression at an arbitrary cut off point. if len(data) < 16: compress = False # None converts to ZIP_STORED, when passed explicitly rather than the # default passed to the ZipFile constructor. compress_type = zip_file.compression if compress is not None: compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED zip_file.writestr(zipinfo, data, compress_type) def add_files_to_zip(inputs, output, *, base_dir=None, compress=None, zip_prefix_path=None, timestamp=None): """Creates a zip file from a list of files. Args: inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples. output: Path, fileobj, or ZipFile instance to add files to. base_dir: Prefix to strip from inputs. compress: Whether to compress zip_prefix_path: Path prepended to file path in zip file. timestamp: Unix timestamp to use for files in the archive. """ if base_dir is None: base_dir = '.' input_tuples = [] for tup in inputs: if isinstance(tup, str): src_path = tup zip_path = os.path.relpath(src_path, base_dir) # Zip files always use / as path separator. if os.path.sep != posixpath.sep: zip_path = str(pathlib.Path(zip_path).as_posix()) tup = (zip_path, src_path) input_tuples.append(tup) # Sort by zip path to ensure stable zip ordering. input_tuples.sort(key=lambda tup: tup[0]) out_zip = output if not isinstance(output, zipfile.ZipFile): out_zip = zipfile.ZipFile(output, 'w') try: for zip_path, fs_path in input_tuples: if zip_prefix_path: zip_path = posixpath.join(zip_prefix_path, zip_path) add_to_zip_hermetic(out_zip, zip_path, src_path=fs_path, compress=compress, timestamp=timestamp) finally: if output is not out_zip: out_zip.close() def zip_directory(output, base_dir, **kwargs): """Zips all files in the given directory.""" inputs = [] for root, _, files in os.walk(base_dir): for f in files: inputs.append(os.path.join(root, f)) add_files_to_zip(inputs, output, base_dir=base_dir, **kwargs) def merge_zips(output, input_zips, path_transform=None, compress=None): """Combines all files from |input_zips| into |output|. Args: output: Path, fileobj, or ZipFile instance to add files to. input_zips: Iterable of paths to zip files to merge. path_transform: Called for each entry path. Returns a new path, or None to skip the file. compress: Overrides compression setting from origin zip entries. """ assert not isinstance(input_zips, str) # Easy mistake to make. if isinstance(output, zipfile.ZipFile): out_zip = output out_filename = output.filename else: assert isinstance(output, str), 'Was: ' + repr(output) out_zip = zipfile.ZipFile(output, 'w') out_filename = output # Include paths in the existing zip here to avoid adding duplicate files. crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()} try: for in_file in input_zips: with zipfile.ZipFile(in_file, 'r') as in_zip: for info in in_zip.infolist(): # Ignore directories. if info.filename[-1] == '/': continue if path_transform: dst_name = path_transform(info.filename) if dst_name is None: continue else: dst_name = info.filename data = in_zip.read(info) # If there's a duplicate file, ensure contents is the same and skip # adding it multiple times. if dst_name in crc_by_name: orig_filename, orig_crc = crc_by_name[dst_name] new_crc = zipfile.crc32(data) if new_crc == orig_crc: continue msg = f"""File appeared in multiple inputs with differing contents. File: {dst_name} Input1: {orig_filename} Input2: {in_file}""" raise Exception(msg) if compress is not None: compress_entry = compress else: compress_entry = info.compress_type != zipfile.ZIP_STORED add_to_zip_hermetic(out_zip, dst_name, data=data, compress=compress_entry) crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC) finally: if output is not out_zip: out_zip.close()