• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17import shutil
18import tempfile
19import time
20import unittest
21import zipfile
22
23import common
24import validate_target_files
25
26
27def random_string_with_holes(size, block_size, step_size):
28  data = ["\0"] * size
29  for begin in range(0, size, step_size):
30    end = begin + block_size
31    data[begin:end] = os.urandom(block_size)
32  return "".join(data)
33
34def get_2gb_string():
35  kilobytes = 1024
36  megabytes = 1024 * kilobytes
37  gigabytes = 1024 * megabytes
38
39  size = int(2 * gigabytes + 1)
40  block_size = 4 * kilobytes
41  step_size = 4 * megabytes
42  two_gb_string = random_string_with_holes(
43        size, block_size, step_size)
44  return two_gb_string
45
46
47class CommonZipTest(unittest.TestCase):
48  def _verify(self, zip_file, zip_file_name, arcname, contents,
49              test_file_name=None, expected_stat=None, expected_mode=0o644,
50              expected_compress_type=zipfile.ZIP_STORED):
51    # Verify the stat if present.
52    if test_file_name is not None:
53      new_stat = os.stat(test_file_name)
54      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
55      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
56
57    # Reopen the zip file to verify.
58    zip_file = zipfile.ZipFile(zip_file_name, "r")
59
60    # Verify the timestamp.
61    info = zip_file.getinfo(arcname)
62    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
63
64    # Verify the file mode.
65    mode = (info.external_attr >> 16) & 0o777
66    self.assertEqual(mode, expected_mode)
67
68    # Verify the compress type.
69    self.assertEqual(info.compress_type, expected_compress_type)
70
71    # Verify the zip contents.
72    self.assertEqual(zip_file.read(arcname), contents)
73    self.assertIsNone(zip_file.testzip())
74
75  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
76    extra_zipwrite_args = dict(extra_zipwrite_args or {})
77
78    test_file = tempfile.NamedTemporaryFile(delete=False)
79    test_file_name = test_file.name
80
81    zip_file = tempfile.NamedTemporaryFile(delete=False)
82    zip_file_name = zip_file.name
83
84    # File names within an archive strip the leading slash.
85    arcname = extra_zipwrite_args.get("arcname", test_file_name)
86    if arcname[0] == "/":
87      arcname = arcname[1:]
88
89    zip_file.close()
90    zip_file = zipfile.ZipFile(zip_file_name, "w")
91
92    try:
93      test_file.write(contents)
94      test_file.close()
95
96      expected_stat = os.stat(test_file_name)
97      expected_mode = extra_zipwrite_args.get("perms", 0o644)
98      expected_compress_type = extra_zipwrite_args.get("compress_type",
99                                                       zipfile.ZIP_STORED)
100      time.sleep(5)  # Make sure the atime/mtime will change measurably.
101
102      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
103      common.ZipClose(zip_file)
104
105      self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
106                   expected_stat, expected_mode, expected_compress_type)
107    finally:
108      os.remove(test_file_name)
109      os.remove(zip_file_name)
110
111  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
112    extra_args = dict(extra_args or {})
113
114    zip_file = tempfile.NamedTemporaryFile(delete=False)
115    zip_file_name = zip_file.name
116    zip_file.close()
117
118    zip_file = zipfile.ZipFile(zip_file_name, "w")
119
120    try:
121      expected_compress_type = extra_args.get("compress_type",
122                                              zipfile.ZIP_STORED)
123      time.sleep(5)  # Make sure the atime/mtime will change measurably.
124
125      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
126        arcname = zinfo_or_arcname
127        expected_mode = extra_args.get("perms", 0o644)
128      else:
129        arcname = zinfo_or_arcname.filename
130        expected_mode = extra_args.get("perms",
131                                       zinfo_or_arcname.external_attr >> 16)
132
133      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
134      common.ZipClose(zip_file)
135
136      self._verify(zip_file, zip_file_name, arcname, contents,
137                   expected_mode=expected_mode,
138                   expected_compress_type=expected_compress_type)
139    finally:
140      os.remove(zip_file_name)
141
142  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
143    extra_args = dict(extra_args or {})
144
145    zip_file = tempfile.NamedTemporaryFile(delete=False)
146    zip_file_name = zip_file.name
147
148    test_file = tempfile.NamedTemporaryFile(delete=False)
149    test_file_name = test_file.name
150
151    arcname_large = test_file_name
152    arcname_small = "bar"
153
154    # File names within an archive strip the leading slash.
155    if arcname_large[0] == "/":
156      arcname_large = arcname_large[1:]
157
158    zip_file.close()
159    zip_file = zipfile.ZipFile(zip_file_name, "w")
160
161    try:
162      test_file.write(large)
163      test_file.close()
164
165      expected_stat = os.stat(test_file_name)
166      expected_mode = 0o644
167      expected_compress_type = extra_args.get("compress_type",
168                                              zipfile.ZIP_STORED)
169      time.sleep(5)  # Make sure the atime/mtime will change measurably.
170
171      common.ZipWrite(zip_file, test_file_name, **extra_args)
172      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
173      common.ZipClose(zip_file)
174
175      # Verify the contents written by ZipWrite().
176      self._verify(zip_file, zip_file_name, arcname_large, large,
177                   test_file_name, expected_stat, expected_mode,
178                   expected_compress_type)
179
180      # Verify the contents written by ZipWriteStr().
181      self._verify(zip_file, zip_file_name, arcname_small, small,
182                   expected_compress_type=expected_compress_type)
183    finally:
184      os.remove(zip_file_name)
185      os.remove(test_file_name)
186
187  def _test_reset_ZIP64_LIMIT(self, func, *args):
188    default_limit = (1 << 31) - 1
189    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
190    func(*args)
191    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
192
193  def test_ZipWrite(self):
194    file_contents = os.urandom(1024)
195    self._test_ZipWrite(file_contents)
196
197  def test_ZipWrite_with_opts(self):
198    file_contents = os.urandom(1024)
199    self._test_ZipWrite(file_contents, {
200        "arcname": "foobar",
201        "perms": 0o777,
202        "compress_type": zipfile.ZIP_DEFLATED,
203    })
204    self._test_ZipWrite(file_contents, {
205        "arcname": "foobar",
206        "perms": 0o700,
207        "compress_type": zipfile.ZIP_STORED,
208    })
209
210  def test_ZipWrite_large_file(self):
211    file_contents = get_2gb_string()
212    self._test_ZipWrite(file_contents, {
213        "compress_type": zipfile.ZIP_DEFLATED,
214    })
215
216  def test_ZipWrite_resets_ZIP64_LIMIT(self):
217    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
218
219  def test_ZipWriteStr(self):
220    random_string = os.urandom(1024)
221    # Passing arcname
222    self._test_ZipWriteStr("foo", random_string)
223
224    # Passing zinfo
225    zinfo = zipfile.ZipInfo(filename="foo")
226    self._test_ZipWriteStr(zinfo, random_string)
227
228    # Timestamp in the zinfo should be overwritten.
229    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
230    self._test_ZipWriteStr(zinfo, random_string)
231
232  def test_ZipWriteStr_with_opts(self):
233    random_string = os.urandom(1024)
234    # Passing arcname
235    self._test_ZipWriteStr("foo", random_string, {
236        "perms": 0o700,
237        "compress_type": zipfile.ZIP_DEFLATED,
238    })
239    self._test_ZipWriteStr("bar", random_string, {
240        "compress_type": zipfile.ZIP_STORED,
241    })
242
243    # Passing zinfo
244    zinfo = zipfile.ZipInfo(filename="foo")
245    self._test_ZipWriteStr(zinfo, random_string, {
246        "compress_type": zipfile.ZIP_DEFLATED,
247    })
248    self._test_ZipWriteStr(zinfo, random_string, {
249        "perms": 0o600,
250        "compress_type": zipfile.ZIP_STORED,
251    })
252
253  def test_ZipWriteStr_large_file(self):
254    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
255    # the workaround. We will only test the case of writing a string into a
256    # large archive.
257    long_string = get_2gb_string()
258    short_string = os.urandom(1024)
259    self._test_ZipWriteStr_large_file(long_string, short_string, {
260        "compress_type": zipfile.ZIP_DEFLATED,
261    })
262
263  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
264    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
265    zinfo = zipfile.ZipInfo(filename="foo")
266    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
267
268  def test_bug21309935(self):
269    zip_file = tempfile.NamedTemporaryFile(delete=False)
270    zip_file_name = zip_file.name
271    zip_file.close()
272
273    try:
274      random_string = os.urandom(1024)
275      zip_file = zipfile.ZipFile(zip_file_name, "w")
276      # Default perms should be 0o644 when passing the filename.
277      common.ZipWriteStr(zip_file, "foo", random_string)
278      # Honor the specified perms.
279      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
280      # The perms in zinfo should be untouched.
281      zinfo = zipfile.ZipInfo(filename="baz")
282      zinfo.external_attr = 0o740 << 16
283      common.ZipWriteStr(zip_file, zinfo, random_string)
284      # Explicitly specified perms has the priority.
285      zinfo = zipfile.ZipInfo(filename="qux")
286      zinfo.external_attr = 0o700 << 16
287      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
288      common.ZipClose(zip_file)
289
290      self._verify(zip_file, zip_file_name, "foo", random_string,
291                   expected_mode=0o644)
292      self._verify(zip_file, zip_file_name, "bar", random_string,
293                   expected_mode=0o755)
294      self._verify(zip_file, zip_file_name, "baz", random_string,
295                   expected_mode=0o740)
296      self._verify(zip_file, zip_file_name, "qux", random_string,
297                   expected_mode=0o400)
298    finally:
299      os.remove(zip_file_name)
300
301class InstallRecoveryScriptFormatTest(unittest.TestCase):
302  """Check the format of install-recovery.sh
303
304  Its format should match between common.py and validate_target_files.py."""
305
306  def setUp(self):
307    self._tempdir = tempfile.mkdtemp()
308    # Create a dummy dict that contains the fstab info for boot&recovery.
309    self._info = {"fstab" : {}}
310    dummy_fstab = \
311        ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
312         "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
313    self._info["fstab"] = common.LoadRecoveryFSTab(lambda x : "\n".join(x),
314                                                   2, dummy_fstab)
315
316  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
317    loc = os.path.join(self._tempdir, prefix, name)
318    if not os.path.exists(os.path.dirname(loc)):
319      os.makedirs(os.path.dirname(loc))
320    with open(loc, "w+") as f:
321      f.write(data)
322
323  def test_full_recovery(self):
324    recovery_image = common.File("recovery.img", "recovery");
325    boot_image = common.File("boot.img", "boot");
326    self._info["full_recovery_image"] = "true"
327
328    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
329                             recovery_image, boot_image, self._info)
330    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
331                                                        self._info)
332
333  def test_recovery_from_boot(self):
334    recovery_image = common.File("recovery.img", "recovery");
335    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
336    boot_image = common.File("boot.img", "boot");
337    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
338
339    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
340                             recovery_image, boot_image, self._info)
341    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
342                                                        self._info)
343    # Validate 'recovery-from-boot' with bonus argument.
344    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
345    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
346                             recovery_image, boot_image, self._info)
347    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
348                                                        self._info)
349
350  def tearDown(self):
351    shutil.rmtree(self._tempdir)
352