• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2#
3# Copyright (C) 2015 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Unit testing payload_info.py."""
19
20from __future__ import print_function
21
22import StringIO
23import collections
24import mock
25import sys
26import unittest
27
28import payload_info
29import update_payload
30
31from contextlib import contextmanager
32
33from update_payload import update_metadata_pb2
34
class FakePayloadError(Exception):
  """Error raised by the FakePayload test double when it is misused."""
37
class FakeOption(object):
  """Fake options object for testing.

  Stands in for the options object passed to payload_info.PayloadCommand.
  Every instance carries the attributes list_ops, stats, signatures and
  payload_file; any of them can be overridden, and extra attributes added,
  through keyword arguments.
  """

  def __init__(self, **kwargs):
    """Sets the default option values, then applies the overrides.

    Args:
      **kwargs: Attribute names mapped to the values to set on the instance.
    """
    self.list_ops = False
    self.stats = False
    self.signatures = False
    # Defaulting before the loop guarantees the attribute always exists while
    # still letting a caller-supplied payload_file win.
    self.payload_file = None
    # items() rather than iteritems(): the latter exists only on Python 2.
    for key, val in kwargs.items():
      setattr(self, key, val)
49
class FakeOp(object):
  """Fake manifest operation for testing.

  Optional fields are modelled as plain instance attributes, so a field is
  "present" exactly when it was passed as a keyword argument.
  """

  def __init__(self, src_extents, dst_extents, op_type, **kwargs):
    """Stores the extents, the operation type and any optional fields.

    Args:
      src_extents: Sequence of source extents for the operation.
      dst_extents: Sequence of destination extents for the operation.
      op_type: Operation type value, exposed as the `type` attribute.
      **kwargs: Optional fields (e.g. data_offset, data_length) to set as
          attributes on the instance.
    """
    self.src_extents = src_extents
    self.dst_extents = dst_extents
    self.type = op_type
    # items() rather than iteritems(): the latter exists only on Python 2.
    for key, val in kwargs.items():
      setattr(self, key, val)

  def HasField(self, field):
    """Returns True if an attribute named `field` is set on this operation."""
    return hasattr(self, field)
62
class FakePartition(object):
  """Test double for a PartitionUpdate entry: a name plus its operations."""

  def __init__(self, partition_name, operations):
    # Both values are stored untouched under the same attribute names.
    self.partition_name, self.operations = partition_name, operations
69
class FakeManifest(object):
  """Fake manifest for testing.

  Holds one rootfs operation (REPLACE_BZ) and one kernel operation
  (SOURCE_COPY). Where they are stored depends on the major version passed to
  the constructor.
  """

  def __init__(self, major_version):
    """Builds the fake operations and the fixed manifest fields.

    Args:
      major_version: Payload major version. When it equals
          payload_info.MAJOR_PAYLOAD_VERSION_BRILLO the operations are exposed
          through `partitions` and the two legacy operation lists are emptied;
          otherwise they stay in `install_operations` and
          `kernel_install_operations`.
    """
    FakeExtent = collections.namedtuple('FakeExtent',
                                        ['start_block', 'num_blocks'])
    self.install_operations = [FakeOp([],
                                      [FakeExtent(1, 1), FakeExtent(2, 2)],
                                      update_payload.common.OpType.REPLACE_BZ,
                                      dst_length=3*4096,
                                      data_offset=1,
                                      data_length=1)]
    # range() rather than xrange(): xrange exists only on Python 2, and the
    # comprehension materializes the same 20 extents either way.
    self.kernel_install_operations = [FakeOp(
        [FakeExtent(1, 1)],
        [FakeExtent(x, x) for x in range(20)],
        update_payload.common.OpType.SOURCE_COPY,
        src_length=4096)]
    if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO:
      self.partitions = [FakePartition('root', self.install_operations),
                         FakePartition('kernel',
                                       self.kernel_install_operations)]
      # The partitions above keep references to the populated lists; the
      # legacy fields are rebound to one shared empty list.
      self.install_operations = self.kernel_install_operations = []
    self.block_size = 4096
    self.minor_version = 4
    FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
    self.old_rootfs_info = FakePartInfo(1 * 4096)
    self.old_kernel_info = FakePartInfo(2 * 4096)
    self.new_rootfs_info = FakePartInfo(3 * 4096)
    self.new_kernel_info = FakePartInfo(4 * 4096)
    # None marks the signature fields as absent (see HasField).
    self.signatures_offset = None
    self.signatures_size = None

  def HasField(self, field_name):
    """Fake HasField method based on the python members.

    Args:
      field_name: Name of the attribute to look up.

    Returns:
      True if the attribute exists and is not None, False otherwise.
    """
    return hasattr(self, field_name) and getattr(self, field_name) is not None
105
class FakeHeader(object):
  """Test double for the payload header."""

  def __init__(self, version, manifest_len, metadata_signature_len):
    # Plain value holder: each argument is kept under the same name.
    self.metadata_signature_len = metadata_signature_len
    self.manifest_len = manifest_len
    self.version = version

  @property
  def size(self):
    """Header size: 20 for the CHROMEOS major version, 24 for any other."""
    if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS:
      return 20
    return 24
118
class FakePayload(object):
  """Fake payload for testing.

  The header and manifest are built in the constructor but only exposed after
  Init() is called. Data blobs are served from an in-memory dict keyed by
  offset.
  """

  def __init__(self, major_version):
    """Creates the hidden header/manifest and empty signature containers.

    Args:
      major_version: Payload major version given to the fake header and the
          fake manifest.
    """
    self._header = FakeHeader(major_version, 222, 0)
    self.header = None
    self._manifest = FakeManifest(major_version)
    self.manifest = None

    # Maps a blob offset to the serialized blob returned by ReadDataBlob().
    self._blobs = {}
    self._payload_signatures = update_metadata_pb2.Signatures()
    self._metadata_signatures = update_metadata_pb2.Signatures()

  def Init(self):
    """Fake Init that sets header and manifest.

    Failing to call Init() will not make header and manifest available to the
    test.
    """
    self.header = self._header
    self.manifest = self._manifest

  def ReadDataBlob(self, offset, length):
    """Return the blob that should be present at the offset location.

    Args:
      offset: Key under which the blob was registered.
      length: Number of bytes the caller asks for; it must match the length of
          the stored blob exactly.

    Returns:
      The stored blob.

    Raises:
      FakePayloadError: If no blob is registered at `offset`, or its length
          differs from `length`.
    """
    if offset not in self._blobs:
      raise FakePayloadError('Requested blob at unknown offset %d' % offset)
    blob = self._blobs[offset]
    if len(blob) != length:
      raise FakePayloadError('Read blob with the wrong length (expect: %d, '
                             'actual: %d)' % (len(blob), length))
    return blob

  @staticmethod
  def _AddSignatureToProto(proto, **kwargs):
    """Add a new Signature element to the passed proto.

    Args:
      proto: Object whose `signatures.add()` returns the new element.
      **kwargs: Field names and values to set on the new element.
    """
    new_signature = proto.signatures.add()
    # items() rather than iteritems(): the latter exists only on Python 2.
    for key, val in kwargs.items():
      setattr(new_signature, key, val)

  def AddPayloadSignature(self, **kwargs):
    """Appends a payload signature and re-registers the serialized blob.

    The blob is stored at the fixed offset 1234, and the manifest's
    signatures_offset / signatures_size are updated to match.

    Args:
      **kwargs: Fields of the new signature (e.g. version, data).
    """
    self._AddSignatureToProto(self._payload_signatures, **kwargs)
    blob = self._payload_signatures.SerializeToString()
    self._manifest.signatures_offset = 1234
    self._manifest.signatures_size = len(blob)
    self._blobs[self._manifest.signatures_offset] = blob

  def AddMetadataSignature(self, **kwargs):
    """Appends a metadata signature and re-registers the serialized blob.

    The blob is keyed by the negative of its length, and the header's
    metadata_signature_len is updated to that length.

    Args:
      **kwargs: Fields of the new signature (e.g. version, data).
    """
    self._AddSignatureToProto(self._metadata_signatures, **kwargs)
    # Drop the blob registered by a previous call; its key is derived from the
    # old length, which is about to change.
    if self._header.metadata_signature_len:
      del self._blobs[-self._header.metadata_signature_len]
    blob = self._metadata_signatures.SerializeToString()
    self._header.metadata_signature_len = len(blob)
    self._blobs[-len(blob)] = blob
172
class PayloadCommandTest(unittest.TestCase):
  """Test class for our PayloadCommand class.

  Each test runs payload_info.PayloadCommand against a FakePayload and
  compares everything printed to stdout with an exact expected string.
  """

  @contextmanager
  def OutputCapturer(self):
    """A tool for capturing the sys.stdout.

    Yields:
      The StringIO object that replaces sys.stdout inside the with-block. The
      real stdout is restored on exit, even if the block raises.
    """
    stdout = sys.stdout
    try:
      sys.stdout = StringIO.StringIO()
      yield sys.stdout
    finally:
      sys.stdout = stdout

  def TestCommand(self, payload_cmd, payload, expected_out):
    """A tool for testing a payload command.

    It tests that a payload command which runs with a given payload produces a
    correct output.

    Args:
      payload_cmd: The command object whose Run() method is invoked.
      payload: Object returned in place of update_payload.Payload(...) while
          the command runs.
      expected_out: Exact text the command is expected to print.
    """
    with mock.patch.object(update_payload, 'Payload', return_value=payload), \
         self.OutputCapturer() as output:
      payload_cmd.Run()
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual(output.getvalue(), expected_out)

  def testDisplayValue(self):
    """Verify that DisplayValue prints what we expect."""
    with self.OutputCapturer() as output:
      payload_info.DisplayValue('key', 'value')
    self.assertEqual(output.getvalue(), 'key:                         value\n')

  def testRun(self):
    """Verify that Run parses and displays the payload like we expect."""
    payload_cmd = payload_info.PayloadCommand(FakeOption(action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion1(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4

Install operations:
  0: REPLACE_BZ
    Data offset: 1
    Data length: 1
    Destination: 2 extents (3 blocks)
      (1,1) (2,2)
Kernel install operations:
  0: SOURCE_COPY
    Source: 1 extent (1 block)
      (1,1)
    Destination: 20 extents (190 blocks)
      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion2(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4

root install operations:
  0: REPLACE_BZ
    Data offset: 1
    Data length: 1
    Destination: 2 extents (3 blocks)
      (1,1) (2,2)
kernel install operations:
  0: SOURCE_COPY
    Source: 1 extent (1 block)
      (1,1)
    Destination: 20 extents (190 blocks)
      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion1(self):
    """Verify that the --stats option works correctly."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
Blocks read:                 11
Blocks written:              193
Seeks when writing:          18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion2(self):
    """Verify that the --stats option works correctly on version 2."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4
Blocks read:                 11
Blocks written:              193
Seeks when writing:          18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testEmptySignatures(self):
    """Verify that the --signatures option works with unsigned payloads."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
No metadata signatures stored in the payload
No payload signatures stored in the payload
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testSignatures(self):
    """Verify that the --signatures option shows the present signatures."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    # Two payload signatures (one without a version) and one metadata
    # signature, so both signature sections appear in the output.
    payload.AddPayloadSignature(version=1,
                                data='12345678abcdefgh\x00\x01\x02\x03')
    payload.AddPayloadSignature(data='I am a signature so access is yes.')
    payload.AddMetadataSignature(data='\x00\x0a\x0c')
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4
Metadata signatures blob:    file_offset=246 (7 bytes)
Metadata signatures: (1 entries)
  version=None, hex_data: (3 bytes)
    00 0a 0c                                        | ...
Payload signatures blob:     blob_offset=1234 (64 bytes)
Payload signatures: (2 entries)
  version=1, hex_data: (20 bytes)
    31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
    00 01 02 03                                     | ....
  version=None, hex_data: (34 bytes)
    49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
    20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye
    73 2e                                           | s.
"""
    self.TestCommand(payload_cmd, payload, expected_out)
355
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
358