• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Unittests for parsing files in zip64 format"""
19
20import os
21import subprocess
22import tempfile
23import unittest
24import zipfile
25import time
26
27class Zip64Test(unittest.TestCase):
28  @staticmethod
29  def _WriteFile(path, size_in_kib):
30    contents = b'X' * 1024
31    with open(path, 'wb') as f:
32      for i in range(size_in_kib):
33        f.write(contents)
34
35  @staticmethod
36  def _AddEntriesToZip(output_zip, entries_dict=None):
37    contents = b'X' * 1024
38    for name, size in entries_dict.items():
39      # Need to pass a ZipInfo with a file_size
40      # to .open() so that it adds the Zip64 header
41      # on larger files
42      info = zipfile.ZipInfo(name)
43      info.file_size = size * 1024
44      with output_zip.open(info, mode='w') as f:
45        for i in range(size):
46          f.write(contents)
47
48  def _getEntryNames(self, zip_name):
49    cmd = ['ziptool', 'zipinfo', '-1', zip_name]
50    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
51    output, _ = proc.communicate()
52    self.assertEqual(0, proc.returncode)
53    self.assertNotEqual(None, output)
54    return output.decode('utf-8').split()
55
56  def _ExtractEntries(self, zip_name):
57    with tempfile.TemporaryDirectory() as temp_dir:
58      cmd = ['ziptool', 'unzip', '-d', temp_dir, zip_name]
59      proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
60      proc.communicate()
61      self.assertEqual(0, proc.returncode)
62
63  def test_entriesSmallerThan2G(self):
64    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
65      # Add a few entries with each of them smaller than 2GiB. But the entire zip file is larger
66      # than 4GiB in size.
67      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
68        entry_dict = {'a.txt': 1025 * 1024, 'b.txt': 1025 * 1024, 'c.txt': 1025 * 1024,
69                      'd.txt': 1025 * 1024, 'e.txt': 1024}
70        self._AddEntriesToZip(output_zip, entry_dict)
71
72      read_names = self._getEntryNames(zip_path.name)
73      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
74      self._ExtractEntries(zip_path.name)
75
76
77  def test_largeNumberOfEntries(self):
78    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
79      entry_dict = {}
80      # Add 100k entries (more than 65535|UINT16_MAX).
81      for num in range(0, 100 * 1024):
82        entry_dict[str(num)] = 50
83
84      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
85        self._AddEntriesToZip(output_zip, entry_dict)
86
87      read_names = self._getEntryNames(zip_path.name)
88      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
89      self._ExtractEntries(zip_path.name)
90
91
92  def test_largeCompressedEntriesSmallerThan4G(self):
93    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
94      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
95                           allowZip64=True) as output_zip:
96        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
97        # sizes in the extra field. Test if our ziptool should be able to parse it.
98        entry_dict = {'e.txt': 4095 * 1024, 'f.txt': 4095 * 1024}
99        self._AddEntriesToZip(output_zip, entry_dict)
100
101      read_names = self._getEntryNames(zip_path.name)
102      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
103      self._ExtractEntries(zip_path.name)
104
105
106  def test_forceDataDescriptor(self):
107    with tempfile.NamedTemporaryFile(suffix='.txt') as file_path:
108      self._WriteFile(file_path.name, 5000 * 1024)
109
110      with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
111        with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
112          pass
113        # The fd option force writes a data descriptor
114        cmd = ['zip', '-fd', zip_path.name, file_path.name]
115        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
116        proc.communicate()
117        read_names = self._getEntryNames(zip_path.name)
118        self.assertEqual([file_path.name[1:]], read_names)
119        self._ExtractEntries(zip_path.name)
120
121
122  def test_largeUncompressedEntriesLargerThan4G(self):
123    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
124      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED,
125                           allowZip64=True) as output_zip:
126        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
127        # sizes in the extra field. Test if our ziptool should be able to parse it.
128        entry_dict = {'g.txt': 5000 * 1024, 'h.txt': 6000 * 1024}
129        self._AddEntriesToZip(output_zip, entry_dict)
130
131      read_names = self._getEntryNames(zip_path.name)
132      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
133      self._ExtractEntries(zip_path.name)
134
135
136  def test_largeCompressedEntriesLargerThan4G(self):
137    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
138      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
139                           allowZip64=True) as output_zip:
140        # Add entries close to 4GiB in size. Somehow the python library will put the (un)compressed
141        # sizes in the extra field. Test if our ziptool should be able to parse it.
142        entry_dict = {'i.txt': 4096 * 1024, 'j.txt': 7000 * 1024}
143        self._AddEntriesToZip(output_zip, entry_dict)
144
145      read_names = self._getEntryNames(zip_path.name)
146      self.assertEqual(sorted(entry_dict.keys()), sorted(read_names))
147      self._ExtractEntries(zip_path.name)
148
149
if __name__ == '__main__':
  # Script entry point: hand control to unittest's command-line runner,
  # asking for verbosity level 2.
  unittest.main(verbosity=2)
152